diff options
author | ukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-03-17 07:29:54 +0000 |
---|---|---|
committer | ukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-03-17 07:29:54 +0000 |
commit | b4384a798aef3f2e52c34d733a9af2db792c6c1d (patch) | |
tree | f0bdc9e8f054e23481ac619cb38238ed59584626 /net/websockets/websocket_throttle.cc | |
parent | ea477c25a06ac425257f1a94081aaca98929ab0f (diff) | |
download | chromium_src-b4384a798aef3f2e52c34d733a9af2db792c6c1d.zip chromium_src-b4384a798aef3f2e52c34d733a9af2db792c6c1d.tar.gz chromium_src-b4384a798aef3f2e52c34d733a9af2db792c6c1d.tar.bz2 |
Refactor WebSocket throttling feature.
Protocol specific handling should be done in SocketStreamJob subclasss,
so websocket throttling should be handled in WebSocketJob.
Review URL: http://codereview.chromium.org/669157
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@41818 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/websockets/websocket_throttle.cc')
-rw-r--r-- | net/websockets/websocket_throttle.cc | 219 |
1 files changed, 35 insertions, 184 deletions
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc index 8d0d1fb..e2e98c3 100644 --- a/net/websockets/websocket_throttle.cc +++ b/net/websockets/websocket_throttle.cc @@ -13,6 +13,7 @@ #include "net/base/io_buffer.h" #include "net/base/sys_addrinfo.h" #include "net/socket_stream/socket_stream.h" +#include "net/websockets/websocket_job.h" namespace net { @@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) { } } -// State for WebSocket protocol on each SocketStream. -// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName. -// This is alive between connection starts and handshake is finished. -// In this class, it doesn't check actual handshake finishes, but only checks -// end of header is found in read data. -class WebSocketThrottle::WebSocketState : public SocketStream::UserData { - public: - explicit WebSocketState(const AddressList& addrs) - : address_list_(addrs), - callback_(NULL), - waiting_(false), - handshake_finished_(false), - buffer_(NULL) { - } - ~WebSocketState() {} - - int OnStartOpenConnection(CompletionCallback* callback) { - DCHECK(!callback_); - if (!waiting_) - return OK; - callback_ = callback; - return ERR_IO_PENDING; - } - - int OnRead(const char* data, int len, CompletionCallback* callback) { - DCHECK(!waiting_); - DCHECK(!callback_); - DCHECK(!handshake_finished_); - static const int kBufferSize = 8129; - - if (!buffer_) { - // Fast path. - int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0); - if (eoh > 0) { - handshake_finished_ = true; - return OK; - } - buffer_ = new GrowableIOBuffer(); - buffer_->SetCapacity(kBufferSize); - } else if (buffer_->RemainingCapacity() < len) { - buffer_->SetCapacity(buffer_->capacity() + kBufferSize); - } - memcpy(buffer_->data(), data, len); - buffer_->set_offset(buffer_->offset() + len); - - int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(), - buffer_->offset(), 0); - handshake_finished_ = (eoh > 0); - return OK; - } - - const AddressList& address_list() const { return address_list_; } - void SetWaiting() { waiting_ = true; } - bool IsWaiting() const { return waiting_; } - bool HandshakeFinished() const { return handshake_finished_; } - void Wakeup() { - waiting_ = false; - // We wrap |callback_| to keep this alive while this is released. - scoped_refptr<CompletionCallbackRunner> runner = - new CompletionCallbackRunner(callback_); - callback_ = NULL; - MessageLoopForIO::current()->PostTask( - FROM_HERE, - NewRunnableMethod(runner.get(), - &CompletionCallbackRunner::Run)); - } - - static const char* kKeyName; - - private: - class CompletionCallbackRunner - : public base::RefCountedThreadSafe<CompletionCallbackRunner> { - public: - explicit CompletionCallbackRunner(CompletionCallback* callback) - : callback_(callback) { - DCHECK(callback_); - } - void Run() { - callback_->Run(OK); - } - private: - friend class base::RefCountedThreadSafe<CompletionCallbackRunner>; - - virtual ~CompletionCallbackRunner() {} - - CompletionCallback* callback_; - - DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner); - }; - - const AddressList& address_list_; - - CompletionCallback* callback_; - // True if waiting another websocket connection is established. - // False if the websocket is performing handshaking. - bool waiting_; - - // True if the websocket handshake is completed. - // If true, it will be removed from queue and deleted from the SocketStream - // UserData soon. - bool handshake_finished_; - - // Buffer for read data to check handshake response message. - scoped_refptr<GrowableIOBuffer> buffer_; - - DISALLOW_COPY_AND_ASSIGN(WebSocketState); -}; - -const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState"; - WebSocketThrottle::WebSocketThrottle() { - SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this); - SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this); } WebSocketThrottle::~WebSocketThrottle() { @@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() { DCHECK(addr_map_.empty()); } -int WebSocketThrottle::OnStartOpenConnection( - SocketStream* socket, CompletionCallback* callback) { - WebSocketState* state = new WebSocketState(socket->address_list()); - PutInQueue(socket, state); - return state->OnStartOpenConnection(callback); -} - -int WebSocketThrottle::OnRead(SocketStream* socket, - const char* data, int len, - CompletionCallback* callback) { - WebSocketState* state = static_cast<WebSocketState*>( - socket->GetUserData(WebSocketState::kKeyName)); - // If no state, handshake was already completed. Do nothing. - if (!state) - return OK; - - int result = state->OnRead(data, len, callback); - if (state->HandshakeFinished()) { - RemoveFromQueue(socket, state); - WakeupSocketIfNecessary(); - } - return result; -} - -int WebSocketThrottle::OnWrite(SocketStream* socket, - const char* data, int len, - CompletionCallback* callback) { - // Do nothing. - return OK; -} - -void WebSocketThrottle::OnClose(SocketStream* socket) { - WebSocketState* state = static_cast<WebSocketState*>( - socket->GetUserData(WebSocketState::kKeyName)); - if (!state) - return; - RemoveFromQueue(socket, state); - WakeupSocketIfNecessary(); -} - -void WebSocketThrottle::PutInQueue(SocketStream* socket, - WebSocketState* state) { - socket->SetUserData(WebSocketState::kKeyName, state); - queue_.push_back(state); - const AddressList& address_list = socket->address_list(); +void WebSocketThrottle::PutInQueue(WebSocketJob* job) { + queue_.push_back(job); + const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { @@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket, ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); if (iter == addr_map_.end()) { ConnectingQueue* queue = new ConnectingQueue(); - queue->push_back(state); + queue->push_back(job); addr_map_[addrkey] = queue; } else { - iter->second->push_back(state); - state->SetWaiting(); + iter->second->push_back(job); + job->SetWaiting(); } } } -void WebSocketThrottle::RemoveFromQueue(SocketStream* socket, - WebSocketState* state) { - const AddressList& address_list = socket->address_list(); +void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) { + bool in_queue = false; + for (ConnectingQueue::iterator iter = queue_.begin(); + iter != queue_.end(); + ++iter) { + if (*iter == job) { + queue_.erase(iter); + in_queue = true; + break; + } + } + if (!in_queue) + return; + const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { @@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket, ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); DCHECK(iter != addr_map_.end()); ConnectingQueue* queue = iter->second; - DCHECK(state == queue->front()); - queue->pop_front(); + // Job may not be front of queue when job is closed early while waiting. + for (ConnectingQueue::iterator iter = queue->begin(); + iter != queue->end(); + ++iter) { + if (*iter == job) { + queue->erase(iter); + break; + } + } if (queue->empty()) { delete queue; addr_map_.erase(iter); } } - for (ConnectingQueue::iterator iter = queue_.begin(); - iter != queue_.end(); - ++iter) { - if (*iter == state) { - queue_.erase(iter); - break; - } - } - socket->SetUserData(WebSocketState::kKeyName, NULL); } void WebSocketThrottle::WakeupSocketIfNecessary() { for (ConnectingQueue::iterator iter = queue_.begin(); iter != queue_.end(); ++iter) { - WebSocketState* state = *iter; - if (!state->IsWaiting()) + WebSocketJob* job = *iter; + if (!job->IsWaiting()) continue; bool should_wakeup = true; - const AddressList& address_list = state->address_list(); + const AddressList& address_list = job->address_list(); for (const struct addrinfo* addrinfo = address_list.head(); addrinfo != NULL; addrinfo = addrinfo->ai_next) { @@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() { ConnectingAddressMap::iterator iter = addr_map_.find(addrkey); DCHECK(iter != addr_map_.end()); ConnectingQueue* queue = iter->second; - if (state != queue->front()) { + if (job != queue->front()) { should_wakeup = false; break; } } if (should_wakeup) - state->Wakeup(); + job->Wakeup(); } } -/* static */ -void WebSocketThrottle::Init() { - Singleton<WebSocketThrottle>::get(); -} - } // namespace net |