summaryrefslogtreecommitdiffstats
path: root/net/websockets/websocket_throttle.cc
diff options
context:
space:
mode:
authorukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-03-17 07:29:54 +0000
committerukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-03-17 07:29:54 +0000
commitb4384a798aef3f2e52c34d733a9af2db792c6c1d (patch)
treef0bdc9e8f054e23481ac619cb38238ed59584626 /net/websockets/websocket_throttle.cc
parentea477c25a06ac425257f1a94081aaca98929ab0f (diff)
downloadchromium_src-b4384a798aef3f2e52c34d733a9af2db792c6c1d.zip
chromium_src-b4384a798aef3f2e52c34d733a9af2db792c6c1d.tar.gz
chromium_src-b4384a798aef3f2e52c34d733a9af2db792c6c1d.tar.bz2
Refactor WebSocket throttling feature.
Protocol specific handling should be done in SocketStreamJob subclasss, so websocket throttling should be handled in WebSocketJob. Review URL: http://codereview.chromium.org/669157 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@41818 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/websockets/websocket_throttle.cc')
-rw-r--r--net/websockets/websocket_throttle.cc219
1 files changed, 35 insertions, 184 deletions
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
index 8d0d1fb..e2e98c3 100644
--- a/net/websockets/websocket_throttle.cc
+++ b/net/websockets/websocket_throttle.cc
@@ -13,6 +13,7 @@
#include "net/base/io_buffer.h"
#include "net/base/sys_addrinfo.h"
#include "net/socket_stream/socket_stream.h"
+#include "net/websockets/websocket_job.h"
namespace net {
@@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
}
}
-// State for WebSocket protocol on each SocketStream.
-// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
-// This is alive between connection starts and handshake is finished.
-// In this class, it doesn't check actual handshake finishes, but only checks
-// end of header is found in read data.
-class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
- public:
- explicit WebSocketState(const AddressList& addrs)
- : address_list_(addrs),
- callback_(NULL),
- waiting_(false),
- handshake_finished_(false),
- buffer_(NULL) {
- }
- ~WebSocketState() {}
-
- int OnStartOpenConnection(CompletionCallback* callback) {
- DCHECK(!callback_);
- if (!waiting_)
- return OK;
- callback_ = callback;
- return ERR_IO_PENDING;
- }
-
- int OnRead(const char* data, int len, CompletionCallback* callback) {
- DCHECK(!waiting_);
- DCHECK(!callback_);
- DCHECK(!handshake_finished_);
- static const int kBufferSize = 8129;
-
- if (!buffer_) {
- // Fast path.
- int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
- if (eoh > 0) {
- handshake_finished_ = true;
- return OK;
- }
- buffer_ = new GrowableIOBuffer();
- buffer_->SetCapacity(kBufferSize);
- } else if (buffer_->RemainingCapacity() < len) {
- buffer_->SetCapacity(buffer_->capacity() + kBufferSize);
- }
- memcpy(buffer_->data(), data, len);
- buffer_->set_offset(buffer_->offset() + len);
-
- int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
- buffer_->offset(), 0);
- handshake_finished_ = (eoh > 0);
- return OK;
- }
-
- const AddressList& address_list() const { return address_list_; }
- void SetWaiting() { waiting_ = true; }
- bool IsWaiting() const { return waiting_; }
- bool HandshakeFinished() const { return handshake_finished_; }
- void Wakeup() {
- waiting_ = false;
- // We wrap |callback_| to keep this alive while this is released.
- scoped_refptr<CompletionCallbackRunner> runner =
- new CompletionCallbackRunner(callback_);
- callback_ = NULL;
- MessageLoopForIO::current()->PostTask(
- FROM_HERE,
- NewRunnableMethod(runner.get(),
- &CompletionCallbackRunner::Run));
- }
-
- static const char* kKeyName;
-
- private:
- class CompletionCallbackRunner
- : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
- public:
- explicit CompletionCallbackRunner(CompletionCallback* callback)
- : callback_(callback) {
- DCHECK(callback_);
- }
- void Run() {
- callback_->Run(OK);
- }
- private:
- friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
-
- virtual ~CompletionCallbackRunner() {}
-
- CompletionCallback* callback_;
-
- DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
- };
-
- const AddressList& address_list_;
-
- CompletionCallback* callback_;
- // True if waiting another websocket connection is established.
- // False if the websocket is performing handshaking.
- bool waiting_;
-
- // True if the websocket handshake is completed.
- // If true, it will be removed from queue and deleted from the SocketStream
- // UserData soon.
- bool handshake_finished_;
-
- // Buffer for read data to check handshake response message.
- scoped_refptr<GrowableIOBuffer> buffer_;
-
- DISALLOW_COPY_AND_ASSIGN(WebSocketState);
-};
-
-const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
-
WebSocketThrottle::WebSocketThrottle() {
- SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
- SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
}
WebSocketThrottle::~WebSocketThrottle() {
@@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() {
DCHECK(addr_map_.empty());
}
-int WebSocketThrottle::OnStartOpenConnection(
- SocketStream* socket, CompletionCallback* callback) {
- WebSocketState* state = new WebSocketState(socket->address_list());
- PutInQueue(socket, state);
- return state->OnStartOpenConnection(callback);
-}
-
-int WebSocketThrottle::OnRead(SocketStream* socket,
- const char* data, int len,
- CompletionCallback* callback) {
- WebSocketState* state = static_cast<WebSocketState*>(
- socket->GetUserData(WebSocketState::kKeyName));
- // If no state, handshake was already completed. Do nothing.
- if (!state)
- return OK;
-
- int result = state->OnRead(data, len, callback);
- if (state->HandshakeFinished()) {
- RemoveFromQueue(socket, state);
- WakeupSocketIfNecessary();
- }
- return result;
-}
-
-int WebSocketThrottle::OnWrite(SocketStream* socket,
- const char* data, int len,
- CompletionCallback* callback) {
- // Do nothing.
- return OK;
-}
-
-void WebSocketThrottle::OnClose(SocketStream* socket) {
- WebSocketState* state = static_cast<WebSocketState*>(
- socket->GetUserData(WebSocketState::kKeyName));
- if (!state)
- return;
- RemoveFromQueue(socket, state);
- WakeupSocketIfNecessary();
-}
-
-void WebSocketThrottle::PutInQueue(SocketStream* socket,
- WebSocketState* state) {
- socket->SetUserData(WebSocketState::kKeyName, state);
- queue_.push_back(state);
- const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::PutInQueue(WebSocketJob* job) {
+ queue_.push_back(job);
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket,
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
if (iter == addr_map_.end()) {
ConnectingQueue* queue = new ConnectingQueue();
- queue->push_back(state);
+ queue->push_back(job);
addr_map_[addrkey] = queue;
} else {
- iter->second->push_back(state);
- state->SetWaiting();
+ iter->second->push_back(job);
+ job->SetWaiting();
}
}
}
-void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
- WebSocketState* state) {
- const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
+ bool in_queue = false;
+ for (ConnectingQueue::iterator iter = queue_.begin();
+ iter != queue_.end();
+ ++iter) {
+ if (*iter == job) {
+ queue_.erase(iter);
+ in_queue = true;
+ break;
+ }
+ }
+ if (!in_queue)
+ return;
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
DCHECK(iter != addr_map_.end());
ConnectingQueue* queue = iter->second;
- DCHECK(state == queue->front());
- queue->pop_front();
+ // Job may not be front of queue when job is closed early while waiting.
+ for (ConnectingQueue::iterator iter = queue->begin();
+ iter != queue->end();
+ ++iter) {
+ if (*iter == job) {
+ queue->erase(iter);
+ break;
+ }
+ }
if (queue->empty()) {
delete queue;
addr_map_.erase(iter);
}
}
- for (ConnectingQueue::iterator iter = queue_.begin();
- iter != queue_.end();
- ++iter) {
- if (*iter == state) {
- queue_.erase(iter);
- break;
- }
- }
- socket->SetUserData(WebSocketState::kKeyName, NULL);
}
void WebSocketThrottle::WakeupSocketIfNecessary() {
for (ConnectingQueue::iterator iter = queue_.begin();
iter != queue_.end();
++iter) {
- WebSocketState* state = *iter;
- if (!state->IsWaiting())
+ WebSocketJob* job = *iter;
+ if (!job->IsWaiting())
continue;
bool should_wakeup = true;
- const AddressList& address_list = state->address_list();
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() {
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
DCHECK(iter != addr_map_.end());
ConnectingQueue* queue = iter->second;
- if (state != queue->front()) {
+ if (job != queue->front()) {
should_wakeup = false;
break;
}
}
if (should_wakeup)
- state->Wakeup();
+ job->Wakeup();
}
}
-/* static */
-void WebSocketThrottle::Init() {
- Singleton<WebSocketThrottle>::get();
-}
-
} // namespace net