diff options
author | ukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-11-04 10:02:28 +0000 |
---|---|---|
committer | ukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-11-04 10:02:28 +0000 |
commit | 4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949 (patch) | |
tree | 16237134b9e037cf609ca4211b5c9efbcd8759ee /net/socket_stream | |
parent | 8ecd3aade85b825c8206735f16c23880023782db (diff) | |
download | chromium_src-4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949.zip chromium_src-4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949.tar.gz chromium_src-4c4eac006bdbf1ad0f3e37db47c488f7c3ee4949.tar.bz2 |
Implement websocket throttling.
Implement the client-side requirements in the spec.
4.1 Handshake
1. If the user agent already has a Web Socket connection to the
remote host (IP address) identified by /host/, even if known by
another name, wait until that connection has been established or
for that connection to have failed.
BUG=none
TEST=net_unittests passes
Review URL: http://codereview.chromium.org/342052
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@30949 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/socket_stream')
-rw-r--r-- | net/socket_stream/socket_stream.cc | 60 | ||||
-rw-r--r-- | net/socket_stream/socket_stream.h | 14 | ||||
-rw-r--r-- | net/socket_stream/socket_stream_throttle.cc | 80 | ||||
-rw-r--r-- | net/socket_stream/socket_stream_throttle.h | 81 |
4 files changed, 210 insertions, 25 deletions
diff --git a/net/socket_stream/socket_stream.cc b/net/socket_stream/socket_stream.cc index 62aad61..aa0795c 100644 --- a/net/socket_stream/socket_stream.cc +++ b/net/socket_stream/socket_stream.cc @@ -25,6 +25,7 @@ #include "net/socket/socks5_client_socket.h" #include "net/socket/socks_client_socket.h" #include "net/socket/tcp_client_socket.h" +#include "net/socket_stream/socket_stream_throttle.h" #include "net/url_request/url_request.h" static const int kMaxPendingSendAllowed = 32768; // 32 kilobytes. @@ -55,12 +56,16 @@ SocketStream::SocketStream(const GURL& url, Delegate* delegate) write_buf_(NULL), current_write_buf_(NULL), write_buf_offset_(0), - write_buf_size_(0) { + write_buf_size_(0), + throttle_( + SocketStreamThrottle::GetSocketStreamThrottleForScheme( + url.scheme())) { DCHECK(MessageLoop::current()) << "The current MessageLoop must exist"; DCHECK_EQ(MessageLoop::TYPE_IO, MessageLoop::current()->type()) << "The current MessageLoop must be TYPE_IO"; DCHECK(delegate_); + DCHECK(throttle_); } SocketStream::~SocketStream() { @@ -199,6 +204,7 @@ void SocketStream::Finish(int result) { if (delegate) { delegate->OnClose(this); } + throttle_->OnClose(this); Release(); } @@ -213,6 +219,10 @@ void SocketStream::SetClientSocketFactory( factory_ = factory; } +void SocketStream::CopyAddrInfo(struct addrinfo* head) { + addresses_.Copy(head); +} + int SocketStream::DidEstablishConnection() { if (!socket_.get() || !socket_->IsConnected()) { next_state_ = STATE_CLOSE; @@ -226,24 +236,29 @@ int SocketStream::DidEstablishConnection() { return OK; } -void SocketStream::DidReceiveData(int result) { +int SocketStream::DidReceiveData(int result) { DCHECK(read_buf_); DCHECK_GT(result, 0); - if (!delegate_) - return; - // Notify recevied data to delegate. - delegate_->OnReceivedData(this, read_buf_->data(), result); + int len = result; + result = throttle_->OnRead(this, read_buf_->data(), len, &io_callback_); + if (delegate_) { + // Notify recevied data to delegate. + delegate_->OnReceivedData(this, read_buf_->data(), len); + } read_buf_ = NULL; + return result; } -void SocketStream::DidSendData(int result) { - current_write_buf_ = NULL; +int SocketStream::DidSendData(int result) { DCHECK_GT(result, 0); - if (!delegate_) - return; + int len = result; + result = throttle_->OnWrite(this, current_write_buf_->data(), len, + &io_callback_); + current_write_buf_ = NULL; + if (delegate_) + delegate_->OnSentData(this, len); - delegate_->OnSentData(this, result); - int remaining_size = write_buf_size_ - write_buf_offset_ - result; + int remaining_size = write_buf_size_ - write_buf_offset_ - len; if (remaining_size == 0) { if (!pending_write_bufs_.empty()) { write_buf_size_ = pending_write_bufs_.front()->size(); @@ -255,8 +270,9 @@ void SocketStream::DidSendData(int result) { } write_buf_offset_ = 0; } else { - write_buf_offset_ += result; + write_buf_offset_ += len; } + return result; } void SocketStream::OnIOCompleted(int result) { @@ -268,16 +284,14 @@ void SocketStream::OnReadCompleted(int result) { // 0 indicates end-of-file, so socket was closed. next_state_ = STATE_CLOSE; } else if (result > 0 && read_buf_) { - DidReceiveData(result); - result = OK; + result = DidReceiveData(result); } DoLoop(result); } void SocketStream::OnWriteCompleted(int result) { if (result >= 0 && write_buf_) { - DidSendData(result); - result = OK; + result = DidSendData(result); } DoLoop(result); } @@ -407,10 +421,12 @@ int SocketStream::DoResolveHost() { } int SocketStream::DoResolveHostComplete(int result) { - if (result == OK) + if (result == OK) { next_state_ = STATE_TCP_CONNECT; - else + result = throttle_->OnStartOpenConnection(this, &io_callback_); + } else { next_state_ = STATE_CLOSE; + } // TODO(ukai): if error occured, reconsider proxy after error. return result; } @@ -680,8 +696,7 @@ int SocketStream::DoReadWrite(int result) { read_buf_ = new IOBuffer(kReadBufferSize); result = socket_->Read(read_buf_, kReadBufferSize, &read_callback_); if (result > 0) { - DidReceiveData(result); - return OK; + return DidReceiveData(result); } else if (result == 0) { // 0 indicates end-of-file, so socket was closed. next_state_ = STATE_CLOSE; @@ -705,8 +720,7 @@ int SocketStream::DoReadWrite(int result) { current_write_buf_->BytesRemaining(), &write_callback_); if (result > 0) { - DidSendData(result); - return OK; + return DidSendData(result); } // If write is not pending, return the result and do next loop (to close // the connection). diff --git a/net/socket_stream/socket_stream.h b/net/socket_stream/socket_stream.h index 37b723a..14dfd36 100644 --- a/net/socket_stream/socket_stream.h +++ b/net/socket_stream/socket_stream.h @@ -31,6 +31,7 @@ class ClientSocketFactory; class HostResolver; class SSLConfigService; class SingleRequestHostResolver; +class SocketStreamThrottle; // SocketStream is used to implement Web Sockets. // It provides plain full-duplex stream with proxy and SSL support. @@ -96,6 +97,7 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> { void SetUserData(const void* key, UserData* data); const GURL& url() const { return url_; } + const AddressList& address_list() const { return addresses_; } Delegate* delegate() const { return delegate_; } int max_pending_send_allowed() const { return max_pending_send_allowed_; } @@ -191,14 +193,20 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> { friend class base::RefCountedThreadSafe<SocketStream>; ~SocketStream(); + friend class WebSocketThrottleTest; + + // Copies the given addrinfo list in |addresses_|. + // Used for WebSocketThrottleTest. + void CopyAddrInfo(struct addrinfo* head); + // Finishes the job. // Calls OnError and OnClose of delegate, and no more // notifications will be sent to delegate. void Finish(int result); int DidEstablishConnection(); - void DidReceiveData(int result); - void DidSendData(int result); + int DidReceiveData(int result); + int DidSendData(int result); void OnIOCompleted(int result); void OnReadCompleted(int result); @@ -289,6 +297,8 @@ class SocketStream : public base::RefCountedThreadSafe<SocketStream> { int write_buf_size_; PendingDataQueue pending_write_bufs_; + SocketStreamThrottle* throttle_; + DISALLOW_COPY_AND_ASSIGN(SocketStream); }; diff --git a/net/socket_stream/socket_stream_throttle.cc b/net/socket_stream/socket_stream_throttle.cc new file mode 100644 index 0000000..6a1d20d --- /dev/null +++ b/net/socket_stream/socket_stream_throttle.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <string> + +#include "net/socket_stream/socket_stream_throttle.h" + +#include "base/hash_tables.h" +#include "base/singleton.h" +#include "net/base/completion_callback.h" +#include "net/socket_stream/socket_stream.h" + +namespace net { + +// Default SocketStreamThrottle. No throttling. Used for unknown URL scheme. +class DefaultSocketStreamThrottle : public SocketStreamThrottle { + private: + DefaultSocketStreamThrottle() {} + virtual ~DefaultSocketStreamThrottle() {} + friend struct DefaultSingletonTraits<DefaultSocketStreamThrottle>; + + DISALLOW_COPY_AND_ASSIGN(DefaultSocketStreamThrottle); +}; + +class SocketStreamThrottleRegistry { + public: + SocketStreamThrottle* GetSocketStreamThrottleForScheme( + const std::string& scheme); + + void RegisterSocketStreamThrottle( + const std::string& scheme, SocketStreamThrottle* throttle); + + private: + typedef base::hash_map<std::string, SocketStreamThrottle*> ThrottleMap; + + SocketStreamThrottleRegistry() {} + ~SocketStreamThrottleRegistry() {} + friend struct DefaultSingletonTraits<SocketStreamThrottleRegistry>; + + ThrottleMap throttles_; + + DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottleRegistry); +}; + +SocketStreamThrottle* +SocketStreamThrottleRegistry::GetSocketStreamThrottleForScheme( + const std::string& scheme) { + ThrottleMap::const_iterator found = throttles_.find(scheme); + if (found == throttles_.end()) { + SocketStreamThrottle* throttle = + Singleton<DefaultSocketStreamThrottle>::get(); + throttles_[scheme] = throttle; + return throttle; + } + return found->second; +} + +void SocketStreamThrottleRegistry::RegisterSocketStreamThrottle( + const std::string& scheme, SocketStreamThrottle* throttle) { + throttles_[scheme] = throttle; +} + +/* static */ +SocketStreamThrottle* SocketStreamThrottle::GetSocketStreamThrottleForScheme( + const std::string& scheme) { + SocketStreamThrottleRegistry* registry = + Singleton<SocketStreamThrottleRegistry>::get(); + return registry->GetSocketStreamThrottleForScheme(scheme); +} + +/* static */ +void SocketStreamThrottle::RegisterSocketStreamThrottle( + const std::string& scheme, SocketStreamThrottle* throttle) { + SocketStreamThrottleRegistry* registry = + Singleton<SocketStreamThrottleRegistry>::get(); + registry->RegisterSocketStreamThrottle(scheme, throttle); +} + +} // namespace net diff --git a/net/socket_stream/socket_stream_throttle.h b/net/socket_stream/socket_stream_throttle.h new file mode 100644 index 0000000..7726cbe --- /dev/null +++ b/net/socket_stream/socket_stream_throttle.h @@ -0,0 +1,81 @@ +// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_ +#define NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_ + +#include <string> + +#include "base/basictypes.h" +#include "net/base/completion_callback.h" +#include "net/base/net_errors.h" + +namespace net { + +class SocketStream; + +// Abstract interface to throttle SocketStream per URL scheme. +// Each URL scheme (protocol) could define own SocketStreamThrottle. +// These methods will be called on IO thread. +class SocketStreamThrottle { + public: + // Called when |socket| is about to open connection. + // Returns net::OK if the connection can open now. + // Returns net::ERR_IO_PENDING if the connection should wait. In this case, + // |callback| will be called when it's ready to open connection. + virtual int OnStartOpenConnection(SocketStream* socket, + CompletionCallback* callback) { + // No throttle by default. + return OK; + } + + // Called when |socket| read |len| bytes of |data|. + // May wake up another waiting socket. + // Returns net::OK if |socket| can continue to run. + // Returns net::ERR_IO_PENDING if |socket| should suspend to run. In this + // case, |callback| will be called when it's ready to resume running. + virtual int OnRead(SocketStream* socket, const char* data, int len, + CompletionCallback* callback) { + // No throttle by default. + return OK; + } + + // Called when |socket| wrote |len| bytes of |data|. + // May wake up another waiting socket. + // Returns net::OK if |socket| can continue to run. + // Returns net::ERR_IO_PENDING if |socket| should suspend to run. In this + // case, |callback| will be called when it's ready to resume running. + virtual int OnWrite(SocketStream* socket, const char* data, int len, + CompletionCallback* callback) { + // No throttle by default. + return OK; + } + + // Called when |socket| is closed. + // May wake up another waiting socket. + virtual void OnClose(SocketStream* socket) {} + + // Gets SocketStreamThrottle for URL |scheme|. + // Doesn't pass ownership of the SocketStreamThrottle. + static SocketStreamThrottle* GetSocketStreamThrottleForScheme( + const std::string& scheme); + + // Registers |throttle| for URL |scheme|. + // Doesn't take ownership of |throttle|. Typically |throttle| is + // singleton instance. + static void RegisterSocketStreamThrottle( + const std::string& scheme, + SocketStreamThrottle* throttle); + + protected: + SocketStreamThrottle() {} + virtual ~SocketStreamThrottle() {} + + private: + DISALLOW_COPY_AND_ASSIGN(SocketStreamThrottle); +}; + +} // namespace net + +#endif // NET_SOCKET_STREAM_SOCKET_STREAM_THROTTLE_H_ |