diff options
-rw-r--r-- | chrome/browser/renderer_host/socket_stream_dispatcher_host.cc | 1 | ||||
-rw-r--r-- | net/net.gyp | 2 | ||||
-rw-r--r-- | net/socket_stream/socket_stream.cc | 15 | ||||
-rw-r--r-- | net/socket_stream/socket_stream.h | 9 | ||||
-rw-r--r-- | net/socket_stream/socket_stream_throttle.cc | 80 | ||||
-rw-r--r-- | net/socket_stream/socket_stream_throttle.h | 81 | ||||
-rw-r--r-- | net/websockets/websocket_job.cc | 73 | ||||
-rw-r--r-- | net/websockets/websocket_job.h | 16 | ||||
-rw-r--r-- | net/websockets/websocket_job_unittest.cc | 19 | ||||
-rw-r--r-- | net/websockets/websocket_throttle.cc | 219 | ||||
-rw-r--r-- | net/websockets/websocket_throttle.h | 48 | ||||
-rw-r--r-- | net/websockets/websocket_throttle_unittest.cc | 160 |
12 files changed, 305 insertions, 418 deletions
diff --git a/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc b/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc index daddfcc..b99e70d 100644 --- a/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc +++ b/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc @@ -14,7 +14,6 @@ SocketStreamDispatcherHost::SocketStreamDispatcherHost() : receiver_(NULL) { net::WebSocketJob::EnsureInit(); - net::WebSocketThrottle::Init(); } SocketStreamDispatcherHost::~SocketStreamDispatcherHost() { diff --git a/net/net.gyp b/net/net.gyp index 571f9ae..5637f41 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -453,8 +453,6 @@ 'socket_stream/socket_stream_job_manager.h', 'socket_stream/socket_stream_metrics.cc', 'socket_stream/socket_stream_metrics.h', - 'socket_stream/socket_stream_throttle.cc', - 'socket_stream/socket_stream_throttle.h', 'spdy/spdy_bitmasks.h', 'spdy/spdy_frame_builder.cc', 'spdy/spdy_frame_builder.h', diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc index 03d7a9f..e46f306 100644 --- a/net/socket_stream/socket_stream.cc +++ b/net/socket_stream/socket_stream.cc @@ -26,7 +26,6 @@ #include "net/socket/socks_client_socket.h" #include "net/socket/tcp_client_socket.h" #include "net/socket_stream/socket_stream_metrics.h" -#include "net/socket_stream/socket_stream_throttle.h" #include "net/url_request/url_request.h" static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes. @@ -59,16 +58,12 @@ SocketStream::SocketStream(const GURL& url, Delegate* delegate) current_write_buf_(NULL), write_buf_offset_(0), write_buf_size_(0), - throttle_( - SocketStreamThrottle::GetSocketStreamThrottleForScheme( - url.scheme())), metrics_(new SocketStreamMetrics(url)) { DCHECK(MessageLoop::current()) << "The current MessageLoop must exist"; DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) << "The current MessageLoop must be TYPE_IO"; DCHECK(delegate_); - DCHECK(throttle_); } SocketStream::~SocketStream() { @@ -235,7 +230,6 @@ void SocketStream::Finish(int result) { if (delegate) { delegate->OnClose(this); } - throttle_->OnClose(this); Release(); } @@ -275,13 +269,12 @@ int SocketStream::DidReceiveData(int result) { net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_RECEIVED); int len = result; metrics_->OnRead(len); - result = throttle_->OnRead(this, read_buf_->data(), len, &io_callback_); if (delegate_) { // Notify recevied data to delegate. delegate_->OnReceivedData(this, read_buf_->data(), len); } read_buf_ = NULL; - return result; + return OK; } int SocketStream::DidSendData(int result) { @@ -289,8 +282,6 @@ int SocketStream::DidSendData(int result) { net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_SENT); int len = result; metrics_->OnWrite(len); - result = throttle_->OnWrite(this, current_write_buf_->data(), len, - &io_callback_); current_write_buf_ = NULL; if (delegate_) delegate_->OnSentData(this, len); @@ -309,7 +300,7 @@ int SocketStream::DidSendData(int result) { } else { write_buf_offset_ += len; } - return result; + return OK; } void SocketStream::OnIOCompleted(int result) { @@ -490,7 +481,7 @@ int SocketStream::DoResolveHost() { int SocketStream::DoResolveHostComplete(int result) { if (result == OK) { next_state_ = STATE_TCP_CONNECT; - result = throttle_->OnStartOpenConnection(this, &io_callback_); + result = delegate_->OnStartOpenConnection(this, &io_callback_); if (result == net::ERR_IO_PENDING) metrics_->OnWaitConnection(); } else { diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h index e8b250e..b8fd0bf 100644 --- a/net/socket_stream/socket_stream.h +++ b/net/socket_stream/socket_stream.h @@ -18,6 +18,7 @@ #include "net/base/completion_callback.h" #include "net/base/io_buffer.h" #include "net/base/net_log.h" +#include "net/base/net_errors.h" #include "net/http/http_auth.h" #include "net/http/http_auth_cache.h" #include "net/http/http_auth_handler.h" @@ -34,7 +35,6 @@ class HttpAuthHandlerFactory; class SSLConfigService; class SingleRequestHostResolver; class SocketStreamMetrics; -class SocketStreamThrottle; // SocketStream is used to implement Web Sockets. // It provides plain full-duplex stream with proxy and SSL support. @@ -57,6 +57,11 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> { public: virtual ~Delegate() {} + virtual int OnStartOpenConnection(SocketStream* socket, + CompletionCallback* callback) { + return OK; + } + // Called when socket stream has been connected. The socket stream accepts // at most |max_pending_send_allowed| so that a client of the socket stream // should keep track of how much it has pending and shouldn't go over @@ -313,8 +318,6 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> { int write_buf_size_; PendingDataQueue pending_write_bufs_; - SocketStreamThrottle* throttle_; - scoped_ptr<SocketStreamMetrics> metrics_; DISALLOW_COPY_AND_ASSIGN(SocketStream); diff --git a/net/socket_stream/socket_stream_throttle.cc b/net/socket_stream/socket_stream_throttle.cc deleted file mode 100644 index 6a1d20d..0000000 --- a/net/socket_stream/socket_stream_throttle.cc +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include <string> - -#include "net/socket_stream/socket_stream_throttle.h" - -#include "base/hash_tables.h" -#include "base/singleton.h" -#include "net/base/completion_callback.h" -#include "net/socket_stream/socket_stream.h" - -namespace net { - -// Default SocketStreamThrottle. No throttling. Used for unknown URL scheme. -class DefaultSocketStreamThrottle : public SocketStreamThrottle { - private: - DefaultSocketStreamThrottle() {} - virtual ~DefaultSocketStreamThrottle() {} - friend struct DefaultSingletonTraits<DefaultSocketStreamThrottle>; - - DISALLOW_COPY_AND_ASSIGN(DefaultSocketStreamThrottle); -}; - -class SocketStreamThrottleRegistry { - public: - SocketStreamThrottle* GetSocketStreamThrottleForScheme( - const std::string& scheme); - - void RegisterSocketStreamThrottle( - const std::string& scheme, SocketStreamThrottle* throttle); - - private: - typedef base::hash_map<std::string, SocketStreamThrottle*> ThrottleMap; - - SocketStreamThrottleRegistry() {} - ~SocketStreamThrottleRegistry() {} - friend struct DefaultSingletonTraits<SocketStreamThrottleRegistry>; - - ThrottleMap throttles_; - - DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottleRegistry); -}; - -SocketStreamThrottle* -SocketStreamThrottleRegistry::GetSocketStreamThrottleForScheme( - const std::string& scheme) { - ThrottleMap::const_iterator found = throttles_.find(scheme); - if (found == throttles_.end()) { - SocketStreamThrottle* throttle = - Singleton<DefaultSocketStreamThrottle>::get(); - throttles_[scheme] = throttle; - return throttle; - } - return found->second; -} - -void SocketStreamThrottleRegistry::RegisterSocketStreamThrottle( - const std::string& scheme, SocketStreamThrottle* throttle) { - throttles_[scheme] = throttle; -} - -/* static */ -SocketStreamThrottle* SocketStreamThrottle::GetSocketStreamThrottleForScheme( - const std::string& scheme) { - SocketStreamThrottleRegistry* registry = - Singleton<SocketStreamThrottleRegistry>::get(); - return registry->GetSocketStreamThrottleForScheme(scheme); -} - -/* static */ -void SocketStreamThrottle::RegisterSocketStreamThrottle( - const std::string& scheme, SocketStreamThrottle* throttle) { - SocketStreamThrottleRegistry* registry = - Singleton<SocketStreamThrottleRegistry>::get(); - registry->RegisterSocketStreamThrottle(scheme, throttle); -} - -} // namespace net diff --git a/net/socket_stream/socket_stream_throttle.h b/net/socket_stream/socket_stream_throttle.h deleted file mode 100644 index 7726cbe..0000000 --- a/net/socket_stream/socket_stream_throttle.h +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_ -#define NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_ - -#include <string> - -#include "base/basictypes.h" -#include "net/base/completion_callback.h" -#include "net/base/net_errors.h" - -namespace net { - -class SocketStream; - -// Abstract interface to throttle SocketStream per URL scheme. -// Each URL scheme (protocol) could define own SocketStreamThrottle. -// These methods will be called on IO thread. -class SocketStreamThrottle { - public: - // Called when |socket| is about to open connection. - // Returns net::OK if the connection can open now. - // Returns net::ERR_IO_PENDING if the connection should wait. In this case, - // |callback| will be called when it's ready to open connection. - virtual int OnStartOpenConnection(SocketStream* socket, - CompletionCallback* callback) { - // No throttle by default. - return OK; - } - - // Called when |socket| read |len| bytes of |data|. - // May wake up another waiting socket. - // Returns net::OK if |socket| can continue to run. - // Returns net::ERR_IO_PENDING if |socket| should suspend to run. In this - // case, |callback| will be called when it's ready to resume running. - virtual int OnRead(SocketStream* socket, const char* data, int len, - CompletionCallback* callback) { - // No throttle by default. - return OK; - } - - // Called when |socket| wrote |len| bytes of |data|. - // May wake up another waiting socket. - // Returns net::OK if |socket| can continue to run. - // Returns net::ERR_IO_PENDING if |socket| should suspend to run. In this - // case, |callback| will be called when it's ready to resume running. - virtual int OnWrite(SocketStream* socket, const char* data, int len, - CompletionCallback* callback) { - // No throttle by default. - return OK; - } - - // Called when |socket| is closed. - // May wake up another waiting socket. - virtual void OnClose(SocketStream* socket) {} - - // Gets SocketStreamThrottle for URL |scheme|. - // Doesn't pass ownership of the SocketStreamThrottle. - static SocketStreamThrottle* GetSocketStreamThrottleForScheme( - const std::string& scheme); - - // Registers |throttle| for URL |scheme|. - // Doesn't take ownership of |throttle|. Typically |throttle| is - // singleton instance. - static void RegisterSocketStreamThrottle( - const std::string& scheme, - SocketStreamThrottle* throttle); - - protected: - SocketStreamThrottle() {} - virtual ~SocketStreamThrottle() {} - - private: - DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottle); -}; - -} // namespace net - -#endif // NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_ diff --git a/net/websockets/websocket_job.cc b/net/websockets/websocket_job.cc index 59acfc5..62a62b7 100644 --- a/net/websockets/websocket_job.cc +++ b/net/websockets/websocket_job.cc @@ -10,6 +10,31 @@ #include "net/base/cookie_store.h" #include "net/http/http_util.h" #include "net/url_request/url_request_context.h" +#include "net/websockets/websocket_throttle.h" + +namespace { + +class CompletionCallbackRunner + : public base::RefCountedThreadSafe<CompletionCallbackRunner> { + public: + explicit CompletionCallbackRunner(net::CompletionCallback* callback) + : callback_(callback) { + DCHECK(callback_); + } + void Run() { + callback_->Run(net::OK); + } + private: + friend class base::RefCountedThreadSafe<CompletionCallbackRunner>; + + virtual ~CompletionCallbackRunner() {} + + net::CompletionCallback* callback_; + + DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner); +}; + +} namespace net { @@ -75,6 +100,8 @@ void WebSocketJob::EnsureInit() { WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate) : delegate_(delegate), state_(INITIALIZED), + waiting_(false), + callback_(NULL), handshake_request_sent_(0), handshake_response_header_length_(0), response_cookies_save_index_(0), @@ -128,12 +155,27 @@ void WebSocketJob::RestartWithAuth( void WebSocketJob::DetachDelegate() { state_ = CLOSED; + Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); + Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); + delegate_ = NULL; if (socket_) socket_->DetachDelegate(); socket_ = NULL; } +int WebSocketJob::OnStartOpenConnection( + SocketStream* socket, CompletionCallback* callback) { + DCHECK(!callback_); + state_ = CONNECTING; + addresses_.Copy(socket->address_list().head(), true); + Singleton<WebSocketThrottle>::get()->PutInQueue(this); + if (!waiting_) + return OK; + callback_ = callback; + return ERR_IO_PENDING; +} + void WebSocketJob::OnConnected( SocketStream* socket, int max_pending_send_allowed) { if (delegate_) @@ -161,6 +203,9 @@ void WebSocketJob::OnReceivedData( void WebSocketJob::OnClose(SocketStream* socket) { state_ = CLOSED; + Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); + Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); + SocketStream::Delegate* delegate = delegate_; delegate_ = NULL; socket_ = NULL; @@ -325,6 +370,9 @@ void WebSocketJob::SaveNextCookie() { "\r\n" + remaining_data; state_ = OPEN; + Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this); + Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary(); + if (delegate_) delegate_->OnReceivedData(socket_, received_data.data(), received_data.size()); @@ -376,4 +424,29 @@ GURL WebSocketJob::GetURLForCookies() const { return url.ReplaceComponents(replacements); } +const AddressList& WebSocketJob::address_list() const { + return addresses_; +} + +void WebSocketJob::SetWaiting() { + waiting_ = true; +} + +bool WebSocketJob::IsWaiting() const { + return waiting_; +} + +void WebSocketJob::Wakeup() { + waiting_ = false; + DCHECK(callback_); + // We wrap |callback_| to keep this alive while this is released. + scoped_refptr<CompletionCallbackRunner> runner = + new CompletionCallbackRunner(callback_); + callback_ = NULL; + MessageLoopForIO::current()->PostTask( + FROM_HERE, + NewRunnableMethod(runner.get(), + &CompletionCallbackRunner::Run)); +} + } // namespace net diff --git a/net/websockets/websocket_job.h b/net/websockets/websocket_job.h index 31fa503..23140c5 100644 --- a/net/websockets/websocket_job.h +++ b/net/websockets/websocket_job.h @@ -9,6 +9,7 @@ #include <vector> #include "base/ref_counted.h" +#include "net/base/address_list.h" #include "net/base/completion_callback.h" #include "net/socket_stream/socket_stream_job.h" @@ -18,10 +19,9 @@ namespace net { // WebSocket protocol specific job on SocketStream. // It captures WebSocket handshake message and handles cookie operations. -// Chome security policy doesn't allow renderer process (except dev tools) +// Chrome security policy doesn't allow renderer process (except dev tools) // see HttpOnly cookies, so it injects cookie header in handshake request and // strips set-cookie headers in handshake response. -// TODO(ukai): refactor to merge WebSocketThrottle functionality. // TODO(ukai): refactor websocket.cc to use this. class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate { public: @@ -36,6 +36,7 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate { explicit WebSocketJob(SocketStream::Delegate* delegate); + State state() const { return state_; } virtual void Connect(); virtual bool SendData(const char* data, int len); virtual void Close(); @@ -45,6 +46,8 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate { virtual void DetachDelegate(); // SocketStream::Delegate methods. + virtual int OnStartOpenConnection( + SocketStream* socket, CompletionCallback* callback); virtual void OnConnected( SocketStream* socket, int max_pending_send_allowed); virtual void OnSentData( @@ -58,6 +61,7 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate { const SocketStream* socket, int error); private: + friend class WebSocketThrottle; friend class WebSocketJobTest; virtual ~WebSocketJob(); @@ -74,8 +78,16 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate { GURL GetURLForCookies() const; + const AddressList& address_list() const; + void SetWaiting(); + bool IsWaiting() const; + void Wakeup(); + SocketStream::Delegate* delegate_; State state_; + bool waiting_; + AddressList addresses_; + CompletionCallback* callback_; // for throttling. std::string original_handshake_request_; int original_handshake_request_header_length_; diff --git a/net/websockets/websocket_job_unittest.cc b/net/websockets/websocket_job_unittest.cc index de96a32..7bf9822 100644 --- a/net/websockets/websocket_job_unittest.cc +++ b/net/websockets/websocket_job_unittest.cc @@ -10,9 +10,11 @@ #include "net/base/cookie_policy.h" #include "net/base/cookie_store.h" #include "net/base/net_errors.h" +#include "net/base/sys_addrinfo.h" #include "net/socket_stream/socket_stream.h" #include "net/url_request/url_request_context.h" #include "net/websockets/websocket_job.h" +#include "net/websockets/websocket_throttle.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/gmock/include/gmock/gmock.h" #include "testing/platform_test.h" @@ -206,15 +208,28 @@ class WebSocketJobTest : public PlatformTest { websocket_ = new WebSocketJob(delegate); socket_ = new MockSocketStream(url, websocket_.get()); websocket_->InitSocketStream(socket_.get()); - websocket_->state_ = WebSocketJob::CONNECTING; websocket_->set_context(context_.get()); + websocket_->state_ = WebSocketJob::CONNECTING; + struct addrinfo addr; + memset(&addr, 0, sizeof(struct addrinfo)); + addr.ai_family = AF_INET; + addr.ai_addrlen = sizeof(struct sockaddr_in); + struct sockaddr_in sa_in; + memset(&sa_in, 0, sizeof(struct sockaddr_in)); + memcpy(&sa_in.sin_addr, "\x7f\0\0\1", 4); + addr.ai_addr = reinterpret_cast<sockaddr*>(&sa_in); + addr.ai_next = NULL; + websocket_->addresses_.Copy(&addr, true); + Singleton<WebSocketThrottle>::get()->PutInQueue(websocket_); } WebSocketJob::State GetWebSocketJobState() { return websocket_->state_; } void CloseWebSocketJob() { - if (websocket_->socket_) + if (websocket_->socket_) { websocket_->socket_->DetachDelegate(); + Singleton<WebSocketThrottle>::get()->RemoveFromQueue(websocket_); + } websocket_->state_ = WebSocketJob::CLOSED; websocket_->delegate_ = NULL; websocket_->socket_ = NULL; diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc index 8d0d1fb..e2e98c3 100644 --- a/net/websockets/websocket_throttle.cc +++ b/net/websockets/websocket_throttle.cc @@ -13,6 +13,7 @@ #include "net/base/io_buffer.h" #include "net/base/sys_addrinfo.h" #include "net/socket_stream/socket_stream.h" +#include "net/websockets/websocket_job.h" namespace net { @@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) { } } -// State for WebSocket protocol on each SocketStream. -// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName. -// This is alive between connection starts and handshake is finished. -// In this class, it doesn't check actual handshake finishes, but only checks -// end of header is found in read data. -class WebSocketThrottle::WebSocketState : public SocketStream::UserData { - public: - explicit WebSocketState(const AddressList& addrs) - : address_list_(addrs), - callback_(NULL), - waiting_(false), - handshake_finished_(false), - buffer_(NULL) { - } - ~WebSocketState() {} - - int OnStartOpenConnection(CompletionCallback* callback) { - DCHECK(!callback_); - if (!waiting_) - return OK; - callback_ = callback; - return ERR_IO_PENDING; - } - - int OnRead(const char* data, int len, CompletionCallback* callback) { - DCHECK(!waiting_); - DCHECK(!callback_); - DCHECK(!handshake_finished_); - static const int kBufferSize = 8129; - - if (!buffer_) { - // Fast path. - int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0); - if (eoh > 0) { - handshake_finished_ = true; - return OK; - } - buffer_ = new GrowableIOBuffer(); - buffer_->SetCapacity(kBufferSize); - } else if (buffer_->RemainingCapacity() < len) { - buffer_->SetCapacity(buffer_->capacity() + kBufferSize); - } - memcpy(buffer_->data(), data, len); - buffer_->set_offset(buffer_->offset() + len); - - int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(), - buffer_->offset(), 0); - handshake_finished_ = (eoh > 0); - return OK; - } - - const AddressList& address_list() const { return address_list_; } - void SetWaiting() { waiting_ = true; } - bool IsWaiting() const { return waiting_; } - bool HandshakeFinished() const { return handshake_finished_; } - void Wakeup() { - waiting_ = false; - // We wrap |callback_| to keep this alive while this is released. - scoped_refptr<CompletionCallbackRunner> runner = - new CompletionCallbackRunner(callback_); - callback_ = NULL; - MessageLoopForIO::current()->PostTask( - FROM_HERE, - NewRunnableMethod(runner.get(), - &CompletionCallbackRunner::Run)); - } - - static const char* kKeyName; - - private: - class CompletionCallbackRunner - : public base::RefCountedThreadSafe<CompletionCallbackRunner> { - public: - explicit CompletionCallbackRunner(CompletionCallback* callback) - : callback_(callback) { - DCHECK(callback_); - } - void Run() { - callback_->Run(OK); - } - private: - friend class base::RefCountedThreadSafe<CompletionCallbackRunner>; - - virtual ~CompletionCallbackRunner() {} - - CompletionCallback* callback_; - - DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner); - }; - - const AddressList& address_list_; - - CompletionCallback* callback_; - // True if waiting another websocket connection is established. - // False if the websocket is performing handshaking. - bool waiting_; - - // True if the websocket handshake is completed. - // If true, it will be removed from queue and deleted from the SocketStream - // UserData soon. - bool handshake_finished_; - - // Buffer for read data to check handshake response message. - scoped_refptr<GrowableIOBuffer> buffer_; - - DISALLOW_COPY_AND_ASSIGN(WebSocketState); -}; - -const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState"; - WebSocketThrottle::WebSocketThrottle() { - SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this); - SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this); } WebSocketThrottle::~WebSocketThrottle() { @@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() { DCHECK(addr_map_.empty()); } -int WebSocketThrottle::OnStartOpenConnection( - SocketStream* socket, CompletionCallback* callback) { - WebSocketState* state = new WebSocketState(socket->address_list()); - PutInQueue(socket, state); - return state->OnStartOpenConnection(callback); -} - -int WebSocketThrottle::OnRead(SocketStream* socket, - const char* data, int len, - CompletionCallback* callback) { - WebSocketState* state = static_cast<WebSocketState*>( - socket->GetUserData(WebSocketState::kKeyName)); - // If no state, handshake was already completed. Do nothing. - if (!state) - return OK; - - int result = state->OnRead(data, len, callback); - if (state->HandshakeFinished()) { - RemoveFromQueue(socket, state); - WakeupSocketIfNecessary(); - } - return result; -} - -int WebSocketThrottle::OnWrite(SocketStream* socket, - const char* data, int len, - CompletionCallback* callback) { - // Do nothing. - return OK; -} - -void WebSocketThrottle::OnClose(SocketStream* socket) { - WebSocketState* state = static_cast<WebSocketState*>( - socket->GetUserData(WebSocketState::kKeyName)); - if (!state) - return; - RemoveFromQueue(socket, state); - WakeupSocketIfNecessary(); -} - -void WebSocketThrottle::PutInQueue(SocketStream* socket, - WebSocketState* state) { - socket->SetUserData(WebSocketState::kKeyName, state); - queue_.push_back(state); - const AddressList& address_list = socket->address_list(); +void WebSocketThrottle::PutInQueue(WebSocketJob* job) { + queue_.push_back(job); + const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { @@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket, ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); if (iter == addr_map_.end()) { ConnectingQueue* queue = new ConnectingQueue(); - queue->push_back(state); + queue->push_back(job); addr_map_[addrkey] = queue; } else { - iter->second->push_back(state); - state->SetWaiting(); + iter->second->push_back(job); + job->SetWaiting(); } } } -void WebSocketThrottle::RemoveFromQueue(SocketStream* socket, - WebSocketState* state) { - const AddressList& address_list = socket->address_list(); +void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { + bool in_queue = false; + for (ConnectingQueue::iterator iter = queue_.begin(); + iter != queue_.end(); + ++iter) { + if (*iter == job) { + queue_.erase(iter); + in_queue = true; + break; + } + } + if (!in_queue) + return; + const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { @@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket, ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); DCHECK(iter != addr_map_.end()); ConnectingQueue* queue = iter->second; - DCHECK(state == queue->front()); - queue->pop_front(); + // Job may not be front of queue when job is closed early while waiting. + for (ConnectingQueue::iterator iter = queue->begin(); + iter != queue->end(); + ++iter) { + if (*iter == job) { + queue->erase(iter); + break; + } + } if (queue->empty()) { delete queue; addr_map_.erase(iter); } } - for (ConnectingQueue::iterator iter = queue_.begin(); - iter != queue_.end(); - ++iter) { - if (*iter == state) { - queue_.erase(iter); - break; - } - } - socket->SetUserData(WebSocketState::kKeyName, NULL); } void WebSocketThrottle::WakeupSocketIfNecessary() { for (ConnectingQueue::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) { - WebSocketState* state = *iter; - if (!state->IsWaiting()) + WebSocketJob* job = *iter; + if (!job->IsWaiting()) continue; bool should_wakeup = true; - const AddressList& address_list = state->address_list(); + const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { @@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() { ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); DCHECK(iter != addr_map_.end()); ConnectingQueue* queue = iter->second; - if (state != queue->front()) { + if (job != queue->front()) { should_wakeup = false; break; } } if (should_wakeup) - state->Wakeup(); + job->Wakeup(); } } -/* static */ -void WebSocketThrottle::Init() { - Singleton<WebSocketThrottle>::get(); -} - } // namespace net diff --git a/net/websockets/websocket_throttle.h b/net/websockets/websocket_throttle.h index 279aea2..d05b246 100644 --- a/net/websockets/websocket_throttle.h +++ b/net/websockets/websocket_throttle.h @@ -5,12 +5,17 @@ #ifndef NET_WEBSOCKETS_WEBSOCKET_THROTTLE_H_ #define NET_WEBSOCKETS_WEBSOCKET_THROTTLE_H_ +#include <deque> +#include <string> + #include "base/hash_tables.h" #include "base/singleton.h" -#include "net/socket_stream/socket_stream_throttle.h" namespace net { +class SocketStream; +class WebSocketJob; + // SocketStreamThrottle for WebSocket protocol. // Implements the client-side requirements in the spec. // http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol @@ -19,42 +24,31 @@ namespace net { // remote host (IP address) identified by /host/, even if known by // another name, wait until that connection has been established or // for that connection to have failed. -class WebSocketThrottle : public SocketStreamThrottle { +class WebSocketThrottle { public: - virtual int OnStartOpenConnection(SocketStream* socket, - CompletionCallback* callback); - virtual int OnRead(SocketStream* socket, const char* data, int len, - CompletionCallback* callback); - virtual int OnWrite(SocketStream* socket, const char* data, int len, - CompletionCallback* callback); - virtual void OnClose(SocketStream* socket); + // Puts |job| in |queue_| and queues for the destination addresses + // of |job|. + // If other job is using the same destination address, set |job| waiting. + void PutInQueue(WebSocketJob* job); - static void Init(); + // Removes |job| from |queue_| and queues for the destination addresses + // of |job|. + void RemoveFromQueue(WebSocketJob* job); + + // Checks sockets waiting in |queue_| and check the socket is the front of + // every queue for the destination addresses of |socket|. + // If so, the socket can resume estabilshing connection, so wake up + // the socket. + void WakeupSocketIfNecessary(); private: - class WebSocketState; - typedef std::deque<WebSocketState*> ConnectingQueue; + typedef std::deque<WebSocketJob*> ConnectingQueue; typedef base::hash_map<std::string, ConnectingQueue*> ConnectingAddressMap; WebSocketThrottle(); virtual ~WebSocketThrottle(); friend struct DefaultSingletonTraits<WebSocketThrottle>; - // Puts |socket| in |queue_| and queues for the destination addresses - // of |socket|. Also sets |state| as UserData of |socket|. - // If other socket is using the same destination address, set |state| waiting. - void PutInQueue(SocketStream* socket, WebSocketState* state); - - // Removes |socket| from |queue_| and queues for the destination addresses - // of |socket|. Also releases |state| from UserData of |socket|. - void RemoveFromQueue(SocketStream* socket, WebSocketState* state); - - // Checks sockets waiting in |queue_| and check the socket is the front of - // every queue for the destination addresses of |socket|. - // If so, the socket can resume estabilshing connection, so wake up - // the socket. - void WakeupSocketIfNecessary(); - // Key: string of host's address. Value: queue of sockets for the address. ConnectingAddressMap addr_map_; diff --git a/net/websockets/websocket_throttle_unittest.cc b/net/websockets/websocket_throttle_unittest.cc index 55276e9..d568292 100644 --- a/net/websockets/websocket_throttle_unittest.cc +++ b/net/websockets/websocket_throttle_unittest.cc @@ -10,6 +10,7 @@ #include "net/base/sys_addrinfo.h" #include "net/base/test_completion_callback.h" #include "net/socket_stream/socket_stream.h" +#include "net/websockets/websocket_job.h" #include "net/websockets/websocket_throttle.h" #include "testing/gtest/include/gtest/gtest.h" #include "testing/platform_test.h" @@ -66,94 +67,205 @@ class WebSocketThrottleTest : public PlatformTest { }; TEST_F(WebSocketThrottleTest, Throttle) { - WebSocketThrottle::Init(); DummySocketStreamDelegate delegate; - WebSocketThrottle* throttle = Singleton<WebSocketThrottle>::get(); - - EXPECT_EQ(throttle, - SocketStreamThrottle::GetSocketStreamThrottleForScheme("ws")); - EXPECT_EQ(throttle, - SocketStreamThrottle::GetSocketStreamThrottleForScheme("wss")); - // For host1: 1.2.3.4, 1.2.3.5, 1.2.3.6 struct addrinfo* addr = AddAddr(1, 2, 3, 4, NULL); addr = AddAddr(1, 2, 3, 5, addr); addr = AddAddr(1, 2, 3, 6, addr); + scoped_refptr<WebSocketJob> w1 = new WebSocketJob(&delegate); scoped_refptr<SocketStream> s1 = - new SocketStream(GURL("ws://host1/"), &delegate); + new SocketStream(GURL("ws://host1/"), w1.get()); + w1->InitSocketStream(s1.get()); WebSocketThrottleTest::SetAddressList(s1, addr); DeleteAddrInfo(addr); + DLOG(INFO) << "socket1"; TestCompletionCallback callback_s1; - EXPECT_EQ(OK, throttle->OnStartOpenConnection(s1, &callback_s1)); + // Trying to open connection to host1 will start without wait. + EXPECT_EQ(OK, w1->OnStartOpenConnection(s1, &callback_s1)); + + // Now connecting to host1, so waiting queue looks like + // Address | head -> tail + // 1.2.3.4 | w1 + // 1.2.3.5 | w1 + // 1.2.3.6 | w1 // For host2: 1.2.3.4 addr = AddAddr(1, 2, 3, 4, NULL); + scoped_refptr<WebSocketJob> w2 = new WebSocketJob(&delegate); scoped_refptr<SocketStream> s2 = - new SocketStream(GURL("ws://host2/"), &delegate); + new SocketStream(GURL("ws://host2/"), w2.get()); + w2->InitSocketStream(s2.get()); WebSocketThrottleTest::SetAddressList(s2, addr); DeleteAddrInfo(addr); + DLOG(INFO) << "socket2"; TestCompletionCallback callback_s2; - EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s2, &callback_s2)); + // Trying to open connection to host2 will wait for w1. + EXPECT_EQ(ERR_IO_PENDING, w2->OnStartOpenConnection(s2, &callback_s2)); + // Now waiting queue looks like + // Address | head -> tail + // 1.2.3.4 | w1 w2 + // 1.2.3.5 | w1 + // 1.2.3.6 | w1 // For host3: 1.2.3.5 addr = AddAddr(1, 2, 3, 5, NULL); + scoped_refptr<WebSocketJob> w3 = new WebSocketJob(&delegate); scoped_refptr<SocketStream> s3 = - new SocketStream(GURL("ws://host3/"), &delegate); + new SocketStream(GURL("ws://host3/"), w3.get()); + w3->InitSocketStream(s3.get()); WebSocketThrottleTest::SetAddressList(s3, addr); DeleteAddrInfo(addr); + DLOG(INFO) << "socket3"; TestCompletionCallback callback_s3; - EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s3, &callback_s3)); + // Trying to open connection to host3 will wait for w1. + EXPECT_EQ(ERR_IO_PENDING, w3->OnStartOpenConnection(s3, &callback_s3)); + // Address | head -> tail + // 1.2.3.4 | w1 w2 + // 1.2.3.5 | w1 w3 + // 1.2.3.6 | w1 // For host4: 1.2.3.4, 1.2.3.6 addr = AddAddr(1, 2, 3, 4, NULL); addr = AddAddr(1, 2, 3, 6, addr); + scoped_refptr<WebSocketJob> w4 = new WebSocketJob(&delegate); scoped_refptr<SocketStream> s4 = - new SocketStream(GURL("ws://host4/"), &delegate); + new SocketStream(GURL("ws://host4/"), w4.get()); + w4->InitSocketStream(s4.get()); WebSocketThrottleTest::SetAddressList(s4, addr); DeleteAddrInfo(addr); + DLOG(INFO) << "socket4"; TestCompletionCallback callback_s4; - EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s4, &callback_s4)); + // Trying to open connection to host4 will wait for w1, w2. + EXPECT_EQ(ERR_IO_PENDING, w4->OnStartOpenConnection(s4, &callback_s4)); + // Address | head -> tail + // 1.2.3.4 | w1 w2 w4 + // 1.2.3.5 | w1 w3 + // 1.2.3.6 | w1 w4 + + // For host5: 1.2.3.6 + addr = AddAddr(1, 2, 3, 6, NULL); + scoped_refptr<WebSocketJob> w5 = new WebSocketJob(&delegate); + scoped_refptr<SocketStream> s5 = + new SocketStream(GURL("ws://host5/"), w5.get()); + w5->InitSocketStream(s5.get()); + WebSocketThrottleTest::SetAddressList(s5, addr); + DeleteAddrInfo(addr); + + DLOG(INFO) << "socket5"; + TestCompletionCallback callback_s5; + // Trying to open connection to host5 will wait for w1, w4 + EXPECT_EQ(ERR_IO_PENDING, w5->OnStartOpenConnection(s5, &callback_s5)); + // Address | head -> tail + // 1.2.3.4 | w1 w2 w4 + // 1.2.3.5 | w1 w3 + // 1.2.3.6 | w1 w4 w5 + + // For host6: 1.2.3.6 + addr = AddAddr(1, 2, 3, 6, NULL); + scoped_refptr<WebSocketJob> w6 = new WebSocketJob(&delegate); + scoped_refptr<SocketStream> s6 = + new SocketStream(GURL("ws://host6/"), w6.get()); + w6->InitSocketStream(s6.get()); + WebSocketThrottleTest::SetAddressList(s6, addr); + DeleteAddrInfo(addr); + DLOG(INFO) << "socket6"; + TestCompletionCallback callback_s6; + // Trying to open connection to host6 will wait for w1, w4, w5 + EXPECT_EQ(ERR_IO_PENDING, w6->OnStartOpenConnection(s6, &callback_s6)); + // Address | head -> tail + // 1.2.3.4 | w1 w2 w4 + // 1.2.3.5 | w1 w3 + // 1.2.3.6 | w1 w4 w5 w6 + + // Receive partial response on w1, still connecting. + DLOG(INFO) << "socket1 1"; static const char kHeader[] = "HTTP/1.1 101 Web Socket Protocol\r\n"; - EXPECT_EQ(OK, - throttle->OnRead(s1.get(), kHeader, sizeof(kHeader) - 1, NULL)); + w1->OnReceivedData(s1.get(), kHeader, sizeof(kHeader) - 1); EXPECT_FALSE(callback_s2.have_result()); EXPECT_FALSE(callback_s3.have_result()); EXPECT_FALSE(callback_s4.have_result()); + EXPECT_FALSE(callback_s5.have_result()); + EXPECT_FALSE(callback_s6.have_result()); + // Receive rest of handshake response on w1. + DLOG(INFO) << "socket1 2"; static const char kHeader2[] = "Upgrade: WebSocket\r\n" "Connection: Upgrade\r\n" "WebSocket-Origin: http://www.google.com\r\n" "WebSocket-Location: ws://websocket.chromium.org\r\n" "\r\n"; - EXPECT_EQ(OK, - throttle->OnRead(s1.get(), kHeader2, sizeof(kHeader2) - 1, NULL)); + w1->OnReceivedData(s1.get(), kHeader2, sizeof(kHeader2) - 1); MessageLoopForIO::current()->RunAllPending(); + // Now, w1 is open. + EXPECT_EQ(WebSocketJob::OPEN, w1->state()); + // So, w2 and w3 can start connecting. w4 needs to wait w2 (1.2.3.4) EXPECT_TRUE(callback_s2.have_result()); EXPECT_TRUE(callback_s3.have_result()); EXPECT_FALSE(callback_s4.have_result()); + // Address | head -> tail + // 1.2.3.4 | w2 w4 + // 1.2.3.5 | w3 + // 1.2.3.6 | w4 w5 w6 - throttle->OnClose(s1.get()); + // Closing s1 doesn't change waiting queue. + DLOG(INFO) << "socket1 close"; + w1->OnClose(s1.get()); MessageLoopForIO::current()->RunAllPending(); EXPECT_FALSE(callback_s4.have_result()); s1->DetachDelegate(); + // Address | head -> tail + // 1.2.3.4 | w2 w4 + // 1.2.3.5 | w3 + // 1.2.3.6 | w4 w5 w6 + + // w5 can close while waiting in queue. + DLOG(INFO) << "socket5 close"; + // w5 close() closes SocketStream that change state to STATE_CLOSE, calls + // DoLoop(), so OnClose() callback will be called. + w5->OnClose(s5.get()); + MessageLoopForIO::current()->RunAllPending(); + EXPECT_FALSE(callback_s4.have_result()); + // Address | head -> tail + // 1.2.3.4 | w2 w4 + // 1.2.3.5 | w3 + // 1.2.3.6 | w4 w6 + s5->DetachDelegate(); + + // w6 close abnormally (e.g. renderer finishes) while waiting in queue. + DLOG(INFO) << "socket6 close abnormally"; + w6->DetachDelegate(); + MessageLoopForIO::current()->RunAllPending(); + EXPECT_FALSE(callback_s4.have_result()); + // Address | head -> tail + // 1.2.3.4 | w2 w4 + // 1.2.3.5 | w3 + // 1.2.3.6 | w4 - throttle->OnClose(s2.get()); + // Closing s2 kicks w4 to start connecting. + DLOG(INFO) << "socket2 close"; + w2->OnClose(s2.get()); MessageLoopForIO::current()->RunAllPending(); EXPECT_TRUE(callback_s4.have_result()); + // Address | head -> tail + // 1.2.3.4 | w4 + // 1.2.3.5 | w3 + // 1.2.3.6 | w4 s2->DetachDelegate(); - throttle->OnClose(s3.get()); + DLOG(INFO) << "socket3 close"; + w3->OnClose(s3.get()); MessageLoopForIO::current()->RunAllPending(); s3->DetachDelegate(); - throttle->OnClose(s4.get()); + w4->OnClose(s4.get()); s4->DetachDelegate(); + DLOG(INFO) << "Done"; } } |