summaryrefslogtreecommitdiffstats
path: root/net/websockets/websocket_throttle.cc
diff options
context:
space:
mode:
authorukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-04 10:02:28 +0000
committerukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-11-04 10:02:28 +0000
commit4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949 (patch)
tree16237134b9e037cf609ca4211b5c9efbcd8759ee /net/websockets/websocket_throttle.cc
parent8ecd3aade85b825c8206735f16c23880023782db (diff)
downloadchromium_src-4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949.zip
chromium_src-4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949.tar.gz
chromium_src-4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949.tar.bz2
Implement websocket throttling.
Implement the client-side requirements in the spec. 4.1 Handshake 1. If the user agent already has a Web Socket connection to the remote host (IP address) identified by /host/, even if known by another name, wait until that connection has been established or for that connection to have failed. BUG=none TEST=net_unittests passes Review URL: http://codereview.chromium.org/342052 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30949 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/websockets/websocket_throttle.cc')
-rw-r--r--net/websockets/websocket_throttle.cc293
1 files changed, 293 insertions, 0 deletions
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
new file mode 100644
index 0000000..fb320b6
--- /dev/null
+++ b/net/websockets/websocket_throttle.cc
@@ -0,0 +1,293 @@
+// Copyright (c) 2009 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/websockets/websocket_throttle.h"
+
+#if defined(OS_WIN)
+#include <ws2tcpip.h>
+#else
+#include <netdb.h>
+#endif
+
+#include <string>
+
+#include "base/message_loop.h"
+#include "base/ref_counted.h"
+#include "base/singleton.h"
+#include "base/string_util.h"
+#include "net/base/io_buffer.h"
+#include "net/socket_stream/socket_stream.h"
+
+namespace net {
+
+static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
+ switch (addrinfo->ai_family) {
+ case AF_INET: {
+ const struct sockaddr_in* const addr =
+ reinterpret_cast<const sockaddr_in*>(addrinfo->ai_addr);
+ return StringPrintf("%d:%s",
+ addrinfo->ai_family,
+ HexEncode(&addr->sin_addr, 4).c_str());
+ }
+ case AF_INET6: {
+ const struct sockaddr_in6* const addr6 =
+ reinterpret_cast<const sockaddr_in6*>(addrinfo->ai_addr);
+ return StringPrintf("%d:%s",
+ addrinfo->ai_family,
+ HexEncode(&addr6->sin6_addr,
+ sizeof(addr6->sin6_addr)).c_str());
+ }
+ default:
+ return StringPrintf("%d:%s",
+ addrinfo->ai_family,
+ HexEncode(addrinfo->ai_addr,
+ addrinfo->ai_addrlen).c_str());
+ }
+}
+
+// State for WebSocket protocol on each SocketStream.
+// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
+// This is alive between connection starts and handshake is finished.
+// In this class, it doesn't check actual handshake finishes, but only checks
+// end of header is found in read data.
+class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
+ public:
+ explicit WebSocketState(const AddressList& addrs)
+ : address_list_(addrs),
+ callback_(NULL),
+ waiting_(false),
+ handshake_finished_(false),
+ buffer_(NULL) {
+ }
+ ~WebSocketState() {}
+
+ int OnStartOpenConnection(CompletionCallback* callback) {
+ DCHECK(!callback_);
+ if (!waiting_)
+ return OK;
+ callback_ = callback;
+ return ERR_IO_PENDING;
+ }
+
+ int OnRead(const char* data, int len, CompletionCallback* callback) {
+ DCHECK(!waiting_);
+ DCHECK(!callback_);
+ DCHECK(!handshake_finished_);
+ static const int kBufferSize = 8129;
+
+ if (!buffer_) {
+ // Fast path.
+ int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
+ if (eoh > 0) {
+ handshake_finished_ = true;
+ return OK;
+ }
+ buffer_ = new GrowableIOBuffer();
+ buffer_->SetCapacity(kBufferSize);
+ } else {
+ if (buffer_->RemainingCapacity() < len) {
+ if (!buffer_->SetCapacity(buffer_->capacity() + kBufferSize)) {
+ // TODO(ukai): Check more correctly.
+ // Seek to the last CR or LF and reduce memory usage.
+ LOG(ERROR) << "Too large headers? capacity=" << buffer_->capacity();
+ handshake_finished_ = true;
+ return OK;
+ }
+ }
+ }
+ memcpy(buffer_->data(), data, len);
+ buffer_->set_offset(buffer_->offset() + len);
+
+ int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
+ buffer_->offset(), 0);
+ handshake_finished_ = (eoh > 0);
+ return OK;
+ }
+
+ const AddressList& address_list() const { return address_list_; }
+ void SetWaiting() { waiting_ = true; }
+ bool IsWaiting() const { return waiting_; }
+ bool HandshakeFinished() const { return handshake_finished_; }
+ void Wakeup() {
+ waiting_ = false;
+ // We wrap |callback_| to keep this alive while this is released.
+ scoped_refptr<CompletionCallbackRunner> runner =
+ new CompletionCallbackRunner(callback_);
+ callback_ = NULL;
+ MessageLoopForIO::current()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(runner.get(),
+ &CompletionCallbackRunner::Run));
+ }
+
+ static const char* kKeyName;
+
+ private:
+ class CompletionCallbackRunner
+ : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
+ public:
+ explicit CompletionCallbackRunner(CompletionCallback* callback)
+ : callback_(callback) {
+ DCHECK(callback_);
+ }
+ virtual ~CompletionCallbackRunner() {}
+ void Run() {
+ callback_->Run(OK);
+ }
+ private:
+ CompletionCallback* callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
+ };
+
+ const AddressList& address_list_;
+
+ CompletionCallback* callback_;
+ // True if waiting another websocket connection is established.
+ // False if the websocket is performing handshaking.
+ bool waiting_;
+
+ // True if the websocket handshake is completed.
+ // If true, it will be removed from queue and deleted from the SocketStream
+ // UserData soon.
+ bool handshake_finished_;
+
+ // Buffer for read data to check handshake response message.
+ scoped_refptr<GrowableIOBuffer> buffer_;
+
+ DISALLOW_COPY_AND_ASSIGN(WebSocketState);
+};
+
+const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
+
+WebSocketThrottle::WebSocketThrottle() {
+ SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
+ SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
+}
+
+WebSocketThrottle::~WebSocketThrottle() {
+ DCHECK(queue_.empty());
+ DCHECK(addr_map_.empty());
+}
+
+int WebSocketThrottle::OnStartOpenConnection(
+ SocketStream* socket, CompletionCallback* callback) {
+ WebSocketState* state = new WebSocketState(socket->address_list());
+ PutInQueue(socket, state);
+ return state->OnStartOpenConnection(callback);
+}
+
+int WebSocketThrottle::OnRead(SocketStream* socket,
+ const char* data, int len,
+ CompletionCallback* callback) {
+ WebSocketState* state = static_cast<WebSocketState*>(
+ socket->GetUserData(WebSocketState::kKeyName));
+ // If no state, handshake was already completed. Do nothing.
+ if (!state)
+ return OK;
+
+ int result = state->OnRead(data, len, callback);
+ if (state->HandshakeFinished()) {
+ RemoveFromQueue(socket, state);
+ WakeupSocketIfNecessary();
+ }
+ return result;
+}
+
+int WebSocketThrottle::OnWrite(SocketStream* socket,
+ const char* data, int len,
+ CompletionCallback* callback) {
+ // Do nothing.
+ return OK;
+}
+
+void WebSocketThrottle::OnClose(SocketStream* socket) {
+ WebSocketState* state = static_cast<WebSocketState*>(
+ socket->GetUserData(WebSocketState::kKeyName));
+ if (!state)
+ return;
+ RemoveFromQueue(socket, state);
+ WakeupSocketIfNecessary();
+}
+
+void WebSocketThrottle::PutInQueue(SocketStream* socket,
+ WebSocketState* state) {
+ socket->SetUserData(WebSocketState::kKeyName, state);
+ queue_.push_back(state);
+ const AddressList& address_list = socket->address_list();
+ for (const struct addrinfo* addrinfo = address_list.head();
+ addrinfo != NULL;
+ addrinfo = addrinfo->ai_next) {
+ std::string addrkey = AddrinfoToHashkey(addrinfo);
+ ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
+ if (iter == addr_map_.end()) {
+ ConnectingQueue* queue = new ConnectingQueue();
+ queue->push_back(state);
+ addr_map_[addrkey] = queue;
+ } else {
+ iter->second->push_back(state);
+ state->SetWaiting();
+ }
+ }
+}
+
+void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
+ WebSocketState* state) {
+ const AddressList& address_list = socket->address_list();
+ for (const struct addrinfo* addrinfo = address_list.head();
+ addrinfo != NULL;
+ addrinfo = addrinfo->ai_next) {
+ std::string addrkey = AddrinfoToHashkey(addrinfo);
+ ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
+ DCHECK(iter != addr_map_.end());
+ ConnectingQueue* queue = iter->second;
+ DCHECK(state == queue->front());
+ queue->pop_front();
+ if (queue->empty())
+ addr_map_.erase(iter);
+ }
+ for (ConnectingQueue::iterator iter = queue_.begin();
+ iter != queue_.end();
+ ++iter) {
+ if (*iter == state) {
+ queue_.erase(iter);
+ break;
+ }
+ }
+ socket->SetUserData(WebSocketState::kKeyName, NULL);
+}
+
+void WebSocketThrottle::WakeupSocketIfNecessary() {
+ for (ConnectingQueue::iterator iter = queue_.begin();
+ iter != queue_.end();
+ ++iter) {
+ WebSocketState* state = *iter;
+ if (!state->IsWaiting())
+ continue;
+
+ bool should_wakeup = true;
+ const AddressList& address_list = state->address_list();
+ for (const struct addrinfo* addrinfo = address_list.head();
+ addrinfo != NULL;
+ addrinfo = addrinfo->ai_next) {
+ std::string addrkey = AddrinfoToHashkey(addrinfo);
+ ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
+ DCHECK(iter != addr_map_.end());
+ ConnectingQueue* queue = iter->second;
+ if (state != queue->front()) {
+ should_wakeup = false;
+ break;
+ }
+ }
+ if (should_wakeup)
+ state->Wakeup();
+ }
+}
+
+/* static */
+void WebSocketThrottle::Init() {
+ Singleton<WebSocketThrottle>::get();
+}
+
+} // namespace net