summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--chrome/browser/renderer_host/socket_stream_dispatcher_host.cc1
-rw-r--r--net/net.gyp2
-rw-r--r--net/socket_stream/socket_stream.cc15
-rw-r--r--net/socket_stream/socket_stream.h9
-rw-r--r--net/socket_stream/socket_stream_throttle.cc80
-rw-r--r--net/socket_stream/socket_stream_throttle.h81
-rw-r--r--net/websockets/websocket_job.cc73
-rw-r--r--net/websockets/websocket_job.h16
-rw-r--r--net/websockets/websocket_job_unittest.cc19
-rw-r--r--net/websockets/websocket_throttle.cc219
-rw-r--r--net/websockets/websocket_throttle.h48
-rw-r--r--net/websockets/websocket_throttle_unittest.cc160
12 files changed, 305 insertions, 418 deletions
diff --git a/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc b/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc
index daddfcc..b99e70d 100644
--- a/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc
+++ b/chrome/browser/renderer_host/socket_stream_dispatcher_host.cc
@@ -14,7 +14,6 @@
SocketStreamDispatcherHost::SocketStreamDispatcherHost() : receiver_(NULL) {
net::WebSocketJob::EnsureInit();
- net::WebSocketThrottle::Init();
}
SocketStreamDispatcherHost::~SocketStreamDispatcherHost() {
diff --git a/net/net.gyp b/net/net.gyp
index 571f9ae..5637f41 100644
--- a/net/net.gyp
+++ b/net/net.gyp
@@ -453,8 +453,6 @@
'socket_stream/socket_stream_job_manager.h',
'socket_stream/socket_stream_metrics.cc',
'socket_stream/socket_stream_metrics.h',
- 'socket_stream/socket_stream_throttle.cc',
- 'socket_stream/socket_stream_throttle.h',
'spdy/spdy_bitmasks.h',
'spdy/spdy_frame_builder.cc',
'spdy/spdy_frame_builder.h',
diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc
index 03d7a9f..e46f306 100644
--- a/net/socket_stream/socket_stream.cc
+++ b/net/socket_stream/socket_stream.cc
@@ -26,7 +26,6 @@
#include "net/socket/socks_client_socket.h"
#include "net/socket/tcp_client_socket.h"
#include "net/socket_stream/socket_stream_metrics.h"
-#include "net/socket_stream/socket_stream_throttle.h"
#include "net/url_request/url_request.h"
static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes.
@@ -59,16 +58,12 @@ SocketStream::SocketStream(const GURL& url, Delegate* delegate)
current_write_buf_(NULL),
write_buf_offset_(0),
write_buf_size_(0),
- throttle_(
- SocketStreamThrottle::GetSocketStreamThrottleForScheme(
- url.scheme())),
metrics_(new SocketStreamMetrics(url)) {
DCHECK(MessageLoop::current()) <<
"The current MessageLoop must exist";
DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) <<
"The current MessageLoop must be TYPE_IO";
DCHECK(delegate_);
- DCHECK(throttle_);
}
SocketStream::~SocketStream() {
@@ -235,7 +230,6 @@ void SocketStream::Finish(int result) {
if (delegate) {
delegate->OnClose(this);
}
- throttle_->OnClose(this);
Release();
}
@@ -275,13 +269,12 @@ int SocketStream::DidReceiveData(int result) {
net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_RECEIVED);
int len = result;
metrics_->OnRead(len);
- result = throttle_->OnRead(this, read_buf_->data(), len, &io_callback_);
if (delegate_) {
// Notify recevied data to delegate.
delegate_->OnReceivedData(this, read_buf_->data(), len);
}
read_buf_ = NULL;
- return result;
+ return OK;
}
int SocketStream::DidSendData(int result) {
@@ -289,8 +282,6 @@ int SocketStream::DidSendData(int result) {
net_log_.AddEvent(NetLog::TYPE_SOCKET_STREAM_SENT);
int len = result;
metrics_->OnWrite(len);
- result = throttle_->OnWrite(this, current_write_buf_->data(), len,
- &io_callback_);
current_write_buf_ = NULL;
if (delegate_)
delegate_->OnSentData(this, len);
@@ -309,7 +300,7 @@ int SocketStream::DidSendData(int result) {
} else {
write_buf_offset_ += len;
}
- return result;
+ return OK;
}
void SocketStream::OnIOCompleted(int result) {
@@ -490,7 +481,7 @@ int SocketStream::DoResolveHost() {
int SocketStream::DoResolveHostComplete(int result) {
if (result == OK) {
next_state_ = STATE_TCP_CONNECT;
- result = throttle_->OnStartOpenConnection(this, &io_callback_);
+ result = delegate_->OnStartOpenConnection(this, &io_callback_);
if (result == net::ERR_IO_PENDING)
metrics_->OnWaitConnection();
} else {
diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h
index e8b250e..b8fd0bf 100644
--- a/net/socket_stream/socket_stream.h
+++ b/net/socket_stream/socket_stream.h
@@ -18,6 +18,7 @@
#include "net/base/completion_callback.h"
#include "net/base/io_buffer.h"
#include "net/base/net_log.h"
+#include "net/base/net_errors.h"
#include "net/http/http_auth.h"
#include "net/http/http_auth_cache.h"
#include "net/http/http_auth_handler.h"
@@ -34,7 +35,6 @@ class HttpAuthHandlerFactory;
class SSLConfigService;
class SingleRequestHostResolver;
class SocketStreamMetrics;
-class SocketStreamThrottle;
// SocketStream is used to implement Web Sockets.
// It provides plain full-duplex stream with proxy and SSL support.
@@ -57,6 +57,11 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> {
public:
virtual ~Delegate() {}
+ virtual int OnStartOpenConnection(SocketStream* socket,
+ CompletionCallback* callback) {
+ return OK;
+ }
+
// Called when socket stream has been connected. The socket stream accepts
// at most |max_pending_send_allowed| so that a client of the socket stream
// should keep track of how much it has pending and shouldn't go over
@@ -313,8 +318,6 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> {
int write_buf_size_;
PendingDataQueue pending_write_bufs_;
- SocketStreamThrottle* throttle_;
-
scoped_ptr<SocketStreamMetrics> metrics_;
DISALLOW_COPY_AND_ASSIGN(SocketStream);
diff --git a/net/socket_stream/socket_stream_throttle.cc b/net/socket_stream/socket_stream_throttle.cc
deleted file mode 100644
index 6a1d20d..0000000
--- a/net/socket_stream/socket_stream_throttle.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright (c) 2009 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include <string>
-
-#include "net/socket_stream/socket_stream_throttle.h"
-
-#include "base/hash_tables.h"
-#include "base/singleton.h"
-#include "net/base/completion_callback.h"
-#include "net/socket_stream/socket_stream.h"
-
-namespace net {
-
-// Default SocketStreamThrottle. No throttling. Used for unknown URL scheme.
-class DefaultSocketStreamThrottle : public SocketStreamThrottle {
- private:
- DefaultSocketStreamThrottle() {}
- virtual ~DefaultSocketStreamThrottle() {}
- friend struct DefaultSingletonTraits<DefaultSocketStreamThrottle>;
-
- DISALLOW_COPY_AND_ASSIGN(DefaultSocketStreamThrottle);
-};
-
-class SocketStreamThrottleRegistry {
- public:
- SocketStreamThrottle* GetSocketStreamThrottleForScheme(
- const std::string& scheme);
-
- void RegisterSocketStreamThrottle(
- const std::string& scheme, SocketStreamThrottle* throttle);
-
- private:
- typedef base::hash_map<std::string, SocketStreamThrottle*> ThrottleMap;
-
- SocketStreamThrottleRegistry() {}
- ~SocketStreamThrottleRegistry() {}
- friend struct DefaultSingletonTraits<SocketStreamThrottleRegistry>;
-
- ThrottleMap throttles_;
-
- DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottleRegistry);
-};
-
-SocketStreamThrottle*
-SocketStreamThrottleRegistry::GetSocketStreamThrottleForScheme(
- const std::string& scheme) {
- ThrottleMap::const_iterator found = throttles_.find(scheme);
- if (found == throttles_.end()) {
- SocketStreamThrottle* throttle =
- Singleton<DefaultSocketStreamThrottle>::get();
- throttles_[scheme] = throttle;
- return throttle;
- }
- return found->second;
-}
-
-void SocketStreamThrottleRegistry::RegisterSocketStreamThrottle(
- const std::string& scheme, SocketStreamThrottle* throttle) {
- throttles_[scheme] = throttle;
-}
-
-/* static */
-SocketStreamThrottle* SocketStreamThrottle::GetSocketStreamThrottleForScheme(
- const std::string& scheme) {
- SocketStreamThrottleRegistry* registry =
- Singleton<SocketStreamThrottleRegistry>::get();
- return registry->GetSocketStreamThrottleForScheme(scheme);
-}
-
-/* static */
-void SocketStreamThrottle::RegisterSocketStreamThrottle(
- const std::string& scheme, SocketStreamThrottle* throttle) {
- SocketStreamThrottleRegistry* registry =
- Singleton<SocketStreamThrottleRegistry>::get();
- registry->RegisterSocketStreamThrottle(scheme, throttle);
-}
-
-} // namespace net
diff --git a/net/socket_stream/socket_stream_throttle.h b/net/socket_stream/socket_stream_throttle.h
deleted file mode 100644
index 7726cbe..0000000
--- a/net/socket_stream/socket_stream_throttle.h
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright (c) 2009 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_
-#define NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_
-
-#include <string>
-
-#include "base/basictypes.h"
-#include "net/base/completion_callback.h"
-#include "net/base/net_errors.h"
-
-namespace net {
-
-class SocketStream;
-
-// Abstract interface to throttle SocketStream per URL scheme.
-// Each URL scheme (protocol) could define own SocketStreamThrottle.
-// These methods will be called on IO thread.
-class SocketStreamThrottle {
- public:
- // Called when |socket| is about to open connection.
- // Returns net::OK if the connection can open now.
- // Returns net::ERR_IO_PENDING if the connection should wait. In this case,
- // |callback| will be called when it's ready to open connection.
- virtual int OnStartOpenConnection(SocketStream* socket,
- CompletionCallback* callback) {
- // No throttle by default.
- return OK;
- }
-
- // Called when |socket| read |len| bytes of |data|.
- // May wake up another waiting socket.
- // Returns net::OK if |socket| can continue to run.
- // Returns net::ERR_IO_PENDING if |socket| should suspend to run. In this
- // case, |callback| will be called when it's ready to resume running.
- virtual int OnRead(SocketStream* socket, const char* data, int len,
- CompletionCallback* callback) {
- // No throttle by default.
- return OK;
- }
-
- // Called when |socket| wrote |len| bytes of |data|.
- // May wake up another waiting socket.
- // Returns net::OK if |socket| can continue to run.
- // Returns net::ERR_IO_PENDING if |socket| should suspend to run. In this
- // case, |callback| will be called when it's ready to resume running.
- virtual int OnWrite(SocketStream* socket, const char* data, int len,
- CompletionCallback* callback) {
- // No throttle by default.
- return OK;
- }
-
- // Called when |socket| is closed.
- // May wake up another waiting socket.
- virtual void OnClose(SocketStream* socket) {}
-
- // Gets SocketStreamThrottle for URL |scheme|.
- // Doesn't pass ownership of the SocketStreamThrottle.
- static SocketStreamThrottle* GetSocketStreamThrottleForScheme(
- const std::string& scheme);
-
- // Registers |throttle| for URL |scheme|.
- // Doesn't take ownership of |throttle|. Typically |throttle| is
- // singleton instance.
- static void RegisterSocketStreamThrottle(
- const std::string& scheme,
- SocketStreamThrottle* throttle);
-
- protected:
- SocketStreamThrottle() {}
- virtual ~SocketStreamThrottle() {}
-
- private:
- DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottle);
-};
-
-} // namespace net
-
-#endif // NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_
diff --git a/net/websockets/websocket_job.cc b/net/websockets/websocket_job.cc
index 59acfc5..62a62b7 100644
--- a/net/websockets/websocket_job.cc
+++ b/net/websockets/websocket_job.cc
@@ -10,6 +10,31 @@
#include "net/base/cookie_store.h"
#include "net/http/http_util.h"
#include "net/url_request/url_request_context.h"
+#include "net/websockets/websocket_throttle.h"
+
+namespace {
+
+class CompletionCallbackRunner
+ : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
+ public:
+ explicit CompletionCallbackRunner(net::CompletionCallback* callback)
+ : callback_(callback) {
+ DCHECK(callback_);
+ }
+ void Run() {
+ callback_->Run(net::OK);
+ }
+ private:
+ friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
+
+ virtual ~CompletionCallbackRunner() {}
+
+ net::CompletionCallback* callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
+};
+
+}
namespace net {
@@ -75,6 +100,8 @@ void WebSocketJob::EnsureInit() {
WebSocketJob::WebSocketJob(SocketStream::Delegate* delegate)
: delegate_(delegate),
state_(INITIALIZED),
+ waiting_(false),
+ callback_(NULL),
handshake_request_sent_(0),
handshake_response_header_length_(0),
response_cookies_save_index_(0),
@@ -128,12 +155,27 @@ void WebSocketJob::RestartWithAuth(
void WebSocketJob::DetachDelegate() {
state_ = CLOSED;
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
+ Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
+
delegate_ = NULL;
if (socket_)
socket_->DetachDelegate();
socket_ = NULL;
}
+int WebSocketJob::OnStartOpenConnection(
+ SocketStream* socket, CompletionCallback* callback) {
+ DCHECK(!callback_);
+ state_ = CONNECTING;
+ addresses_.Copy(socket->address_list().head(), true);
+ Singleton<WebSocketThrottle>::get()->PutInQueue(this);
+ if (!waiting_)
+ return OK;
+ callback_ = callback;
+ return ERR_IO_PENDING;
+}
+
void WebSocketJob::OnConnected(
SocketStream* socket, int max_pending_send_allowed) {
if (delegate_)
@@ -161,6 +203,9 @@ void WebSocketJob::OnReceivedData(
void WebSocketJob::OnClose(SocketStream* socket) {
state_ = CLOSED;
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
+ Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
+
SocketStream::Delegate* delegate = delegate_;
delegate_ = NULL;
socket_ = NULL;
@@ -325,6 +370,9 @@ void WebSocketJob::SaveNextCookie() {
"\r\n" +
remaining_data;
state_ = OPEN;
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(this);
+ Singleton<WebSocketThrottle>::get()->WakeupSocketIfNecessary();
+
if (delegate_)
delegate_->OnReceivedData(socket_,
received_data.data(), received_data.size());
@@ -376,4 +424,29 @@ GURL WebSocketJob::GetURLForCookies() const {
return url.ReplaceComponents(replacements);
}
+const AddressList& WebSocketJob::address_list() const {
+ return addresses_;
+}
+
+void WebSocketJob::SetWaiting() {
+ waiting_ = true;
+}
+
+bool WebSocketJob::IsWaiting() const {
+ return waiting_;
+}
+
+void WebSocketJob::Wakeup() {
+ waiting_ = false;
+ DCHECK(callback_);
+ // We wrap |callback_| to keep this alive while this is released.
+ scoped_refptr<CompletionCallbackRunner> runner =
+ new CompletionCallbackRunner(callback_);
+ callback_ = NULL;
+ MessageLoopForIO::current()->PostTask(
+ FROM_HERE,
+ NewRunnableMethod(runner.get(),
+ &CompletionCallbackRunner::Run));
+}
+
} // namespace net
diff --git a/net/websockets/websocket_job.h b/net/websockets/websocket_job.h
index 31fa503..23140c5 100644
--- a/net/websockets/websocket_job.h
+++ b/net/websockets/websocket_job.h
@@ -9,6 +9,7 @@
#include <vector>
#include "base/ref_counted.h"
+#include "net/base/address_list.h"
#include "net/base/completion_callback.h"
#include "net/socket_stream/socket_stream_job.h"
@@ -18,10 +19,9 @@ namespace net {
// WebSocket protocol specific job on SocketStream.
// It captures WebSocket handshake message and handles cookie operations.
-// Chome security policy doesn't allow renderer process (except dev tools)
+// Chrome security policy doesn't allow renderer process (except dev tools)
// see HttpOnly cookies, so it injects cookie header in handshake request and
// strips set-cookie headers in handshake response.
-// TODO(ukai): refactor to merge WebSocketThrottle functionality.
// TODO(ukai): refactor websocket.cc to use this.
class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
public:
@@ -36,6 +36,7 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
explicit WebSocketJob(SocketStream::Delegate* delegate);
+ State state() const { return state_; }
virtual void Connect();
virtual bool SendData(const char* data, int len);
virtual void Close();
@@ -45,6 +46,8 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
virtual void DetachDelegate();
// SocketStream::Delegate methods.
+ virtual int OnStartOpenConnection(
+ SocketStream* socket, CompletionCallback* callback);
virtual void OnConnected(
SocketStream* socket, int max_pending_send_allowed);
virtual void OnSentData(
@@ -58,6 +61,7 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
const SocketStream* socket, int error);
private:
+ friend class WebSocketThrottle;
friend class WebSocketJobTest;
virtual ~WebSocketJob();
@@ -74,8 +78,16 @@ class WebSocketJob : public SocketStreamJob, public SocketStream::Delegate {
GURL GetURLForCookies() const;
+ const AddressList& address_list() const;
+ void SetWaiting();
+ bool IsWaiting() const;
+ void Wakeup();
+
SocketStream::Delegate* delegate_;
State state_;
+ bool waiting_;
+ AddressList addresses_;
+ CompletionCallback* callback_; // for throttling.
std::string original_handshake_request_;
int original_handshake_request_header_length_;
diff --git a/net/websockets/websocket_job_unittest.cc b/net/websockets/websocket_job_unittest.cc
index de96a32..7bf9822 100644
--- a/net/websockets/websocket_job_unittest.cc
+++ b/net/websockets/websocket_job_unittest.cc
@@ -10,9 +10,11 @@
#include "net/base/cookie_policy.h"
#include "net/base/cookie_store.h"
#include "net/base/net_errors.h"
+#include "net/base/sys_addrinfo.h"
#include "net/socket_stream/socket_stream.h"
#include "net/url_request/url_request_context.h"
#include "net/websockets/websocket_job.h"
+#include "net/websockets/websocket_throttle.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/platform_test.h"
@@ -206,15 +208,28 @@ class WebSocketJobTest : public PlatformTest {
websocket_ = new WebSocketJob(delegate);
socket_ = new MockSocketStream(url, websocket_.get());
websocket_->InitSocketStream(socket_.get());
- websocket_->state_ = WebSocketJob::CONNECTING;
websocket_->set_context(context_.get());
+ websocket_->state_ = WebSocketJob::CONNECTING;
+ struct addrinfo addr;
+ memset(&addr, 0, sizeof(struct addrinfo));
+ addr.ai_family = AF_INET;
+ addr.ai_addrlen = sizeof(struct sockaddr_in);
+ struct sockaddr_in sa_in;
+ memset(&sa_in, 0, sizeof(struct sockaddr_in));
+ memcpy(&sa_in.sin_addr, "\x7f\0\0\1", 4);
+ addr.ai_addr = reinterpret_cast<sockaddr*>(&sa_in);
+ addr.ai_next = NULL;
+ websocket_->addresses_.Copy(&addr, true);
+ Singleton<WebSocketThrottle>::get()->PutInQueue(websocket_);
}
WebSocketJob::State GetWebSocketJobState() {
return websocket_->state_;
}
void CloseWebSocketJob() {
- if (websocket_->socket_)
+ if (websocket_->socket_) {
websocket_->socket_->DetachDelegate();
+ Singleton<WebSocketThrottle>::get()->RemoveFromQueue(websocket_);
+ }
websocket_->state_ = WebSocketJob::CLOSED;
websocket_->delegate_ = NULL;
websocket_->socket_ = NULL;
diff --git a/net/websockets/websocket_throttle.cc b/net/websockets/websocket_throttle.cc
index 8d0d1fb..e2e98c3 100644
--- a/net/websockets/websocket_throttle.cc
+++ b/net/websockets/websocket_throttle.cc
@@ -13,6 +13,7 @@
#include "net/base/io_buffer.h"
#include "net/base/sys_addrinfo.h"
#include "net/socket_stream/socket_stream.h"
+#include "net/websockets/websocket_job.h"
namespace net {
@@ -41,119 +42,7 @@ static std::string AddrinfoToHashkey(const struct addrinfo* addrinfo) {
}
}
-// State for WebSocket protocol on each SocketStream.
-// This is owned in SocketStream as UserData keyed by WebSocketState::kKeyName.
-// This is alive between connection starts and handshake is finished.
-// In this class, it doesn't check actual handshake finishes, but only checks
-// end of header is found in read data.
-class WebSocketThrottle::WebSocketState : public SocketStream::UserData {
- public:
- explicit WebSocketState(const AddressList& addrs)
- : address_list_(addrs),
- callback_(NULL),
- waiting_(false),
- handshake_finished_(false),
- buffer_(NULL) {
- }
- ~WebSocketState() {}
-
- int OnStartOpenConnection(CompletionCallback* callback) {
- DCHECK(!callback_);
- if (!waiting_)
- return OK;
- callback_ = callback;
- return ERR_IO_PENDING;
- }
-
- int OnRead(const char* data, int len, CompletionCallback* callback) {
- DCHECK(!waiting_);
- DCHECK(!callback_);
- DCHECK(!handshake_finished_);
- static const int kBufferSize = 8129;
-
- if (!buffer_) {
- // Fast path.
- int eoh = HttpUtil::LocateEndOfHeaders(data, len, 0);
- if (eoh > 0) {
- handshake_finished_ = true;
- return OK;
- }
- buffer_ = new GrowableIOBuffer();
- buffer_->SetCapacity(kBufferSize);
- } else if (buffer_->RemainingCapacity() < len) {
- buffer_->SetCapacity(buffer_->capacity() + kBufferSize);
- }
- memcpy(buffer_->data(), data, len);
- buffer_->set_offset(buffer_->offset() + len);
-
- int eoh = HttpUtil::LocateEndOfHeaders(buffer_->StartOfBuffer(),
- buffer_->offset(), 0);
- handshake_finished_ = (eoh > 0);
- return OK;
- }
-
- const AddressList& address_list() const { return address_list_; }
- void SetWaiting() { waiting_ = true; }
- bool IsWaiting() const { return waiting_; }
- bool HandshakeFinished() const { return handshake_finished_; }
- void Wakeup() {
- waiting_ = false;
- // We wrap |callback_| to keep this alive while this is released.
- scoped_refptr<CompletionCallbackRunner> runner =
- new CompletionCallbackRunner(callback_);
- callback_ = NULL;
- MessageLoopForIO::current()->PostTask(
- FROM_HERE,
- NewRunnableMethod(runner.get(),
- &CompletionCallbackRunner::Run));
- }
-
- static const char* kKeyName;
-
- private:
- class CompletionCallbackRunner
- : public base::RefCountedThreadSafe<CompletionCallbackRunner> {
- public:
- explicit CompletionCallbackRunner(CompletionCallback* callback)
- : callback_(callback) {
- DCHECK(callback_);
- }
- void Run() {
- callback_->Run(OK);
- }
- private:
- friend class base::RefCountedThreadSafe<CompletionCallbackRunner>;
-
- virtual ~CompletionCallbackRunner() {}
-
- CompletionCallback* callback_;
-
- DISALLOW_COPY_AND_ASSIGN(CompletionCallbackRunner);
- };
-
- const AddressList& address_list_;
-
- CompletionCallback* callback_;
- // True if waiting another websocket connection is established.
- // False if the websocket is performing handshaking.
- bool waiting_;
-
- // True if the websocket handshake is completed.
- // If true, it will be removed from queue and deleted from the SocketStream
- // UserData soon.
- bool handshake_finished_;
-
- // Buffer for read data to check handshake response message.
- scoped_refptr<GrowableIOBuffer> buffer_;
-
- DISALLOW_COPY_AND_ASSIGN(WebSocketState);
-};
-
-const char* WebSocketThrottle::WebSocketState::kKeyName = "WebSocketState";
-
WebSocketThrottle::WebSocketThrottle() {
- SocketStreamThrottle::RegisterSocketStreamThrottle("ws", this);
- SocketStreamThrottle::RegisterSocketStreamThrottle("wss", this);
}
WebSocketThrottle::~WebSocketThrottle() {
@@ -161,51 +50,9 @@ WebSocketThrottle::~WebSocketThrottle() {
DCHECK(addr_map_.empty());
}
-int WebSocketThrottle::OnStartOpenConnection(
- SocketStream* socket, CompletionCallback* callback) {
- WebSocketState* state = new WebSocketState(socket->address_list());
- PutInQueue(socket, state);
- return state->OnStartOpenConnection(callback);
-}
-
-int WebSocketThrottle::OnRead(SocketStream* socket,
- const char* data, int len,
- CompletionCallback* callback) {
- WebSocketState* state = static_cast<WebSocketState*>(
- socket->GetUserData(WebSocketState::kKeyName));
- // If no state, handshake was already completed. Do nothing.
- if (!state)
- return OK;
-
- int result = state->OnRead(data, len, callback);
- if (state->HandshakeFinished()) {
- RemoveFromQueue(socket, state);
- WakeupSocketIfNecessary();
- }
- return result;
-}
-
-int WebSocketThrottle::OnWrite(SocketStream* socket,
- const char* data, int len,
- CompletionCallback* callback) {
- // Do nothing.
- return OK;
-}
-
-void WebSocketThrottle::OnClose(SocketStream* socket) {
- WebSocketState* state = static_cast<WebSocketState*>(
- socket->GetUserData(WebSocketState::kKeyName));
- if (!state)
- return;
- RemoveFromQueue(socket, state);
- WakeupSocketIfNecessary();
-}
-
-void WebSocketThrottle::PutInQueue(SocketStream* socket,
- WebSocketState* state) {
- socket->SetUserData(WebSocketState::kKeyName, state);
- queue_.push_back(state);
- const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::PutInQueue(WebSocketJob* job) {
+ queue_.push_back(job);
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -213,18 +60,29 @@ void WebSocketThrottle::PutInQueue(SocketStream* socket,
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
if (iter == addr_map_.end()) {
ConnectingQueue* queue = new ConnectingQueue();
- queue->push_back(state);
+ queue->push_back(job);
addr_map_[addrkey] = queue;
} else {
- iter->second->push_back(state);
- state->SetWaiting();
+ iter->second->push_back(job);
+ job->SetWaiting();
}
}
}
-void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
- WebSocketState* state) {
- const AddressList& address_list = socket->address_list();
+void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
+ bool in_queue = false;
+ for (ConnectingQueue::iterator iter = queue_.begin();
+ iter != queue_.end();
+ ++iter) {
+ if (*iter == job) {
+ queue_.erase(iter);
+ in_queue = true;
+ break;
+ }
+ }
+ if (!in_queue)
+ return;
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -232,34 +90,32 @@ void WebSocketThrottle::RemoveFromQueue(SocketStream* socket,
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
DCHECK(iter != addr_map_.end());
ConnectingQueue* queue = iter->second;
- DCHECK(state == queue->front());
- queue->pop_front();
+ // Job may not be front of queue when job is closed early while waiting.
+ for (ConnectingQueue::iterator iter = queue->begin();
+ iter != queue->end();
+ ++iter) {
+ if (*iter == job) {
+ queue->erase(iter);
+ break;
+ }
+ }
if (queue->empty()) {
delete queue;
addr_map_.erase(iter);
}
}
- for (ConnectingQueue::iterator iter = queue_.begin();
- iter != queue_.end();
- ++iter) {
- if (*iter == state) {
- queue_.erase(iter);
- break;
- }
- }
- socket->SetUserData(WebSocketState::kKeyName, NULL);
}
void WebSocketThrottle::WakeupSocketIfNecessary() {
for (ConnectingQueue::iterator iter = queue_.begin();
iter != queue_.end();
++iter) {
- WebSocketState* state = *iter;
- if (!state->IsWaiting())
+ WebSocketJob* job = *iter;
+ if (!job->IsWaiting())
continue;
bool should_wakeup = true;
- const AddressList& address_list = state->address_list();
+ const AddressList& address_list = job->address_list();
for (const struct addrinfo* addrinfo = address_list.head();
addrinfo != NULL;
addrinfo = addrinfo->ai_next) {
@@ -267,19 +123,14 @@ void WebSocketThrottle::WakeupSocketIfNecessary() {
ConnectingAddressMap::iterator iter = addr_map_.find(addrkey);
DCHECK(iter != addr_map_.end());
ConnectingQueue* queue = iter->second;
- if (state != queue->front()) {
+ if (job != queue->front()) {
should_wakeup = false;
break;
}
}
if (should_wakeup)
- state->Wakeup();
+ job->Wakeup();
}
}
-/* static */
-void WebSocketThrottle::Init() {
- Singleton<WebSocketThrottle>::get();
-}
-
} // namespace net
diff --git a/net/websockets/websocket_throttle.h b/net/websockets/websocket_throttle.h
index 279aea2..d05b246 100644
--- a/net/websockets/websocket_throttle.h
+++ b/net/websockets/websocket_throttle.h
@@ -5,12 +5,17 @@
#ifndef NET_WEBSOCKETS_WEBSOCKET_THROTTLE_H_
#define NET_WEBSOCKETS_WEBSOCKET_THROTTLE_H_
+#include <deque>
+#include <string>
+
#include "base/hash_tables.h"
#include "base/singleton.h"
-#include "net/socket_stream/socket_stream_throttle.h"
namespace net {
+class SocketStream;
+class WebSocketJob;
+
// SocketStreamThrottle for WebSocket protocol.
// Implements the client-side requirements in the spec.
// http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol
@@ -19,42 +24,31 @@ namespace net {
// remote host (IP address) identified by /host/, even if known by
// another name, wait until that connection has been established or
// for that connection to have failed.
-class WebSocketThrottle : public SocketStreamThrottle {
+class WebSocketThrottle {
public:
- virtual int OnStartOpenConnection(SocketStream* socket,
- CompletionCallback* callback);
- virtual int OnRead(SocketStream* socket, const char* data, int len,
- CompletionCallback* callback);
- virtual int OnWrite(SocketStream* socket, const char* data, int len,
- CompletionCallback* callback);
- virtual void OnClose(SocketStream* socket);
+ // Puts |job| in |queue_| and queues for the destination addresses
+ // of |job|.
+ // If other job is using the same destination address, set |job| waiting.
+ void PutInQueue(WebSocketJob* job);
- static void Init();
+ // Removes |job| from |queue_| and queues for the destination addresses
+ // of |job|.
+ void RemoveFromQueue(WebSocketJob* job);
+
+ // Checks sockets waiting in |queue_| and check the socket is the front of
+ // every queue for the destination addresses of |socket|.
+ // If so, the socket can resume estabilshing connection, so wake up
+ // the socket.
+ void WakeupSocketIfNecessary();
private:
- class WebSocketState;
- typedef std::deque<WebSocketState*> ConnectingQueue;
+ typedef std::deque<WebSocketJob*> ConnectingQueue;
typedef base::hash_map<std::string, ConnectingQueue*> ConnectingAddressMap;
WebSocketThrottle();
virtual ~WebSocketThrottle();
friend struct DefaultSingletonTraits<WebSocketThrottle>;
- // Puts |socket| in |queue_| and queues for the destination addresses
- // of |socket|. Also sets |state| as UserData of |socket|.
- // If other socket is using the same destination address, set |state| waiting.
- void PutInQueue(SocketStream* socket, WebSocketState* state);
-
- // Removes |socket| from |queue_| and queues for the destination addresses
- // of |socket|. Also releases |state| from UserData of |socket|.
- void RemoveFromQueue(SocketStream* socket, WebSocketState* state);
-
- // Checks sockets waiting in |queue_| and check the socket is the front of
- // every queue for the destination addresses of |socket|.
- // If so, the socket can resume estabilshing connection, so wake up
- // the socket.
- void WakeupSocketIfNecessary();
-
// Key: string of host's address. Value: queue of sockets for the address.
ConnectingAddressMap addr_map_;
diff --git a/net/websockets/websocket_throttle_unittest.cc b/net/websockets/websocket_throttle_unittest.cc
index 55276e9..d568292 100644
--- a/net/websockets/websocket_throttle_unittest.cc
+++ b/net/websockets/websocket_throttle_unittest.cc
@@ -10,6 +10,7 @@
#include "net/base/sys_addrinfo.h"
#include "net/base/test_completion_callback.h"
#include "net/socket_stream/socket_stream.h"
+#include "net/websockets/websocket_job.h"
#include "net/websockets/websocket_throttle.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "testing/platform_test.h"
@@ -66,94 +67,205 @@ class WebSocketThrottleTest : public PlatformTest {
};
TEST_F(WebSocketThrottleTest, Throttle) {
- WebSocketThrottle::Init();
DummySocketStreamDelegate delegate;
- WebSocketThrottle* throttle = Singleton<WebSocketThrottle>::get();
-
- EXPECT_EQ(throttle,
- SocketStreamThrottle::GetSocketStreamThrottleForScheme("ws"));
- EXPECT_EQ(throttle,
- SocketStreamThrottle::GetSocketStreamThrottleForScheme("wss"));
-
// For host1: 1.2.3.4, 1.2.3.5, 1.2.3.6
struct addrinfo* addr = AddAddr(1, 2, 3, 4, NULL);
addr = AddAddr(1, 2, 3, 5, addr);
addr = AddAddr(1, 2, 3, 6, addr);
+ scoped_refptr<WebSocketJob> w1 = new WebSocketJob(&delegate);
scoped_refptr<SocketStream> s1 =
- new SocketStream(GURL("ws://host1/"), &delegate);
+ new SocketStream(GURL("ws://host1/"), w1.get());
+ w1->InitSocketStream(s1.get());
WebSocketThrottleTest::SetAddressList(s1, addr);
DeleteAddrInfo(addr);
+ DLOG(INFO) << "socket1";
TestCompletionCallback callback_s1;
- EXPECT_EQ(OK, throttle->OnStartOpenConnection(s1, &callback_s1));
+ // Trying to open connection to host1 will start without wait.
+ EXPECT_EQ(OK, w1->OnStartOpenConnection(s1, &callback_s1));
+
+ // Now connecting to host1, so waiting queue looks like
+ // Address | head -> tail
+ // 1.2.3.4 | w1
+ // 1.2.3.5 | w1
+ // 1.2.3.6 | w1
// For host2: 1.2.3.4
addr = AddAddr(1, 2, 3, 4, NULL);
+ scoped_refptr<WebSocketJob> w2 = new WebSocketJob(&delegate);
scoped_refptr<SocketStream> s2 =
- new SocketStream(GURL("ws://host2/"), &delegate);
+ new SocketStream(GURL("ws://host2/"), w2.get());
+ w2->InitSocketStream(s2.get());
WebSocketThrottleTest::SetAddressList(s2, addr);
DeleteAddrInfo(addr);
+ DLOG(INFO) << "socket2";
TestCompletionCallback callback_s2;
- EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s2, &callback_s2));
+ // Trying to open connection to host2 will wait for w1.
+ EXPECT_EQ(ERR_IO_PENDING, w2->OnStartOpenConnection(s2, &callback_s2));
+ // Now waiting queue looks like
+ // Address | head -> tail
+ // 1.2.3.4 | w1 w2
+ // 1.2.3.5 | w1
+ // 1.2.3.6 | w1
// For host3: 1.2.3.5
addr = AddAddr(1, 2, 3, 5, NULL);
+ scoped_refptr<WebSocketJob> w3 = new WebSocketJob(&delegate);
scoped_refptr<SocketStream> s3 =
- new SocketStream(GURL("ws://host3/"), &delegate);
+ new SocketStream(GURL("ws://host3/"), w3.get());
+ w3->InitSocketStream(s3.get());
WebSocketThrottleTest::SetAddressList(s3, addr);
DeleteAddrInfo(addr);
+ DLOG(INFO) << "socket3";
TestCompletionCallback callback_s3;
- EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s3, &callback_s3));
+ // Trying to open connection to host3 will wait for w1.
+ EXPECT_EQ(ERR_IO_PENDING, w3->OnStartOpenConnection(s3, &callback_s3));
+ // Address | head -> tail
+ // 1.2.3.4 | w1 w2
+ // 1.2.3.5 | w1 w3
+ // 1.2.3.6 | w1
// For host4: 1.2.3.4, 1.2.3.6
addr = AddAddr(1, 2, 3, 4, NULL);
addr = AddAddr(1, 2, 3, 6, addr);
+ scoped_refptr<WebSocketJob> w4 = new WebSocketJob(&delegate);
scoped_refptr<SocketStream> s4 =
- new SocketStream(GURL("ws://host4/"), &delegate);
+ new SocketStream(GURL("ws://host4/"), w4.get());
+ w4->InitSocketStream(s4.get());
WebSocketThrottleTest::SetAddressList(s4, addr);
DeleteAddrInfo(addr);
+ DLOG(INFO) << "socket4";
TestCompletionCallback callback_s4;
- EXPECT_EQ(ERR_IO_PENDING, throttle->OnStartOpenConnection(s4, &callback_s4));
+ // Trying to open connection to host4 will wait for w1, w2.
+ EXPECT_EQ(ERR_IO_PENDING, w4->OnStartOpenConnection(s4, &callback_s4));
+ // Address | head -> tail
+ // 1.2.3.4 | w1 w2 w4
+ // 1.2.3.5 | w1 w3
+ // 1.2.3.6 | w1 w4
+
+ // For host5: 1.2.3.6
+ addr = AddAddr(1, 2, 3, 6, NULL);
+ scoped_refptr<WebSocketJob> w5 = new WebSocketJob(&delegate);
+ scoped_refptr<SocketStream> s5 =
+ new SocketStream(GURL("ws://host5/"), w5.get());
+ w5->InitSocketStream(s5.get());
+ WebSocketThrottleTest::SetAddressList(s5, addr);
+ DeleteAddrInfo(addr);
+
+ DLOG(INFO) << "socket5";
+ TestCompletionCallback callback_s5;
+ // Trying to open connection to host5 will wait for w1, w4
+ EXPECT_EQ(ERR_IO_PENDING, w5->OnStartOpenConnection(s5, &callback_s5));
+ // Address | head -> tail
+ // 1.2.3.4 | w1 w2 w4
+ // 1.2.3.5 | w1 w3
+ // 1.2.3.6 | w1 w4 w5
+
+ // For host6: 1.2.3.6
+ addr = AddAddr(1, 2, 3, 6, NULL);
+ scoped_refptr<WebSocketJob> w6 = new WebSocketJob(&delegate);
+ scoped_refptr<SocketStream> s6 =
+ new SocketStream(GURL("ws://host6/"), w6.get());
+ w6->InitSocketStream(s6.get());
+ WebSocketThrottleTest::SetAddressList(s6, addr);
+ DeleteAddrInfo(addr);
+ DLOG(INFO) << "socket6";
+ TestCompletionCallback callback_s6;
+ // Trying to open connection to host6 will wait for w1, w4, w5
+ EXPECT_EQ(ERR_IO_PENDING, w6->OnStartOpenConnection(s6, &callback_s6));
+ // Address | head -> tail
+ // 1.2.3.4 | w1 w2 w4
+ // 1.2.3.5 | w1 w3
+ // 1.2.3.6 | w1 w4 w5 w6
+
+ // Receive partial response on w1, still connecting.
+ DLOG(INFO) << "socket1 1";
static const char kHeader[] = "HTTP/1.1 101 Web Socket Protocol\r\n";
- EXPECT_EQ(OK,
- throttle->OnRead(s1.get(), kHeader, sizeof(kHeader) - 1, NULL));
+ w1->OnReceivedData(s1.get(), kHeader, sizeof(kHeader) - 1);
EXPECT_FALSE(callback_s2.have_result());
EXPECT_FALSE(callback_s3.have_result());
EXPECT_FALSE(callback_s4.have_result());
+ EXPECT_FALSE(callback_s5.have_result());
+ EXPECT_FALSE(callback_s6.have_result());
+ // Receive rest of handshake response on w1.
+ DLOG(INFO) << "socket1 2";
static const char kHeader2[] =
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"WebSocket-Origin: http://www.google.com\r\n"
"WebSocket-Location: ws://websocket.chromium.org\r\n"
"\r\n";
- EXPECT_EQ(OK,
- throttle->OnRead(s1.get(), kHeader2, sizeof(kHeader2) - 1, NULL));
+ w1->OnReceivedData(s1.get(), kHeader2, sizeof(kHeader2) - 1);
MessageLoopForIO::current()->RunAllPending();
+ // Now, w1 is open.
+ EXPECT_EQ(WebSocketJob::OPEN, w1->state());
+ // So, w2 and w3 can start connecting. w4 needs to wait w2 (1.2.3.4)
EXPECT_TRUE(callback_s2.have_result());
EXPECT_TRUE(callback_s3.have_result());
EXPECT_FALSE(callback_s4.have_result());
+ // Address | head -> tail
+ // 1.2.3.4 | w2 w4
+ // 1.2.3.5 | w3
+ // 1.2.3.6 | w4 w5 w6
- throttle->OnClose(s1.get());
+ // Closing s1 doesn't change waiting queue.
+ DLOG(INFO) << "socket1 close";
+ w1->OnClose(s1.get());
MessageLoopForIO::current()->RunAllPending();
EXPECT_FALSE(callback_s4.have_result());
s1->DetachDelegate();
+ // Address | head -> tail
+ // 1.2.3.4 | w2 w4
+ // 1.2.3.5 | w3
+ // 1.2.3.6 | w4 w5 w6
+
+ // w5 can close while waiting in queue.
+ DLOG(INFO) << "socket5 close";
+ // w5 close() closes SocketStream that change state to STATE_CLOSE, calls
+ // DoLoop(), so OnClose() callback will be called.
+ w5->OnClose(s5.get());
+ MessageLoopForIO::current()->RunAllPending();
+ EXPECT_FALSE(callback_s4.have_result());
+ // Address | head -> tail
+ // 1.2.3.4 | w2 w4
+ // 1.2.3.5 | w3
+ // 1.2.3.6 | w4 w6
+ s5->DetachDelegate();
+
+ // w6 close abnormally (e.g. renderer finishes) while waiting in queue.
+ DLOG(INFO) << "socket6 close abnormally";
+ w6->DetachDelegate();
+ MessageLoopForIO::current()->RunAllPending();
+ EXPECT_FALSE(callback_s4.have_result());
+ // Address | head -> tail
+ // 1.2.3.4 | w2 w4
+ // 1.2.3.5 | w3
+ // 1.2.3.6 | w4
- throttle->OnClose(s2.get());
+ // Closing s2 kicks w4 to start connecting.
+ DLOG(INFO) << "socket2 close";
+ w2->OnClose(s2.get());
MessageLoopForIO::current()->RunAllPending();
EXPECT_TRUE(callback_s4.have_result());
+ // Address | head -> tail
+ // 1.2.3.4 | w4
+ // 1.2.3.5 | w3
+ // 1.2.3.6 | w4
s2->DetachDelegate();
- throttle->OnClose(s3.get());
+ DLOG(INFO) << "socket3 close";
+ w3->OnClose(s3.get());
MessageLoopForIO::current()->RunAllPending();
s3->DetachDelegate();
- throttle->OnClose(s4.get());
+ w4->OnClose(s4.get());
s4->DetachDelegate();
+ DLOG(INFO) << "Done";
}
}