summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkkania@chromium.org <kkania@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-04-06 16:56:01 +0000
committerkkania@chromium.org <kkania@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-04-06 16:56:01 +0000
commit09b0a1b94a3fff60fe9c4f9498a40ede14839d9a (patch)
tree2d487be208cdc96b19bf6cebd51b203fffaebd32
parenta9f53039f7a049d41417b128c2476b162c839ac7 (diff)
downloadchromium_src-09b0a1b94a3fff60fe9c4f9498a40ede14839d9a.zip
chromium_src-09b0a1b94a3fff60fe9c4f9498a40ede14839d9a.tar.gz
chromium_src-09b0a1b94a3fff60fe9c4f9498a40ede14839d9a.tar.bz2
[chromedriver] Use net::StreamSocket directly instead of net::WebSocketJob.
This lessens our dependence on net but also allows us to send large messages, which was broken from r192080. BUG=none Review URL: https://codereview.chromium.org/13646018 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@192760 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--chrome/test/chromedriver/net/sync_websocket_impl.cc2
-rw-r--r--chrome/test/chromedriver/net/websocket.cc229
-rw-r--r--chrome/test/chromedriver/net/websocket.h61
-rw-r--r--chrome/test/chromedriver/net/websocket_unittest.cc10
4 files changed, 195 insertions, 107 deletions
diff --git a/chrome/test/chromedriver/net/sync_websocket_impl.cc b/chrome/test/chromedriver/net/sync_websocket_impl.cc
index bbf1d5c..418b21f 100644
--- a/chrome/test/chromedriver/net/sync_websocket_impl.cc
+++ b/chrome/test/chromedriver/net/sync_websocket_impl.cc
@@ -108,7 +108,7 @@ void SyncWebSocketImpl::Core::ConnectOnIO(
base::AutoLock lock(lock_);
received_queue_.clear();
}
- socket_.reset(new WebSocket(context_getter_, url, this));
+ socket_.reset(new WebSocket(url, this));
socket_->Connect(base::Bind(
&SyncWebSocketImpl::Core::OnConnectCompletedOnIO,
this, success, event));
diff --git a/chrome/test/chromedriver/net/websocket.cc b/chrome/test/chromedriver/net/websocket.cc
index 166b832..c8fe845 100644
--- a/chrome/test/chromedriver/net/websocket.cc
+++ b/chrome/test/chromedriver/net/websocket.cc
@@ -5,52 +5,56 @@
#include "chrome/test/chromedriver/net/websocket.h"
#include "base/base64.h"
+#include "base/bind.h"
+#include "base/bind_helpers.h"
#include "base/memory/scoped_vector.h"
#include "base/rand_util.h"
#include "base/sha1.h"
#include "base/stringprintf.h"
-#include "base/strings/string_split.h"
+#include "base/strings/string_number_conversions.h"
+#include "net/base/address_list.h"
#include "net/base/io_buffer.h"
+#include "net/base/ip_endpoint.h"
+#include "net/base/net_errors.h"
+#include "net/base/net_util.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_util.h"
-#include "net/url_request/url_request_context_getter.h"
#include "net/websockets/websocket_frame.h"
-#include "net/websockets/websocket_job.h"
-
-WebSocket::WebSocket(
- net::URLRequestContextGetter* context_getter,
- const GURL& url,
- WebSocketListener* listener)
- : context_getter_(context_getter),
- url_(url),
+
+WebSocket::WebSocket(const GURL& url, WebSocketListener* listener)
+ : url_(url),
listener_(listener),
- connected_(false) {
- net::WebSocketJob::EnsureInit();
- web_socket_ = new net::WebSocketJob(this);
+ state_(INITIALIZED),
+ write_buffer_(new net::DrainableIOBuffer(new net::IOBuffer(0), 0)),
+ read_buffer_(new net::IOBufferWithSize(4096)) {
+ net::IPAddressNumber address;
+ CHECK(net::ParseIPLiteralToNumber(url_.HostNoBrackets(), &address));
+ int port = 80;
+ base::StringToInt(url_.port(), &port);
+ net::AddressList addresses(net::IPEndPoint(address, port));
+ net::NetLog::Source source;
+ socket_.reset(new net::TCPClientSocket(addresses, NULL, source));
}
WebSocket::~WebSocket() {
CHECK(thread_checker_.CalledOnValidThread());
- web_socket_->Close();
- web_socket_->DetachDelegate();
}
void WebSocket::Connect(const net::CompletionCallback& callback) {
CHECK(thread_checker_.CalledOnValidThread());
- CHECK_EQ(net::WebSocketJob::INITIALIZED, web_socket_->state());
-
+ CHECK_EQ(INITIALIZED, state_);
+ state_ = CONNECTING;
connect_callback_ = callback;
-
- scoped_refptr<net::SocketStream> socket = new net::SocketStream(
- url_, web_socket_);
- socket->set_context(context_getter_->GetURLRequestContext());
-
- web_socket_->InitSocketStream(socket);
- web_socket_->Connect();
+ int code = socket_->Connect(base::Bind(
+ &WebSocket::OnSocketConnect, base::Unretained(this)));
+ if (code != net::ERR_IO_PENDING)
+ OnSocketConnect(code);
}
bool WebSocket::Send(const std::string& message) {
CHECK(thread_checker_.CalledOnValidThread());
+ if (state_ != OPEN)
+ return false;
net::WebSocketFrameHeader header;
header.final = true;
@@ -70,12 +74,16 @@ bool WebSocket::Send(const std::string& message) {
std::string masked_message = message;
net::MaskWebSocketFramePayload(
masking_key, 0, &masked_message[0], masked_message.length());
- std::string data = header_str + masked_message;
- return web_socket_->SendData(data.c_str(), data.length());
+ Write(header_str + masked_message);
+ return true;
}
-void WebSocket::OnConnected(net::SocketStream* socket,
- int max_pending_send_allowed) {
+void WebSocket::OnSocketConnect(int code) {
+ if (code != net::OK) {
+ Close(code);
+ return;
+ }
+
CHECK(base::Base64Encode(base::RandBytesAsString(16), &sec_key_));
std::string handshake = base::StringPrintf(
"GET %s HTTP/1.1\r\n"
@@ -90,61 +98,132 @@ void WebSocket::OnConnected(net::SocketStream* socket,
url_.path().c_str(),
url_.host().c_str(),
sec_key_.c_str());
- if (!web_socket_->SendData(handshake.c_str(), handshake.length()))
- OnConnectFinished(net::ERR_FAILED);
+ Write(handshake);
+ Read();
+}
+
+void WebSocket::Write(const std::string& data) {
+ pending_write_ += data;
+ if (!write_buffer_->BytesRemaining())
+ ContinueWritingIfNecessary();
+}
+
+void WebSocket::OnWrite(int code) {
+ if (!socket_->IsConnected()) {
+ // Supposedly if |StreamSocket| is closed, the error code may be undefined.
+ Close(net::ERR_FAILED);
+ return;
+ }
+ if (code < 0) {
+ Close(code);
+ return;
+ }
+
+ write_buffer_->DidConsume(code);
+ ContinueWritingIfNecessary();
}
-void WebSocket::OnSentData(net::SocketStream* socket,
- int amount_sent) {}
-
-void WebSocket::OnReceivedData(net::SocketStream* socket,
- const char* data, int len) {
- net::WebSocketJob::State state = web_socket_->state();
- if (!connect_callback_.is_null()) {
- // WebSocketJob guarantees the first OnReceivedData call contains all
- // the response headers.
- const char kMagicKey[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- std::string websocket_accept;
- CHECK(base::Base64Encode(base::SHA1HashString(sec_key_ + kMagicKey),
- &websocket_accept));
- scoped_refptr<net::HttpResponseHeaders> headers(
- new net::HttpResponseHeaders(
- net::HttpUtil::AssembleRawHeaders(data, len)));
- if (headers->response_code() != 101 ||
- !headers->HasHeaderValue("Upgrade", "WebSocket") ||
- !headers->HasHeaderValue("Connection", "Upgrade") ||
- !headers->HasHeaderValue("Sec-WebSocket-Accept", websocket_accept)) {
- OnConnectFinished(net::ERR_FAILED);
+void WebSocket::ContinueWritingIfNecessary() {
+ if (!write_buffer_->BytesRemaining()) {
+ if (pending_write_.empty())
return;
- }
- OnConnectFinished(
- state == net::WebSocketJob::OPEN ? net::OK : net::ERR_FAILED);
- } else if (connected_) {
- ScopedVector<net::WebSocketFrameChunk> frame_chunks;
- CHECK(parser_.Decode(data, len, &frame_chunks));
- for (size_t i = 0; i < frame_chunks.size(); ++i) {
- scoped_refptr<net::IOBufferWithSize> buffer = frame_chunks[i]->data;
- if (buffer)
- next_message_ += std::string(buffer->data(), buffer->size());
- if (frame_chunks[i]->final_chunk) {
- listener_->OnMessageReceived(next_message_);
- next_message_.clear();
- }
- }
+ write_buffer_ = new net::DrainableIOBuffer(
+ new net::StringIOBuffer(pending_write_),
+ pending_write_.length());
+ pending_write_.clear();
}
+ int code = socket_->Write(
+ write_buffer_,
+ write_buffer_->BytesRemaining(),
+ base::Bind(&WebSocket::OnWrite, base::Unretained(this)));
+ if (code != net::ERR_IO_PENDING)
+ OnWrite(code);
}
-void WebSocket::OnClose(net::SocketStream* socket) {
- if (!connect_callback_.is_null())
- OnConnectFinished(net::ERR_CONNECTION_CLOSED);
- else
- listener_->OnClose();
+void WebSocket::Read() {
+ int code = socket_->Read(
+ read_buffer_,
+ read_buffer_->size(),
+ base::Bind(&WebSocket::OnRead, base::Unretained(this)));
+ if (code != net::ERR_IO_PENDING)
+ OnRead(code);
+}
+
+void WebSocket::OnRead(int code) {
+ if (code <= 0) {
+ Close(code ? code : net::ERR_FAILED);
+ return;
+ }
+
+ if (state_ == CONNECTING)
+ OnReadDuringHandshake(read_buffer_->data(), code);
+ else if (state_ == OPEN)
+ OnReadDuringOpen(read_buffer_->data(), code);
+
+ if (state_ != CLOSED)
+ Read();
+}
+
+void WebSocket::OnReadDuringHandshake(const char* data, int len) {
+ handshake_response_ += std::string(data, len);
+ int headers_end = net::HttpUtil::LocateEndOfHeaders(
+ handshake_response_.data(), handshake_response_.size(), 0);
+ if (headers_end == -1)
+ return;
+
+ const char kMagicKey[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+ std::string websocket_accept;
+ CHECK(base::Base64Encode(base::SHA1HashString(sec_key_ + kMagicKey),
+ &websocket_accept));
+ scoped_refptr<net::HttpResponseHeaders> headers(
+ new net::HttpResponseHeaders(
+ net::HttpUtil::AssembleRawHeaders(
+ handshake_response_.data(), headers_end)));
+ if (headers->response_code() != 101 ||
+ !headers->HasHeaderValue("Upgrade", "WebSocket") ||
+ !headers->HasHeaderValue("Connection", "Upgrade") ||
+ !headers->HasHeaderValue("Sec-WebSocket-Accept", websocket_accept)) {
+ Close(net::ERR_FAILED);
+ return;
+ }
+ std::string leftover_message = handshake_response_.substr(headers_end);
+ handshake_response_.clear();
+ sec_key_.clear();
+ state_ = OPEN;
+ InvokeConnectCallback(net::OK);
+ if (!leftover_message.empty())
+ OnReadDuringOpen(leftover_message.c_str(), leftover_message.length());
+}
+
+void WebSocket::OnReadDuringOpen(const char* data, int len) {
+ ScopedVector<net::WebSocketFrameChunk> frame_chunks;
+ CHECK(parser_.Decode(data, len, &frame_chunks));
+ for (size_t i = 0; i < frame_chunks.size(); ++i) {
+ scoped_refptr<net::IOBufferWithSize> buffer = frame_chunks[i]->data;
+ if (buffer)
+ next_message_ += std::string(buffer->data(), buffer->size());
+ if (frame_chunks[i]->final_chunk) {
+ listener_->OnMessageReceived(next_message_);
+ next_message_.clear();
+ }
+ }
}
-void WebSocket::OnConnectFinished(net::Error error) {
- if (error == net::OK)
- connected_ = true;
+void WebSocket::InvokeConnectCallback(int code) {
net::CompletionCallback temp = connect_callback_;
connect_callback_.Reset();
- temp.Run(error);
+ CHECK(!temp.is_null());
+ temp.Run(code);
}
+
+void WebSocket::Close(int code) {
+ socket_->Disconnect();
+ if (!connect_callback_.is_null())
+ InvokeConnectCallback(code);
+ if (state_ == OPEN)
+ listener_->OnClose();
+
+ state_ = CLOSED;
+}
+
+
diff --git a/chrome/test/chromedriver/net/websocket.h b/chrome/test/chromedriver/net/websocket.h
index f2481c9..9faccf2 100644
--- a/chrome/test/chromedriver/net/websocket.h
+++ b/chrome/test/chromedriver/net/websocket.h
@@ -8,30 +8,27 @@
#include <string>
#include "base/basictypes.h"
-#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/threading/thread_checker.h"
#include "googleurl/src/gurl.h"
#include "net/base/completion_callback.h"
-#include "net/base/net_errors.h"
-#include "net/socket_stream/socket_stream.h"
+#include "net/socket/tcp_client_socket.h"
#include "net/websockets/websocket_frame_parser.h"
namespace net {
-class URLRequestContextGetter;
-class WebSocketJob;
-} // namespace net
+class DrainableIOBuffer;
+class IOBufferWithSize;
+}
class WebSocketListener;
// A text-only, non-thread safe WebSocket. Must be created and used on a single
// thread. Intended particularly for use with net::HttpServer.
-class WebSocket : public net::SocketStream::Delegate {
+class WebSocket {
public:
- WebSocket(net::URLRequestContextGetter* context_getter,
- const GURL& url,
- WebSocketListener* listener);
+ // |url| must be an IP v4/v6 literal, not a host name.
+ WebSocket(const GURL& url, WebSocketListener* listener);
virtual ~WebSocket();
// Initializes the WebSocket connection. Invokes the given callback with
@@ -41,29 +38,45 @@ class WebSocket : public net::SocketStream::Delegate {
// Sends the given message and returns true on success.
bool Send(const std::string& message);
- // Overridden from net::SocketStream::Delegate:
- virtual void OnConnected(net::SocketStream* socket,
- int max_pending_send_allowed) OVERRIDE;
- virtual void OnSentData(net::SocketStream* socket,
- int amount_sent) OVERRIDE;
- virtual void OnReceivedData(net::SocketStream* socket,
- const char* data,
- int len) OVERRIDE;
- virtual void OnClose(net::SocketStream* socket) OVERRIDE;
-
private:
- void OnConnectFinished(net::Error error);
+ enum State {
+ INITIALIZED,
+ CONNECTING,
+ OPEN,
+ CLOSED
+ };
+
+ void OnSocketConnect(int code);
+
+ void Write(const std::string& data);
+ void OnWrite(int code);
+ void ContinueWritingIfNecessary();
+
+ void Read();
+ void OnRead(int code);
+ void OnReadDuringHandshake(const char* data, int len);
+ void OnReadDuringOpen(const char* data, int len);
+
+ void InvokeConnectCallback(int code);
+ void Close(int code);
base::ThreadChecker thread_checker_;
- scoped_refptr<net::URLRequestContextGetter> context_getter_;
+
GURL url_;
- scoped_refptr<net::WebSocketJob> web_socket_;
WebSocketListener* listener_;
+ State state_;
+ scoped_ptr<net::TCPClientSocket> socket_;
+
net::CompletionCallback connect_callback_;
std::string sec_key_;
+ std::string handshake_response_;
+
+ scoped_refptr<net::DrainableIOBuffer> write_buffer_;
+ std::string pending_write_;
+
+ scoped_refptr<net::IOBufferWithSize> read_buffer_;
net::WebSocketFrameParser parser_;
std::string next_message_;
- bool connected_;
DISALLOW_COPY_AND_ASSIGN(WebSocket);
};
diff --git a/chrome/test/chromedriver/net/websocket_unittest.cc b/chrome/test/chromedriver/net/websocket_unittest.cc
index d51bbef..26df2a4 100644
--- a/chrome/test/chromedriver/net/websocket_unittest.cc
+++ b/chrome/test/chromedriver/net/websocket_unittest.cc
@@ -84,9 +84,7 @@ class CloseListener : public WebSocketListener {
class WebSocketTest : public testing::Test {
public:
- WebSocketTest()
- : context_getter_(
- new net::TestURLRequestContextGetter(loop_.message_loop_proxy())) {}
+ WebSocketTest() {}
virtual ~WebSocketTest() {}
virtual void SetUp() OVERRIDE {
@@ -101,8 +99,7 @@ class WebSocketTest : public testing::Test {
scoped_ptr<WebSocket> CreateWebSocket(const GURL& url,
WebSocketListener* listener) {
int error;
- scoped_ptr<WebSocket> sock(new WebSocket(
- context_getter_, url, listener));
+ scoped_ptr<WebSocket> sock(new WebSocket(url, listener));
base::RunLoop run_loop;
sock->Connect(base::Bind(&OnConnectFinished, &run_loop, &error));
loop_.PostDelayedTask(
@@ -134,14 +131,13 @@ class WebSocketTest : public testing::Test {
MessageLoopForIO loop_;
TestHttpServer server_;
- scoped_refptr<net::URLRequestContextGetter> context_getter_;
};
} // namespace
TEST_F(WebSocketTest, CreateDestroy) {
CloseListener listener(NULL);
- WebSocket sock(context_getter_, GURL("http://ok"), &listener);
+ WebSocket sock(GURL("ws://127.0.0.1:2222"), &listener);
}
TEST_F(WebSocketTest, Connect) {