From b4384a798aef3f2e52c34d733a9af2db792c6c1d Mon Sep 17 00:00:00 2001
From: "ukai@chromium.org"
 <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>
Date: Wed, 17 Mar 2010 07:29:54 +0000
Subject: Refactor WebSocket throttling feature.

Protocol specific handling should be done in SocketStreamJob subclasss,
so websocket throttling should be handled in WebSocketJob.

Review URL: http://codereview.chromium.org/669157

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@41818 0039d316-1c4b-4281-b951-d872f2087c98
---
 net/net.gyp                                   |   2 -
 net/socket_stream/socket_stream.cc            |  15 +-
 net/socket_stream/socket_stream.h             |   9 +-
 net/socket_stream/socket_stream_throttle.cc   |  80 ----------
 net/socket_stream/socket_stream_throttle.h    |  81 ----------
 net/websockets/websocket_job.cc               |  73 +++++++++
 net/websockets/websocket_job.h                |  16 +-
 net/websockets/websocket_job_unittest.cc      |  19 ++-
 net/websockets/websocket_throttle.cc          | 219 ++++----------------------
 net/websockets/websocket_throttle.h           |  48 +++---
 net/websockets/websocket_throttle_unittest.cc | 160 ++++++++++++++++---
 11 files changed, 305 insertions(+), 417 deletions(-)
 delete mode 100644 net/socket_stream/socket_stream_throttle.cc
 delete mode 100644 net/socket_stream/socket_stream_throttle.h

(limited to 'net')

diff --git a/net/net.gyp b/net/net.gyp
index 571f9ae..5637f41 100644
--- a/net/net.gyp
+++ b/net/net.gyp
@@ -453,8 +453,6 @@
         'socket_stream/socket_stream_job_manager.h',
         'socket_stream/socket_stream_metrics.cc',
         'socket_stream/socket_stream_metrics.h',
-        'socket_stream/socket_stream_throttle.cc',
-        'socket_stream/socket_stream_throttle.h',
         'spdy/spdy_bitmasks.h',
         'spdy/spdy_frame_builder.cc',
         'spdy/spdy_frame_builder.h',
diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc
index 03d7a9f..e46f306 100644
--- a/net/socket_stream/socket_stream.cc
+++ b/net/socket_stream/socket_stream.cc
@@ -26,7 +26,6 @@
 #include "net/socket/socks_client_socket.h"
 #include "net/socket/tcp_client_socket.h"
 #include "net/socket_stream/socket_stream_metrics.h"
-#include "net/socket_stream/socket_stream_throttle.h"
 #include "net/url_request/url_request.h"
 
 static const int kMaxPendingSendAllowed = 32768;  // 32 kilobytes.
@@ -59,16 +58,12 @@ SocketStream::SocketStream(const GURL& url, Delegate* delegate)
       current_write_buf_(NULL),
       write_buf_offset_(0),
       write_buf_size_(0),
-      throttle_(
-          SocketStreamThrottle::GetSocketStreamThrottleForScheme(
-              url.scheme())),
       metrics_(new SocketStreamMetrics(url)) {
   DCHECK(MessageLoop::current()) <<
       "The current MessageLoop must exist";
   DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
       "The current MessageLoop must be TYPE_IO";
   DCHECK(delegate_);
-  DCHECK(throttle_);
 }
 
 SocketStream::~SocketStream() {
@@ -235,7 +230,6 @@ void SocketStream::Finish(int result) {
   if (delegate) {
     delegate->OnClose(this);
   }
-  throttle_->OnClose(this);
   Release();
 }
 
@@ -275,13 +269,12 @@ int SocketStream::DidReceiveData(int result) {
   net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_RECEIVED);
   int len = result;
   metrics_->OnRead(len);
-  result = throttle_->OnRead(this, read_buf_->data(), len, &io_callback_);
   if (delegate_) {
     // Notify recevied data to delegate.
     delegate_->OnReceivedData(this, read_buf_->data(), len);
   }
   read_buf_ = NULL;
-  return result;
+  return OK;
 }
 
 int SocketStream::DidSendData(int result) {
@@ -289,8 +282,6 @@ int SocketStream::DidSendData(int result) {
   net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_SENT);
   int len = result;
   metrics_->OnWrite(len);
-  result = throttle_->OnWrite(this, current_write_buf_->data(), len,
-                              &io_callback_);
   current_write_buf_ = NULL;
   if (delegate_)
     delegate_->OnSentData(this, len);
@@ -309,7 +300,7 @@ int SocketStream::DidSendData(int result) {
   } else {
     write_buf_offset_ += len;
   }
-  return result;
+  return OK;
 }
 
 void SocketStream::OnIOCompleted(int result) {
@@ -490,7 +481,7 @@ int SocketStream::DoResolveHost() {
 int SocketStream::DoResolveHostComplete(int result) {
   if (result == OK) {
     next_state_ = STATE_TCP_CONNECT;
-    result = throttle_->OnStartOpenConnection(this, &io_callback_);
+    result = delegate_->OnStartOpenConnection(this, &io_callback_);
     if (result == net::ERR_IO_PENDING)
       metrics_->OnWaitConnection();
   } else {
diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h
index e8b250e..b8fd0bf 100644
--- a/net/socket_stream/socket_stream.h
+++ b/net/socket_stream/socket_stream.h
@@ -18,6 +18,7 @@
 #include "net/base/completion_callback.h"
 #include "net/base/io_buffer.h"
 #include "net/base/net_log.h"
+#include "net/base/net_errors.h"
 #include "net/http/http_auth.h"
 #include "net/http/http_auth_cache.h"
 #include "net/http/http_auth_handler.h"
@@ -34,7 +35,6 @@ class HttpAuthHandlerFactory;
 class SSLConfigService;
 class SingleRequestHostResolver;
 class SocketStreamMetrics;
-class SocketStreamThrottle;
 
 // SocketStream is used to implement Web Sockets.
 // It provides plain full-duplex stream with proxy and SSL support.
@@ -57,6 +57,11 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> {
    public:
     virtual ~Delegate() {}
 
+    virtual int OnStartOpenConnection(SocketStream* socket,
+                                      CompletionCallback* callback) {
+      return OK;
+    }
+
     // Called when socket stream has been connected.  The socket stream accepts
     // at most |max_pending_send_allowed| so that a client of the socket stream
     // should keep track of how much it has pending and shouldn't go over
@@ -313,8 +318,6 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> {
   int write_buf_size_;
   PendingDataQueue pending_write_bufs_;
 
-  SocketStreamThrottle* throttle_;
-
   scoped_ptr<SocketStreamMetrics> metrics_;
 
   DISALLOW_COPY_AND_ASSIGN(SocketStream);
diff --git a/net/socket_stream/socket_stream_throttle.cc b/net/socket_stream/socket_stream_throttle.cc
deleted file mode 100644
index 6a1d20d..0000000
--- a/net/socket_stream/socket_stream_throttle.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright (c) 2009 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include <string>
-
-#include "net/socket_stream/socket_stream_throttle.h"
-
-#include "base/hash_tables.h"
-#include "base/singleton.h"
-#include "net/base/completion_callback.h"
-#include "net/socket_stream/socket_stream.h"
-
-namespace net {
-
-// Default SocketStreamThrottle.  No throttling. Used for unknown URL scheme.
-class DefaultSocketStreamThrottle : public SocketStreamThrottle {
- private:
-  DefaultSocketStreamThrottle() {}
-  virtual ~DefaultSocketStreamThrottle() {}
-  friend struct DefaultSingletonTraits<DefaultSocketStreamThrottle>;
-
-  DISALLOW_COPY_AND_ASSIGN(DefaultSocketStreamThrottle);
-};
-
-class SocketStreamThrottleRegistry {
- public:
-  SocketStreamThrottle* GetSocketStreamThrottleForScheme(
-      const std::string& scheme);
-
-  void RegisterSocketStreamThrottle(
-      const std::string& scheme, SocketStreamThrottle* throttle);
-
- private:
-  typedef base::hash_map<std::string, SocketStreamThrottle*> ThrottleMap;
-
-  SocketStreamThrottleRegistry() {}
-  ~SocketStreamThrottleRegistry() {}
-  friend struct DefaultSingletonTraits<SocketStreamThrottleRegistry>;
-
-  ThrottleMap throttles_;
-
-  DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottleRegistry);
-};
-
-SocketStreamThrottle*
-SocketStreamThrottleRegistry::GetSocketStreamThrottleForScheme(
-    const std::string& scheme) {
-  ThrottleMap::const_iterator found = throttles_.find(scheme);
-  if (found == throttles_.end()) {
-    SocketStreamThrottle* throttle =
-        Singleton<DefaultSocketStreamThrottle>::get();
-    throttles_[scheme] = throttle;
-    return throttle;
-  }
-  return found->second;
-}
-
-void SocketStreamThrottleRegistry::RegisterSocketStreamThrottle(
-    const std::string& scheme, SocketStreamThrottle* throttle) {
-  throttles_[scheme] = throttle;
-}
-
-/* static */
-SocketStreamThrottle* SocketStreamThrottle::GetSocketStreamThrottleForScheme(
-    const std::string& scheme) {
-  SocketStreamThrottleRegistry* registry =
-      Singleton<SocketStreamThrottleRegistry>::get();
-  return registry->GetSocketStreamThrottleForScheme(scheme);
-}
-
-/* static */
-void SocketStreamThrottle::RegisterSocketStreamThrottle(
-    const std::string& scheme, SocketStreamThrottle* throttle) {
-  SocketStreamThrottleRegistry* registry =
-      Singleton<SocketStreamThrottleRegistry>::get();
-  registry->RegisterSocketStreamThrottle(scheme, throttle);
-}
-
-}  // namespace net
diff --git a/net/socket_stream/socket_stream_throttle.h b/net/socket_stream/socket_stream_throttle.h
deleted file mode 100644
index 7726cbe..0000000
--- a/net/socket_stream/socket_stream_throttle.h
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright (c) 2009 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_
-#define NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_
-
-#include <string>
-
-#include "base/basictypes.h"
-#include "net/base/completion_callback.h"
-#include "net/base/net_errors.h"
-
-namespace net {
-
-class SocketStream;
-
-// Abstract interface to throttle SocketStream per URL scheme.
-// Each URL scheme (protocol) could define own SocketStreamThrottle.
-// These methods will be called on IO thread.
-class SocketStreamThrottle {
- public:
-  // Called when |socket| is about to open connection.
-  // Returns net::OK if the connection can open now.
-  // Returns net::ERR_IO_PENDING if the connection should wait.  In this case,
-  // |callback| will be called when it's ready to open connection.
-  virtual int OnStartOpenConnection(SocketStream* socket,
-                                    CompletionCallback* callback) {
-    // No throttle by default.
-    return OK;
-  }
-
-  // Called when |socket| read |len| bytes of |data|.
-  // May wake up another waiting socket.
-  // Returns net::OK if |socket| can continue to run.
-  // Returns net::ERR_IO_PENDING if |socket| should suspend to run.  In this
-  // case, |callback| will be called when it's ready to resume running.
-  virtual int OnRead(SocketStream* socket, const char* data, int len,
-                     CompletionCallback* callback) {
-    // No throttle by default.
-    return OK;
-  }
-
-  // Called when |socket| wrote |len| bytes of |data|.
-  // May wake up another waiting socket.
-  // Returns net::OK if |socket| can continue to run.
-  // Returns net::ERR_IO_PENDING if |socket| should suspend to run.  In this
-  // case, |callback| will be called when it's ready to resume running.
-  virtual int OnWrite(SocketStream* socket, const char* data, int len,
-                      CompletionCallback* callback) {
-    // No throttle by default.
-    return OK;
-  }
-
-  // Called when |socket| is closed.
-  // May wake up another waiting socket.
-  virtual void OnClose(SocketStream* socket) {}
-
-  // Gets SocketStreamThrottle for URL |scheme|.
-  // Doesn't pass ownership of the SocketStreamThrottle.
-  static SocketStreamThrottle* GetSocketStreamThrottleForScheme(
-      const std::string& scheme);
-
-  // Registers |throttle| for URL |scheme|.
-  // Doesn't take ownership of |throttle|.  Typically |throttle| is
-  // singleton instance.
-  static void RegisterSocketStreamThrottle(
-      const std::string& scheme,
-      SocketStreamThrottle* throttle);
-
- protected:
-  SocketStreamThrottle() {}
-  virtual ~SocketStreamThrottle() {}
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottle);
-};
-
-}  // namespace net
-
-#endif  // NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_
diff --git a/net/websockets/websocket_job.cc b/net/websockets/websocket_job.cc
index 59acfc5..62a62b7 100644
--- a/net/websockets/websocket_job.cc
+++ b/net/websockets/websocket_job.cc
@@ -10,6 +10,31 @@
 #include "net/base/cookie_store.h"
 #include "net/http/http_util.h"
 #include "net/url_request/url_request_context.h"
+#include "net/websockets/websocket_throttle.h"
+
+namespace {
+
+class CompletionCallbackRunner
+    : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
+ public:
+  explicit CompletionCallbackRunner(net::CompletionCallback* callback)
+      : callback_(callback) {
+    DCHECK(callback_);
+  }
+  void Run() {
+    callback_->Run(net::OK);
+  }
+ private:
+  friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
+
+  virtual ~CompletionCallbackRunner() {}
+
+  net::CompletionCallback* callback_;
+
+  DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
+};
+
+}
 
 namespace net {
 
@@ -75,6 +100,8 @@ void WebSocketJob::EnsureInit() {
 WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
     : delegate_(delegate),
       state_(INITIALIZED),
+      waiting_(false),
+      callback_(NULL),
       handshake_request_sent_(0),
       handshake_response_header_length_(0),
       response_cookies_save_index_(0),
@@ -128,12 +155,27 @@ void WebSocketJob::RestartWithAuth(
 
 void WebSocketJob::DetachDelegate() {
   state_ = CLOSED;
+  Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
+  Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
+
   delegate_ = NULL;
   if (socket_)
     socket_->DetachDelegate();
   socket_ = NULL;
 }
 
+int WebSocketJob::OnStartOpenConnection(
+    SocketStream* socket, CompletionCallback* callback) {
+  DCHECK(!callback_);
+  state_ = CONNECTING;
+  addresses_.Copy(socket->address_list().head(), true);
+  Singleton<WebSocketThrottle>::get()->PutInQueue(this);
+  if (!waiting_)
+    return OK;
+  callback_ = callback;
+  return ERR_IO_PENDING;
+}
+
 void WebSocketJob::OnConnected(
     SocketStream* socket, int max_pending_send_allowed) {
   if (delegate_)
@@ -161,6 +203,9 @@ void WebSocketJob::OnReceivedData(
 
 void WebSocketJob::OnClose(SocketStream* socket) {
   state_ = CLOSED;
+  Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
+  Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
+
   SocketStream::Delegate* delegate = delegate_;
   delegate_ = NULL;
   socket_ = NULL;
@@ -325,6 +370,9 @@ void WebSocketJob::SaveNextCookie() {
         "\r\n" +
         remaining_data;
     state_ = OPEN;
+    Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
+    Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
+
     if (delegate_)
       delegate_->OnReceivedData(socket_,
                                 received_data.data(), received_data.size());
@@ -376,4 +424,29 @@ GURL WebSocketJob::GetURLForCookies() const {
   return url.ReplaceComponents(replacements);
 }
 
+const AddressList& WebSocketJob::address_list() const {
+  return addresses_;
+}
+
+void WebSocketJob::SetWaiting() {
+  waiting_ = true;
+}
+
+bool WebSocketJob::IsWaiting() const {
+  return waiting_;
+}
+
+void WebSocketJob::Wakeup() {
+  waiting_ = false;
+  DCHECK(callback_);
+  // We wrap |callback_| to keep this alive while this is released.
+  scoped_refptr<CompletionCallbackRunner> runner =
+      new CompletionCallbackRunner(callback_);
+  callback_ = NULL;
+  MessageLoopForIO::current()->PostTask(
+      FROM_HERE,
+      NewRunnableMethod(runner.get(),
+                        &CompletionCallbackRunner::Run));
+}
+
 }  // namespace net
diff --git a/net/websockets/websocket_job.h b/net/websockets/websocket_job.h
index 31fa503..23140c5 100644
--- a/net/websockets/websocket_job.h
+++ b/net/websockets/websocket_job.h
@@ -9,6 +9,7 @@
 #include <vector>
 
 #include "base/ref_counted.h"
+#include "net/base/address_list.h"
 #include "net/base/completion_callback.h"
 #include "net/socket_stream/socket_stream_job.h"
 
@@ -18,10 +19,9 @@ namespace net {
 
 // WebSocket protocol specific job on SocketStream.
 // It captures WebSocket handshake message and handles cookie operations.
-// Chome security policy doesn't allow renderer process (except dev tools)
+// Chrome security policy doesn't allow renderer process (except dev tools)
 // see HttpOnly cookies, so it injects cookie header in handshake request and
 // strips set-cookie headers in handshake response.
-// TODO(ukai): refactor to merge WebSocketThrottle functionality.
 // TODO(ukai): refactor websocket.cc to use this.
 class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
  public:
@@ -36,6 +36,7 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
 
   explicit WebSocketJob(SocketStream::Delegate* delegate);
 
+  State state() const { return state_; }
   virtual void Connect();
   virtual bool SendData(const char* data, int len);
   virtual void Close();
@@ -45,6 +46,8 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
   virtual void DetachDelegate();
 
   // SocketStream::Delegate methods.
+  virtual int OnStartOpenConnection(
+      SocketStream* socket, CompletionCallback* callback);
   virtual void OnConnected(
       SocketStream* socket, int max_pending_send_allowed);
   virtual void OnSentData(
@@ -58,6 +61,7 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
       const SocketStream* socket, int error);
 
  private:
+  friend class WebSocketThrottle;
   friend class WebSocketJobTest;
   virtual ~WebSocketJob();
 
@@ -74,8 +78,16 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
 
   GURL GetURLForCookies() const;
 
+  const AddressList& address_list() const;
+  void SetWaiting();
+  bool IsWaiting() const;
+  void Wakeup();
+
   SocketStream::Delegate* delegate_;
   State state_;
+  bool waiting_;
+  AddressList addresses_;
+  CompletionCallback* callback_;  // for throttling.
 
   std::string original_handshake_request_;
   int original_handshake_request_header_length_;
diff --git a/net/websockets/websocket_job_unittest.cc b/net/websockets/websocket_job_unittest.cc
index de96a32..7bf9822 100644
--- a/net/websockets/websocket_job_unittest.cc
+++ b/net/websockets/websocket_job_unittest.cc
@@ -10,9 +10,11 @@
 #include "net/base/cookie_policy.h"
 #include "net/base/cookie_store.h"
 #include "net/base/net_errors.h"
+#include "net/base/sys_addrinfo.h"
 #include "net/socket_stream/socket_stream.h"
 #include "net/url_request/url_request_context.h"
 #include "net/websockets/websocket_job.h"
+#include "net/websockets/websocket_throttle.h"
 #include "testing/gtest/include/gtest/gtest.h"
 #include "testing/gmock/include/gmock/gmock.h"
 #include "testing/platform_test.h"
@@ -206,15 +208,28 @@ class WebSocketJobTest : public PlatformTest {
     websocket_ = new WebSocketJob(delegate);
     socket_ = new MockSocketStream(url, websocket_.get());
     websocket_->InitSocketStream(socket_.get());
-    websocket_->state_ = WebSocketJob::CONNECTING;
     websocket_->set_context(context_.get());
+    websocket_->state_ = WebSocketJob::CONNECTING;
+    struct addrinfo addr;
+    memset(&addr, 0, sizeof(struct addrinfo));
+    addr.ai_family = AF_INET;
+    addr.ai_addrlen = sizeof(struct sockaddr_in);
+    struct sockaddr_in sa_in;
+    memset(&sa_in, 0, sizeof(struct sockaddr_in));
+    memcpy(&sa_in.sin_addr, "\x7f\0\0\1", 4);
+    addr.ai_addr = reinterpret_cast<sockaddr*>(&sa_in);
+    addr.ai_next = NULL;
+    websocket_->addresses_.Copy(&addr, true);
+    Singleton<WebSocketThrottle>::get()->PutInQueue(websocket_);
   }
   WebSocketJob::State GetWebSocketJobState() {
     return websocket_->state_;
   }
   void CloseWebSocketJob() {
-    if (websocket_->socket_)
+    if (websocket_->socket_) {
       websocket_->socket_->DetachDelegate();
+      Singleton<WebSocketThrottle>::get()->RemoveFromQueue(websocket_);
+    }
     websocket_->state_ = WebSocketJob::CLOSED;
     websocket_->delegate_ = NULL;
     websocket_->socket_ = NULL;
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
index 8d0d1fb..e2e98c3 100644
--- a/net/websockets/websocket_throttle.cc
+++ b/net/websockets/websocket_throttle.cc
@@ -13,6 +13,7 @@
 #include "net/base/io_buffer.h"
 #include "net/base/sys_addrinfo.h"
 #include "net/socket_stream/socket_stream.h"
+#include "net/websockets/websocket_job.h"
 
 namespace net {
 
@@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
   }
 }
 
-// State for WebSocket protocol on each SocketStream.
-// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
-// This is alive between connection starts and handshake is finished.
-// In this class, it doesn't check actual handshake finishes, but only checks
-// end of header is found in read data.
-class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
- public:
-  explicit WebSocketState(const AddressList& addrs)
-      : address_list_(addrs),
-        callback_(NULL),
-        waiting_(false),
-        handshake_finished_(false),
-        buffer_(NULL) {
-  }
-  ~WebSocketState() {}
-
-  int OnStartOpenConnection(CompletionCallback* callback) {
-    DCHECK(!callback_);
-    if (!waiting_)
-      return OK;
-    callback_ = callback;
-    return ERR_IO_PENDING;
-  }
-
-  int OnRead(const char* data, int len, CompletionCallback* callback) {
-    DCHECK(!waiting_);
-    DCHECK(!callback_);
-    DCHECK(!handshake_finished_);
-    static const int kBufferSize = 8129;
-
-    if (!buffer_) {
-      // Fast path.
-      int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
-      if (eoh > 0) {
-        handshake_finished_ = true;
-        return OK;
-      }
-      buffer_ = new GrowableIOBuffer();
-      buffer_->SetCapacity(kBufferSize);
-    } else if (buffer_->RemainingCapacity() < len) {
-      buffer_->SetCapacity(buffer_->capacity() + kBufferSize);
-    }
-    memcpy(buffer_->data(), data, len);
-    buffer_->set_offset(buffer_->offset() + len);
-
-    int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
-                                           buffer_->offset(), 0);
-    handshake_finished_ = (eoh > 0);
-    return OK;
-  }
-
-  const AddressList& address_list() const { return address_list_; }
-  void SetWaiting() { waiting_ = true; }
-  bool IsWaiting() const { return waiting_; }
-  bool HandshakeFinished() const { return handshake_finished_; }
-  void Wakeup() {
-    waiting_ = false;
-    // We wrap |callback_| to keep this alive while this is released.
-    scoped_refptr<CompletionCallbackRunner> runner =
-        new CompletionCallbackRunner(callback_);
-    callback_ = NULL;
-    MessageLoopForIO::current()->PostTask(
-        FROM_HERE,
-        NewRunnableMethod(runner.get(),
-                          &CompletionCallbackRunner::Run));
-  }
-
-  static const char* kKeyName;
-
- private:
-  class CompletionCallbackRunner
-      : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
-   public:
-    explicit CompletionCallbackRunner(CompletionCallback* callback)
-        : callback_(callback) {
-      DCHECK(callback_);
-    }
-    void Run() {
-      callback_->Run(OK);
-    }
-   private:
-    friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
-
-    virtual ~CompletionCallbackRunner() {}
-
-    CompletionCallback* callback_;
-
-    DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
-  };
-
-  const AddressList& address_list_;
-
-  CompletionCallback* callback_;
-  // True if waiting another websocket connection is established.
-  // False if the websocket is performing handshaking.
-  bool waiting_;
-
-  // True if the websocket handshake is completed.
-  // If true, it will be removed from queue and deleted from the SocketStream
-  // UserData soon.
-  bool handshake_finished_;
-
-  // Buffer for read data to check handshake response message.
-  scoped_refptr<GrowableIOBuffer> buffer_;
-
-  DISALLOW_COPY_AND_ASSIGN(WebSocketState);
-};
-
-const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
-
 WebSocketThrottle::WebSocketThrottle() {
-  SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
-  SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
 }
 
 WebSocketThrottle::~WebSocketThrottle() {
@@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() {
   DCHECK(addr_map_.empty());
 }
 
-int WebSocketThrottle::OnStartOpenConnection(
-    SocketStream* socket, CompletionCallback* callback) {
-  WebSocketState* state = new WebSocketState(socket->address_list());
-  PutInQueue(socket, state);
-  return state->OnStartOpenConnection(callback);
-}
-
-int WebSocketThrottle::OnRead(SocketStream* socket,
-                              const char* data, int len,
-                              CompletionCallback* callback) {
-  WebSocketState* state = static_cast<WebSocketState*>(
-      socket->GetUserData(WebSocketState::kKeyName));
-  // If no state, handshake was already completed. Do nothing.
-  if (!state)
-    return OK;
-
-  int result = state->OnRead(data, len, callback);
-  if (state->HandshakeFinished()) {
-    RemoveFromQueue(socket, state);
-    WakeupSocketIfNecessary();
-  }
-  return result;
-}
-
-int WebSocketThrottle::OnWrite(SocketStream* socket,
-                               const char* data, int len,
-                               CompletionCallback* callback) {
-  // Do nothing.
-  return OK;
-}
-
-void WebSocketThrottle::OnClose(SocketStream* socket) {
-  WebSocketState* state = static_cast<WebSocketState*>(
-      socket->GetUserData(WebSocketState::kKeyName));
-  if (!state)
-    return;
-  RemoveFromQueue(socket, state);
-  WakeupSocketIfNecessary();
-}
-
-void WebSocketThrottle::PutInQueue(SocketStream* socket,
-                                   WebSocketState* state) {
-  socket->SetUserData(WebSocketState::kKeyName, state);
-  queue_.push_back(state);
-  const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::PutInQueue(WebSocketJob* job) {
+  queue_.push_back(job);
+  const AddressList& address_list = job->address_list();
   for (const struct addrinfo* addrinfo = address_list.head();
        addrinfo != NULL;
        addrinfo = addrinfo->ai_next) {
@@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket,
     ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
     if (iter == addr_map_.end()) {
       ConnectingQueue* queue = new ConnectingQueue();
-      queue->push_back(state);
+      queue->push_back(job);
       addr_map_[addrkey] = queue;
     } else {
-      iter->second->push_back(state);
-      state->SetWaiting();
+      iter->second->push_back(job);
+      job->SetWaiting();
     }
   }
 }
 
-void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
-                                        WebSocketState* state) {
-  const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
+  bool in_queue = false;
+  for (ConnectingQueue::iterator iter = queue_.begin();
+       iter != queue_.end();
+       ++iter) {
+    if (*iter == job) {
+      queue_.erase(iter);
+      in_queue = true;
+      break;
+    }
+  }
+  if (!in_queue)
+    return;
+  const AddressList& address_list = job->address_list();
   for (const struct addrinfo* addrinfo = address_list.head();
        addrinfo != NULL;
        addrinfo = addrinfo->ai_next) {
@@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
     ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
     DCHECK(iter != addr_map_.end());
     ConnectingQueue* queue = iter->second;
-    DCHECK(state == queue->front());
-    queue->pop_front();
+    // Job may not be front of queue when job is closed early while waiting.
+    for (ConnectingQueue::iterator iter = queue->begin();
+         iter != queue->end();
+         ++iter) {
+      if (*iter == job) {
+        queue->erase(iter);
+        break;
+      }
+    }
     if (queue->empty()) {
       delete queue;
       addr_map_.erase(iter);
     }
   }
-  for (ConnectingQueue::iterator iter = queue_.begin();
-       iter != queue_.end();
-       ++iter) {
-    if (*iter == state) {
-      queue_.erase(iter);
-      break;
-    }
-  }
-  socket->SetUserData(WebSocketState::kKeyName, NULL);
 }
 
 void WebSocketThrottle::WakeupSocketIfNecessary() {
   for (ConnectingQueue::iterator iter = queue_.begin();
        iter != queue_.end();
        ++iter) {
-    WebSocketState* state = *iter;
-    if (!state->IsWaiting())
+    WebSocketJob* job = *iter;
+    if (!job->IsWaiting())
       continue;
 
     bool should_wakeup = true;
-    const AddressList& address_list = state->address_list();
+    const AddressList& address_list = job->address_list();
     for (const struct addrinfo* addrinfo = address_list.head();
          addrinfo != NULL;
          addrinfo = addrinfo->ai_next) {
@@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() {
       ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
       DCHECK(iter != addr_map_.end());
       ConnectingQueue* queue = iter->second;
-      if (state != queue->front()) {
+      if (job != queue->front()) {
         should_wakeup = false;
         break;
       }
     }
     if (should_wakeup)
-      state->Wakeup();
+      job->Wakeup();
   }
 }
 
-/* static */
-void WebSocketThrottle::Init() {
-  Singleton<WebSocketThrottle>::get();
-}
-
 }  // namespace net
diff --git a/net/websockets/websocket_throttle.h b/net/websockets/websocket_throttle.h
index 279aea2..d05b246 100644
--- a/net/websockets/websocket_throttle.h
+++ b/net/websockets/websocket_throttle.h
@@ -5,12 +5,17 @@
 #ifndef NET_WEBSOCKETS_WEBSOCKET_THROTTLE_H_
 #define NET_WEBSOCKETS_WEBSOCKET_THROTTLE_H_
 
+#include <deque>
+#include <string>
+
 #include "base/hash_tables.h"
 #include "base/singleton.h"
-#include "net/socket_stream/socket_stream_throttle.h"
 
 namespace net {
 
+class SocketStream;
+class WebSocketJob;
+
 // SocketStreamThrottle for WebSocket protocol.
 // Implements the client-side requirements in the spec.
 // http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol
@@ -19,42 +24,31 @@ namespace net {
 //        remote host (IP address) identified by /host/, even if known by
 //        another name, wait until that connection has been established or
 //        for that connection to have failed.
-class WebSocketThrottle : public SocketStreamThrottle {
+class WebSocketThrottle {
  public:
-  virtual int OnStartOpenConnection(SocketStream* socket,
-                                    CompletionCallback* callback);
-  virtual int OnRead(SocketStream* socket, const char* data, int len,
-                     CompletionCallback* callback);
-  virtual int OnWrite(SocketStream* socket, const char* data, int len,
-                      CompletionCallback* callback);
-  virtual void OnClose(SocketStream* socket);
+  // Puts |job| in |queue_| and queues for the destination addresses
+  // of |job|.
+  // If other job is using the same destination address, set |job| waiting.
+  void PutInQueue(WebSocketJob* job);
 
-  static void Init();
+  // Removes |job| from |queue_| and queues for the destination addresses
+  // of |job|.
+  void RemoveFromQueue(WebSocketJob* job);
+
+  // Checks sockets waiting in |queue_| and check the socket is the front of
+  // every queue for the destination addresses of |socket|.
+  // If so, the socket can resume estabilshing connection, so wake up
+  // the socket.
+  void WakeupSocketIfNecessary();
 
  private:
-  class WebSocketState;
-  typedef std::deque<WebSocketState*> ConnectingQueue;
+  typedef std::deque<WebSocketJob*> ConnectingQueue;
   typedef base::hash_map<std::string, ConnectingQueue*> ConnectingAddressMap;
 
   WebSocketThrottle();
   virtual ~WebSocketThrottle();
   friend struct DefaultSingletonTraits<WebSocketThrottle>;
 
-  // Puts |socket| in |queue_| and queues for the destination addresses
-  // of |socket|.  Also sets |state| as UserData of |socket|.
-  // If other socket is using the same destination address, set |state| waiting.
-  void PutInQueue(SocketStream* socket, WebSocketState* state);
-
-  // Removes |socket| from |queue_| and queues for the destination addresses
-  // of |socket|.  Also releases |state| from UserData of |socket|.
-  void RemoveFromQueue(SocketStream* socket, WebSocketState* state);
-
-  // Checks sockets waiting in |queue_| and check the socket is the front of
-  // every queue for the destination addresses of |socket|.
-  // If so, the socket can resume estabilshing connection, so wake up
-  // the socket.
-  void WakeupSocketIfNecessary();
-
   // Key: string of host's address.  Value: queue of sockets for the address.
   ConnectingAddressMap addr_map_;
 
diff --git a/net/websockets/websocket_throttle_unittest.cc b/net/websockets/websocket_throttle_unittest.cc
index 55276e9..d568292 100644
--- a/net/websockets/websocket_throttle_unittest.cc
+++ b/net/websockets/websocket_throttle_unittest.cc
@@ -10,6 +10,7 @@
 #include "net/base/sys_addrinfo.h"
 #include "net/base/test_completion_callback.h"
 #include "net/socket_stream/socket_stream.h"
+#include "net/websockets/websocket_job.h"
 #include "net/websockets/websocket_throttle.h"
 #include "testing/gtest/include/gtest/gtest.h"
 #include "testing/platform_test.h"
@@ -66,94 +67,205 @@ class WebSocketThrottleTest : public PlatformTest {
 };
 
 TEST_F(WebSocketThrottleTest, Throttle) {
-  WebSocketThrottle::Init();
   DummySocketStreamDelegate delegate;
 
-  WebSocketThrottle* throttle = Singleton<WebSocketThrottle>::get();
-
-  EXPECT_EQ(throttle,
-            SocketStreamThrottle::GetSocketStreamThrottleForScheme("ws"));
-  EXPECT_EQ(throttle,
-            SocketStreamThrottle::GetSocketStreamThrottleForScheme("wss"));
-
   // For host1: 1.2.3.4, 1.2.3.5, 1.2.3.6
   struct addrinfo* addr = AddAddr(1, 2, 3, 4, NULL);
   addr = AddAddr(1, 2, 3, 5, addr);
   addr = AddAddr(1, 2, 3, 6, addr);
+  scoped_refptr<WebSocketJob> w1 = new WebSocketJob(&delegate);
   scoped_refptr<SocketStream> s1 =
-      new SocketStream(GURL("ws://host1/"), &delegate);
+      new SocketStream(GURL("ws://host1/"), w1.get());
+  w1->InitSocketStream(s1.get());
   WebSocketThrottleTest::SetAddressList(s1, addr);
   DeleteAddrInfo(addr);
 
+  DLOG(INFO) << "socket1";
   TestCompletionCallback callback_s1;
-  EXPECT_EQ(OK, throttle->OnStartOpenConnection(s1, &callback_s1));
+  // Trying to open connection to host1 will start without wait.
+  EXPECT_EQ(OK, w1->OnStartOpenConnection(s1, &callback_s1));
+
+  // Now connecting to host1, so waiting queue looks like
+  // Address | head -> tail
+  // 1.2.3.4 | w1
+  // 1.2.3.5 | w1
+  // 1.2.3.6 | w1
 
   // For host2: 1.2.3.4
   addr = AddAddr(1, 2, 3, 4, NULL);
+  scoped_refptr<WebSocketJob> w2 = new WebSocketJob(&delegate);
   scoped_refptr<SocketStream> s2 =
-      new SocketStream(GURL("ws://host2/"), &delegate);
+      new SocketStream(GURL("ws://host2/"), w2.get());
+  w2->InitSocketStream(s2.get());
   WebSocketThrottleTest::SetAddressList(s2, addr);
   DeleteAddrInfo(addr);
 
+  DLOG(INFO) << "socket2";
   TestCompletionCallback callback_s2;
-  EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s2, &callback_s2));
+  // Trying to open connection to host2 will wait for w1.
+  EXPECT_EQ(ERR_IO_PENDING, w2->OnStartOpenConnection(s2, &callback_s2));
+  // Now waiting queue looks like
+  // Address | head -> tail
+  // 1.2.3.4 | w1 w2
+  // 1.2.3.5 | w1
+  // 1.2.3.6 | w1
 
   // For host3: 1.2.3.5
   addr = AddAddr(1, 2, 3, 5, NULL);
+  scoped_refptr<WebSocketJob> w3 = new WebSocketJob(&delegate);
   scoped_refptr<SocketStream> s3 =
-      new SocketStream(GURL("ws://host3/"), &delegate);
+      new SocketStream(GURL("ws://host3/"), w3.get());
+  w3->InitSocketStream(s3.get());
   WebSocketThrottleTest::SetAddressList(s3, addr);
   DeleteAddrInfo(addr);
 
+  DLOG(INFO) << "socket3";
   TestCompletionCallback callback_s3;
-  EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s3, &callback_s3));
+  // Trying to open connection to host3 will wait for w1.
+  EXPECT_EQ(ERR_IO_PENDING, w3->OnStartOpenConnection(s3, &callback_s3));
+  // Address | head -> tail
+  // 1.2.3.4 | w1 w2
+  // 1.2.3.5 | w1    w3
+  // 1.2.3.6 | w1
 
   // For host4: 1.2.3.4, 1.2.3.6
   addr = AddAddr(1, 2, 3, 4, NULL);
   addr = AddAddr(1, 2, 3, 6, addr);
+  scoped_refptr<WebSocketJob> w4 = new WebSocketJob(&delegate);
   scoped_refptr<SocketStream> s4 =
-      new SocketStream(GURL("ws://host4/"), &delegate);
+      new SocketStream(GURL("ws://host4/"), w4.get());
+  w4->InitSocketStream(s4.get());
   WebSocketThrottleTest::SetAddressList(s4, addr);
   DeleteAddrInfo(addr);
 
+  DLOG(INFO) << "socket4";
   TestCompletionCallback callback_s4;
-  EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s4, &callback_s4));
+  // Trying to open connection to host4 will wait for w1, w2.
+  EXPECT_EQ(ERR_IO_PENDING, w4->OnStartOpenConnection(s4, &callback_s4));
+  // Address | head -> tail
+  // 1.2.3.4 | w1 w2    w4
+  // 1.2.3.5 | w1    w3
+  // 1.2.3.6 | w1       w4
+
+  // For host5: 1.2.3.6
+  addr = AddAddr(1, 2, 3, 6, NULL);
+  scoped_refptr<WebSocketJob> w5 = new WebSocketJob(&delegate);
+  scoped_refptr<SocketStream> s5 =
+      new SocketStream(GURL("ws://host5/"), w5.get());
+  w5->InitSocketStream(s5.get());
+  WebSocketThrottleTest::SetAddressList(s5, addr);
+  DeleteAddrInfo(addr);
+
+  DLOG(INFO) << "socket5";
+  TestCompletionCallback callback_s5;
+  // Trying to open connection to host5 will wait for w1, w4
+  EXPECT_EQ(ERR_IO_PENDING, w5->OnStartOpenConnection(s5, &callback_s5));
+  // Address | head -> tail
+  // 1.2.3.4 | w1 w2    w4
+  // 1.2.3.5 | w1    w3
+  // 1.2.3.6 | w1       w4 w5
+
+  // For host6: 1.2.3.6
+  addr = AddAddr(1, 2, 3, 6, NULL);
+  scoped_refptr<WebSocketJob> w6 = new WebSocketJob(&delegate);
+  scoped_refptr<SocketStream> s6 =
+      new SocketStream(GURL("ws://host6/"), w6.get());
+  w6->InitSocketStream(s6.get());
+  WebSocketThrottleTest::SetAddressList(s6, addr);
+  DeleteAddrInfo(addr);
 
+  DLOG(INFO) << "socket6";
+  TestCompletionCallback callback_s6;
+  // Trying to open connection to host6 will wait for w1, w4, w5
+  EXPECT_EQ(ERR_IO_PENDING, w6->OnStartOpenConnection(s6, &callback_s6));
+  // Address | head -> tail
+  // 1.2.3.4 | w1 w2    w4
+  // 1.2.3.5 | w1    w3
+  // 1.2.3.6 | w1       w4 w5 w6
+
+  // Receive partial response on w1, still connecting.
+  DLOG(INFO) << "socket1 1";
   static const char kHeader[] = "HTTP/1.1 101 Web Socket Protocol\r\n";
-  EXPECT_EQ(OK,
-            throttle->OnRead(s1.get(), kHeader, sizeof(kHeader) - 1, NULL));
+  w1->OnReceivedData(s1.get(), kHeader, sizeof(kHeader) - 1);
   EXPECT_FALSE(callback_s2.have_result());
   EXPECT_FALSE(callback_s3.have_result());
   EXPECT_FALSE(callback_s4.have_result());
+  EXPECT_FALSE(callback_s5.have_result());
+  EXPECT_FALSE(callback_s6.have_result());
 
+  // Receive rest of handshake response on w1.
+  DLOG(INFO) << "socket1 2";
   static const char kHeader2[] =
       "Upgrade: WebSocket\r\n"
       "Connection: Upgrade\r\n"
       "WebSocket-Origin: http://www.google.com\r\n"
       "WebSocket-Location: ws://websocket.chromium.org\r\n"
       "\r\n";
-  EXPECT_EQ(OK,
-            throttle->OnRead(s1.get(), kHeader2, sizeof(kHeader2) - 1, NULL));
+  w1->OnReceivedData(s1.get(), kHeader2, sizeof(kHeader2) - 1);
   MessageLoopForIO::current()->RunAllPending();
+  // Now, w1 is open.
+  EXPECT_EQ(WebSocketJob::OPEN, w1->state());
+  // So, w2 and w3 can start connecting. w4 needs to wait w2 (1.2.3.4)
   EXPECT_TRUE(callback_s2.have_result());
   EXPECT_TRUE(callback_s3.have_result());
   EXPECT_FALSE(callback_s4.have_result());
+  // Address | head -> tail
+  // 1.2.3.4 |    w2    w4
+  // 1.2.3.5 |       w3
+  // 1.2.3.6 |          w4 w5 w6
 
-  throttle->OnClose(s1.get());
+  // Closing s1 doesn't change waiting queue.
+  DLOG(INFO) << "socket1 close";
+  w1->OnClose(s1.get());
   MessageLoopForIO::current()->RunAllPending();
   EXPECT_FALSE(callback_s4.have_result());
   s1->DetachDelegate();
+  // Address | head -> tail
+  // 1.2.3.4 |    w2    w4
+  // 1.2.3.5 |       w3
+  // 1.2.3.6 |          w4 w5 w6
+
+  // w5 can close while waiting in queue.
+  DLOG(INFO) << "socket5 close";
+  // w5 close() closes SocketStream that change state to STATE_CLOSE, calls
+  // DoLoop(), so OnClose() callback will be called.
+  w5->OnClose(s5.get());
+  MessageLoopForIO::current()->RunAllPending();
+  EXPECT_FALSE(callback_s4.have_result());
+  // Address | head -> tail
+  // 1.2.3.4 |    w2    w4
+  // 1.2.3.5 |       w3
+  // 1.2.3.6 |          w4 w6
+  s5->DetachDelegate();
+
+  // w6 close abnormally (e.g. renderer finishes) while waiting in queue.
+  DLOG(INFO) << "socket6 close abnormally";
+  w6->DetachDelegate();
+  MessageLoopForIO::current()->RunAllPending();
+  EXPECT_FALSE(callback_s4.have_result());
+  // Address | head -> tail
+  // 1.2.3.4 |    w2    w4
+  // 1.2.3.5 |       w3
+  // 1.2.3.6 |          w4
 
-  throttle->OnClose(s2.get());
+  // Closing s2 kicks w4 to start connecting.
+  DLOG(INFO) << "socket2 close";
+  w2->OnClose(s2.get());
   MessageLoopForIO::current()->RunAllPending();
   EXPECT_TRUE(callback_s4.have_result());
+  // Address | head -> tail
+  // 1.2.3.4 |          w4
+  // 1.2.3.5 |       w3
+  // 1.2.3.6 |          w4
   s2->DetachDelegate();
 
-  throttle->OnClose(s3.get());
+  DLOG(INFO) << "socket3 close";
+  w3->OnClose(s3.get());
   MessageLoopForIO::current()->RunAllPending();
   s3->DetachDelegate();
-  throttle->OnClose(s4.get());
+  w4->OnClose(s4.get());
   s4->DetachDelegate();
+  DLOG(INFO) << "Done";
 }
 
 }
-- 
cgit v1.1