diff options
author | kkania@chromium.org <kkania@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-06 16:56:01 +0000 |
---|---|---|
committer | kkania@chromium.org <kkania@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-06 16:56:01 +0000 |
commit | 09b0a1b94a3fff60fe9c4f9498a40ede14839d9a (patch) | |
tree | 2d487be208cdc96b19bf6cebd51b203fffaebd32 | |
parent | a9f53039f7a049d41417b128c2476b162c839ac7 (diff) | |
download | chromium_src-09b0a1b94a3fff60fe9c4f9498a40ede14839d9a.zip chromium_src-09b0a1b94a3fff60fe9c4f9498a40ede14839d9a.tar.gz chromium_src-09b0a1b94a3fff60fe9c4f9498a40ede14839d9a.tar.bz2 |
[chromedriver] Use net::StreamSocket directly instead of net::WebSocketJob.
This lessens our dependence on net but also allows us to send large messages,
which was broken from r192080.
BUG=none
Review URL: https://codereview.chromium.org/13646018
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@192760 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | chrome/test/chromedriver/net/sync_websocket_impl.cc | 2 | ||||
-rw-r--r-- | chrome/test/chromedriver/net/websocket.cc | 229 | ||||
-rw-r--r-- | chrome/test/chromedriver/net/websocket.h | 61 | ||||
-rw-r--r-- | chrome/test/chromedriver/net/websocket_unittest.cc | 10 |
4 files changed, 195 insertions, 107 deletions
diff --git a/chrome/test/chromedriver/net/sync_websocket_impl.cc b/chrome/test/chromedriver/net/sync_websocket_impl.cc index bbf1d5c..418b21f 100644 --- a/chrome/test/chromedriver/net/sync_websocket_impl.cc +++ b/chrome/test/chromedriver/net/sync_websocket_impl.cc @@ -108,7 +108,7 @@ void SyncWebSocketImpl::Core::ConnectOnIO( base::AutoLock lock(lock_); received_queue_.clear(); } - socket_.reset(new WebSocket(context_getter_, url, this)); + socket_.reset(new WebSocket(url, this)); socket_->Connect(base::Bind( &SyncWebSocketImpl::Core::OnConnectCompletedOnIO, this, success, event)); diff --git a/chrome/test/chromedriver/net/websocket.cc b/chrome/test/chromedriver/net/websocket.cc index 166b832..c8fe845 100644 --- a/chrome/test/chromedriver/net/websocket.cc +++ b/chrome/test/chromedriver/net/websocket.cc @@ -5,52 +5,56 @@ #include "chrome/test/chromedriver/net/websocket.h" #include "base/base64.h" +#include "base/bind.h" +#include "base/bind_helpers.h" #include "base/memory/scoped_vector.h" #include "base/rand_util.h" #include "base/sha1.h" #include "base/stringprintf.h" -#include "base/strings/string_split.h" +#include "base/strings/string_number_conversions.h" +#include "net/base/address_list.h" #include "net/base/io_buffer.h" +#include "net/base/ip_endpoint.h" +#include "net/base/net_errors.h" +#include "net/base/net_util.h" #include "net/http/http_response_headers.h" #include "net/http/http_util.h" -#include "net/url_request/url_request_context_getter.h" #include "net/websockets/websocket_frame.h" -#include "net/websockets/websocket_job.h" - -WebSocket::WebSocket( - net::URLRequestContextGetter* context_getter, - const GURL& url, - WebSocketListener* listener) - : context_getter_(context_getter), - url_(url), + +WebSocket::WebSocket(const GURL& url, WebSocketListener* listener) + : url_(url), listener_(listener), - connected_(false) { - net::WebSocketJob::EnsureInit(); - web_socket_ = new net::WebSocketJob(this); + state_(INITIALIZED), + write_buffer_(new net::DrainableIOBuffer(new net::IOBuffer(0), 0)), + read_buffer_(new net::IOBufferWithSize(4096)) { + net::IPAddressNumber address; + CHECK(net::ParseIPLiteralToNumber(url_.HostNoBrackets(), &address)); + int port = 80; + base::StringToInt(url_.port(), &port); + net::AddressList addresses(net::IPEndPoint(address, port)); + net::NetLog::Source source; + socket_.reset(new net::TCPClientSocket(addresses, NULL, source)); } WebSocket::~WebSocket() { CHECK(thread_checker_.CalledOnValidThread()); - web_socket_->Close(); - web_socket_->DetachDelegate(); } void WebSocket::Connect(const net::CompletionCallback& callback) { CHECK(thread_checker_.CalledOnValidThread()); - CHECK_EQ(net::WebSocketJob::INITIALIZED, web_socket_->state()); - + CHECK_EQ(INITIALIZED, state_); + state_ = CONNECTING; connect_callback_ = callback; - - scoped_refptr<net::SocketStream> socket = new net::SocketStream( - url_, web_socket_); - socket->set_context(context_getter_->GetURLRequestContext()); - - web_socket_->InitSocketStream(socket); - web_socket_->Connect(); + int code = socket_->Connect(base::Bind( + &WebSocket::OnSocketConnect, base::Unretained(this))); + if (code != net::ERR_IO_PENDING) + OnSocketConnect(code); } bool WebSocket::Send(const std::string& message) { CHECK(thread_checker_.CalledOnValidThread()); + if (state_ != OPEN) + return false; net::WebSocketFrameHeader header; header.final = true; @@ -70,12 +74,16 @@ bool WebSocket::Send(const std::string& message) { std::string masked_message = message; net::MaskWebSocketFramePayload( masking_key, 0, &masked_message[0], masked_message.length()); - std::string data = header_str + masked_message; - return web_socket_->SendData(data.c_str(), data.length()); + Write(header_str + masked_message); + return true; } -void WebSocket::OnConnected(net::SocketStream* socket, - int max_pending_send_allowed) { +void WebSocket::OnSocketConnect(int code) { + if (code != net::OK) { + Close(code); + return; + } + CHECK(base::Base64Encode(base::RandBytesAsString(16), &sec_key_)); std::string handshake = base::StringPrintf( "GET %s HTTP/1.1\r\n" @@ -90,61 +98,132 @@ void WebSocket::OnConnected(net::SocketStream* socket, url_.path().c_str(), url_.host().c_str(), sec_key_.c_str()); - if (!web_socket_->SendData(handshake.c_str(), handshake.length())) - OnConnectFinished(net::ERR_FAILED); + Write(handshake); + Read(); +} + +void WebSocket::Write(const std::string& data) { + pending_write_ += data; + if (!write_buffer_->BytesRemaining()) + ContinueWritingIfNecessary(); +} + +void WebSocket::OnWrite(int code) { + if (!socket_->IsConnected()) { + // Supposedly if |StreamSocket| is closed, the error code may be undefined. + Close(net::ERR_FAILED); + return; + } + if (code < 0) { + Close(code); + return; + } + + write_buffer_->DidConsume(code); + ContinueWritingIfNecessary(); } -void WebSocket::OnSentData(net::SocketStream* socket, - int amount_sent) {} - -void WebSocket::OnReceivedData(net::SocketStream* socket, - const char* data, int len) { - net::WebSocketJob::State state = web_socket_->state(); - if (!connect_callback_.is_null()) { - // WebSocketJob guarantees the first OnReceivedData call contains all - // the response headers. - const char kMagicKey[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - std::string websocket_accept; - CHECK(base::Base64Encode(base::SHA1HashString(sec_key_ + kMagicKey), - &websocket_accept)); - scoped_refptr<net::HttpResponseHeaders> headers( - new net::HttpResponseHeaders( - net::HttpUtil::AssembleRawHeaders(data, len))); - if (headers->response_code() != 101 || - !headers->HasHeaderValue("Upgrade", "WebSocket") || - !headers->HasHeaderValue("Connection", "Upgrade") || - !headers->HasHeaderValue("Sec-WebSocket-Accept", websocket_accept)) { - OnConnectFinished(net::ERR_FAILED); +void WebSocket::ContinueWritingIfNecessary() { + if (!write_buffer_->BytesRemaining()) { + if (pending_write_.empty()) return; - } - OnConnectFinished( - state == net::WebSocketJob::OPEN ? net::OK : net::ERR_FAILED); - } else if (connected_) { - ScopedVector<net::WebSocketFrameChunk> frame_chunks; - CHECK(parser_.Decode(data, len, &frame_chunks)); - for (size_t i = 0; i < frame_chunks.size(); ++i) { - scoped_refptr<net::IOBufferWithSize> buffer = frame_chunks[i]->data; - if (buffer) - next_message_ += std::string(buffer->data(), buffer->size()); - if (frame_chunks[i]->final_chunk) { - listener_->OnMessageReceived(next_message_); - next_message_.clear(); - } - } + write_buffer_ = new net::DrainableIOBuffer( + new net::StringIOBuffer(pending_write_), + pending_write_.length()); + pending_write_.clear(); } + int code = socket_->Write( + write_buffer_, + write_buffer_->BytesRemaining(), + base::Bind(&WebSocket::OnWrite, base::Unretained(this))); + if (code != net::ERR_IO_PENDING) + OnWrite(code); } -void WebSocket::OnClose(net::SocketStream* socket) { - if (!connect_callback_.is_null()) - OnConnectFinished(net::ERR_CONNECTION_CLOSED); - else - listener_->OnClose(); +void WebSocket::Read() { + int code = socket_->Read( + read_buffer_, + read_buffer_->size(), + base::Bind(&WebSocket::OnRead, base::Unretained(this))); + if (code != net::ERR_IO_PENDING) + OnRead(code); +} + +void WebSocket::OnRead(int code) { + if (code <= 0) { + Close(code ? code : net::ERR_FAILED); + return; + } + + if (state_ == CONNECTING) + OnReadDuringHandshake(read_buffer_->data(), code); + else if (state_ == OPEN) + OnReadDuringOpen(read_buffer_->data(), code); + + if (state_ != CLOSED) + Read(); +} + +void WebSocket::OnReadDuringHandshake(const char* data, int len) { + handshake_response_ += std::string(data, len); + int headers_end = net::HttpUtil::LocateEndOfHeaders( + handshake_response_.data(), handshake_response_.size(), 0); + if (headers_end == -1) + return; + + const char kMagicKey[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + std::string websocket_accept; + CHECK(base::Base64Encode(base::SHA1HashString(sec_key_ + kMagicKey), + &websocket_accept)); + scoped_refptr<net::HttpResponseHeaders> headers( + new net::HttpResponseHeaders( + net::HttpUtil::AssembleRawHeaders( + handshake_response_.data(), headers_end))); + if (headers->response_code() != 101 || + !headers->HasHeaderValue("Upgrade", "WebSocket") || + !headers->HasHeaderValue("Connection", "Upgrade") || + !headers->HasHeaderValue("Sec-WebSocket-Accept", websocket_accept)) { + Close(net::ERR_FAILED); + return; + } + std::string leftover_message = handshake_response_.substr(headers_end); + handshake_response_.clear(); + sec_key_.clear(); + state_ = OPEN; + InvokeConnectCallback(net::OK); + if (!leftover_message.empty()) + OnReadDuringOpen(leftover_message.c_str(), leftover_message.length()); +} + +void WebSocket::OnReadDuringOpen(const char* data, int len) { + ScopedVector<net::WebSocketFrameChunk> frame_chunks; + CHECK(parser_.Decode(data, len, &frame_chunks)); + for (size_t i = 0; i < frame_chunks.size(); ++i) { + scoped_refptr<net::IOBufferWithSize> buffer = frame_chunks[i]->data; + if (buffer) + next_message_ += std::string(buffer->data(), buffer->size()); + if (frame_chunks[i]->final_chunk) { + listener_->OnMessageReceived(next_message_); + next_message_.clear(); + } + } } -void WebSocket::OnConnectFinished(net::Error error) { - if (error == net::OK) - connected_ = true; +void WebSocket::InvokeConnectCallback(int code) { net::CompletionCallback temp = connect_callback_; connect_callback_.Reset(); - temp.Run(error); + CHECK(!temp.is_null()); + temp.Run(code); } + +void WebSocket::Close(int code) { + socket_->Disconnect(); + if (!connect_callback_.is_null()) + InvokeConnectCallback(code); + if (state_ == OPEN) + listener_->OnClose(); + + state_ = CLOSED; +} + + diff --git a/chrome/test/chromedriver/net/websocket.h b/chrome/test/chromedriver/net/websocket.h index f2481c9..9faccf2 100644 --- a/chrome/test/chromedriver/net/websocket.h +++ b/chrome/test/chromedriver/net/websocket.h @@ -8,30 +8,27 @@ #include <string> #include "base/basictypes.h" -#include "base/callback.h" #include "base/compiler_specific.h" #include "base/memory/ref_counted.h" #include "base/threading/thread_checker.h" #include "googleurl/src/gurl.h" #include "net/base/completion_callback.h" -#include "net/base/net_errors.h" -#include "net/socket_stream/socket_stream.h" +#include "net/socket/tcp_client_socket.h" #include "net/websockets/websocket_frame_parser.h" namespace net { -class URLRequestContextGetter; -class WebSocketJob; -} // namespace net +class DrainableIOBuffer; +class IOBufferWithSize; +} class WebSocketListener; // A text-only, non-thread safe WebSocket. Must be created and used on a single // thread. Intended particularly for use with net::HttpServer. -class WebSocket : public net::SocketStream::Delegate { +class WebSocket { public: - WebSocket(net::URLRequestContextGetter* context_getter, - const GURL& url, - WebSocketListener* listener); + // |url| must be an IP v4/v6 literal, not a host name. + WebSocket(const GURL& url, WebSocketListener* listener); virtual ~WebSocket(); // Initializes the WebSocket connection. Invokes the given callback with @@ -41,29 +38,45 @@ class WebSocket : public net::SocketStream::Delegate { // Sends the given message and returns true on success. bool Send(const std::string& message); - // Overridden from net::SocketStream::Delegate: - virtual void OnConnected(net::SocketStream* socket, - int max_pending_send_allowed) OVERRIDE; - virtual void OnSentData(net::SocketStream* socket, - int amount_sent) OVERRIDE; - virtual void OnReceivedData(net::SocketStream* socket, - const char* data, - int len) OVERRIDE; - virtual void OnClose(net::SocketStream* socket) OVERRIDE; - private: - void OnConnectFinished(net::Error error); + enum State { + INITIALIZED, + CONNECTING, + OPEN, + CLOSED + }; + + void OnSocketConnect(int code); + + void Write(const std::string& data); + void OnWrite(int code); + void ContinueWritingIfNecessary(); + + void Read(); + void OnRead(int code); + void OnReadDuringHandshake(const char* data, int len); + void OnReadDuringOpen(const char* data, int len); + + void InvokeConnectCallback(int code); + void Close(int code); base::ThreadChecker thread_checker_; - scoped_refptr<net::URLRequestContextGetter> context_getter_; + GURL url_; - scoped_refptr<net::WebSocketJob> web_socket_; WebSocketListener* listener_; + State state_; + scoped_ptr<net::TCPClientSocket> socket_; + net::CompletionCallback connect_callback_; std::string sec_key_; + std::string handshake_response_; + + scoped_refptr<net::DrainableIOBuffer> write_buffer_; + std::string pending_write_; + + scoped_refptr<net::IOBufferWithSize> read_buffer_; net::WebSocketFrameParser parser_; std::string next_message_; - bool connected_; DISALLOW_COPY_AND_ASSIGN(WebSocket); }; diff --git a/chrome/test/chromedriver/net/websocket_unittest.cc b/chrome/test/chromedriver/net/websocket_unittest.cc index d51bbef..26df2a4 100644 --- a/chrome/test/chromedriver/net/websocket_unittest.cc +++ b/chrome/test/chromedriver/net/websocket_unittest.cc @@ -84,9 +84,7 @@ class CloseListener : public WebSocketListener { class WebSocketTest : public testing::Test { public: - WebSocketTest() - : context_getter_( - new net::TestURLRequestContextGetter(loop_.message_loop_proxy())) {} + WebSocketTest() {} virtual ~WebSocketTest() {} virtual void SetUp() OVERRIDE { @@ -101,8 +99,7 @@ class WebSocketTest : public testing::Test { scoped_ptr<WebSocket> CreateWebSocket(const GURL& url, WebSocketListener* listener) { int error; - scoped_ptr<WebSocket> sock(new WebSocket( - context_getter_, url, listener)); + scoped_ptr<WebSocket> sock(new WebSocket(url, listener)); base::RunLoop run_loop; sock->Connect(base::Bind(&OnConnectFinished, &run_loop, &error)); loop_.PostDelayedTask( @@ -134,14 +131,13 @@ class WebSocketTest : public testing::Test { MessageLoopForIO loop_; TestHttpServer server_; - scoped_refptr<net::URLRequestContextGetter> context_getter_; }; } // namespace TEST_F(WebSocketTest, CreateDestroy) { CloseListener listener(NULL); - WebSocket sock(context_getter_, GURL("http://ok"), &listener); + WebSocket sock(GURL("ws://127.0.0.1:2222"), &listener); } TEST_F(WebSocketTest, Connect) { |