From 533f527697b337b7f0300c466bc071d858e3b865 Mon Sep 17 00:00:00 2001 From: yzshen Date: Wed, 20 May 2015 21:35:46 -0700 Subject: Mojo service implementation for HTTP server - part 3 This CL adds WebSocket support and correspnoding tests. BUG=478249 TEST=Newly added tests. Review URL: https://codereview.chromium.org/1144843002 Cr-Commit-Position: refs/heads/master@{#330884} --- mojo/services/network/http_connection_impl.cc | 228 +++++++++++++++++++-- mojo/services/network/http_connection_impl.h | 23 ++- mojo/services/network/http_server_apptest.cc | 222 +++++++++++++++++++- .../public/interfaces/http_connection.mojom | 7 + 4 files changed, 447 insertions(+), 33 deletions(-) (limited to 'mojo/services') diff --git a/mojo/services/network/http_connection_impl.cc b/mojo/services/network/http_connection_impl.cc index 0831d1d..77d7a91 100644 --- a/mojo/services/network/http_connection_impl.cc +++ b/mojo/services/network/http_connection_impl.cc @@ -14,6 +14,9 @@ #include "mojo/common/handle_watcher.h" #include "mojo/services/network/http_server_impl.h" #include "mojo/services/network/net_adapters.h" +#include "mojo/services/network/public/cpp/web_socket_read_queue.h" +#include "mojo/services/network/public/cpp/web_socket_write_queue.h" +#include "mojo/services/network/public/interfaces/web_socket.mojom.h" #include "net/base/net_errors.h" #include "net/http/http_request_headers.h" #include "net/http/http_status_code.h" @@ -82,6 +85,143 @@ class HttpConnectionImpl::SimpleDataPipeReader { DISALLOW_COPY_AND_ASSIGN(SimpleDataPipeReader); }; +class HttpConnectionImpl::WebSocketImpl : public WebSocket, + public ErrorHandler { + public: + // |connection| must outlive this object. + WebSocketImpl(HttpConnectionImpl* connection, + InterfaceRequest request, + ScopedDataPipeConsumerHandle send_stream, + WebSocketClientPtr client) + : connection_(connection), + binding_(this, request.Pass()), + client_(client.Pass()), + send_stream_(send_stream.Pass()), + read_send_stream_(new WebSocketReadQueue(send_stream_.get())), + pending_send_count_(0) { + DCHECK(binding_.is_bound()); + DCHECK(client_); + DCHECK(send_stream_.is_valid()); + + binding_.set_error_handler(this); + client_.set_error_handler(this); + + DataPipe data_pipe; + receive_stream_ = data_pipe.producer_handle.Pass(); + write_receive_stream_.reset(new WebSocketWriteQueue(receive_stream_.get())); + + client_->DidConnect("", "", data_pipe.consumer_handle.Pass()); + } + + ~WebSocketImpl() override {} + + void Close() { + DCHECK(!IsClosing()); + + binding_.Close(); + client_.reset(); + + NotifyOwnerCloseIfAllDone(); + } + + void OnReceivedWebSocketMessage(const std::string& data) { + if (IsClosing()) + return; + + // TODO(yzshen): It shouldn't be an issue to pass an empty message. However, + // WebSocket{Read,Write}Queue doesn't handle that correctly. + if (data.empty()) + return; + + uint32_t size = static_cast(data.size()); + write_receive_stream_->Write( + &data[0], size, + base::Bind(&WebSocketImpl::OnFinishedWritingReceiveStream, + base::Unretained(this), size)); + } + + private: + // WebSocket implementation. + void Connect(const String& url, + Array protocols, + const String& origin, + ScopedDataPipeConsumerHandle send_stream, + WebSocketClientPtr client) override { + NOTREACHED(); + } + + void Send(bool fin, MessageType type, uint32_t num_bytes) override { + if (!fin || type != MESSAGE_TYPE_TEXT) { + NOTIMPLEMENTED(); + Close(); + } + + // TODO(yzshen): It shouldn't be an issue to pass an empty message. However, + // WebSocket{Read,Write}Queue doesn't handle that correctly. + if (num_bytes == 0) + return; + + pending_send_count_++; + read_send_stream_->Read( + num_bytes, base::Bind(&WebSocketImpl::OnFinishedReadingSendStream, + base::Unretained(this), num_bytes)); + } + + void FlowControl(int64_t quota) override { NOTIMPLEMENTED(); } + + void Close(uint16_t code, const String& reason) override { + Close(); + } + + // ErrorHandler implementation. + void OnConnectionError() override { Close(); } + + void OnFinishedReadingSendStream(uint32_t num_bytes, const char* data) { + DCHECK_GT(pending_send_count_, 0u); + pending_send_count_--; + + if (data) { + connection_->server_->server()->SendOverWebSocket( + connection_->connection_id_, std::string(data, num_bytes)); + } + + if (IsClosing()) + NotifyOwnerCloseIfAllDone(); + } + + void OnFinishedWritingReceiveStream(uint32_t num_bytes, const char* buffer) { + if (IsClosing()) + return; + + if (buffer) + client_->DidReceiveData(true, MESSAGE_TYPE_TEXT, num_bytes); + } + + // Checks whether Close() has been called. + bool IsClosing() const { return !binding_.is_bound(); } + + void NotifyOwnerCloseIfAllDone() { + DCHECK(IsClosing()); + + if (pending_send_count_ == 0) + connection_->OnWebSocketClosed(); + } + + HttpConnectionImpl* const connection_; + + Binding binding_; + WebSocketClientPtr client_; + + ScopedDataPipeConsumerHandle send_stream_; + scoped_ptr read_send_stream_; + size_t pending_send_count_; + + ScopedDataPipeProducerHandle receive_stream_; + scoped_ptr write_receive_stream_; + + DISALLOW_COPY_AND_ASSIGN(WebSocketImpl); +}; + template <> struct TypeConverter { static HttpRequestPtr Convert(const net::HttpServerRequestInfo& obj) { @@ -115,11 +255,11 @@ struct TypeConverter { }; HttpConnectionImpl::HttpConnectionImpl(int connection_id, - HttpServerImpl* owner, + HttpServerImpl* server, HttpConnectionDelegatePtr delegate, HttpConnectionPtr* connection) : connection_id_(connection_id), - owner_(owner), + server_(server), delegate_(delegate.Pass()), binding_(this, connection) { DCHECK(delegate_); @@ -133,7 +273,7 @@ HttpConnectionImpl::~HttpConnectionImpl() { void HttpConnectionImpl::OnReceivedHttpRequest( const net::HttpServerRequestInfo& info) { - if (EncounteredConnectionError()) + if (IsClosing()) return; delegate_->OnReceivedRequest( @@ -154,11 +294,32 @@ void HttpConnectionImpl::OnReceivedHttpRequest( void HttpConnectionImpl::OnReceivedWebSocketRequest( const net::HttpServerRequestInfo& info) { - // TODO(yzshen): implement it. + if (IsClosing()) + return; + + delegate_->OnReceivedWebSocketRequest( + HttpRequest::From(info), + [this, info](InterfaceRequest web_socket, + ScopedDataPipeConsumerHandle send_stream, + WebSocketClientPtr web_socket_client) { + if (!web_socket.is_pending() || !send_stream.is_valid() || + !web_socket_client) { + Close(); + return; + } + + web_socket_.reset(new WebSocketImpl(this, web_socket.Pass(), + send_stream.Pass(), + web_socket_client.Pass())); + server_->server()->AcceptWebSocket(connection_id_, info); + }); } void HttpConnectionImpl::OnReceivedWebSocketMessage(const std::string& data) { - // TODO(yzshen): implement it. + if (IsClosing()) + return; + + web_socket_->OnReceivedWebSocketMessage(data); } void HttpConnectionImpl::SetSendBufferSize( @@ -167,8 +328,8 @@ void HttpConnectionImpl::SetSendBufferSize( if (size > static_cast(std::numeric_limits::max())) size = std::numeric_limits::max(); - owner_->server()->SetSendBufferSize( - connection_id_, static_cast(size)); + server_->server()->SetSendBufferSize(connection_id_, + static_cast(size)); callback.Run(MakeNetworkError(net::OK)); } @@ -178,8 +339,8 @@ void HttpConnectionImpl::SetReceiveBufferSize( if (size > static_cast(std::numeric_limits::max())) size = std::numeric_limits::max(); - owner_->server()->SetReceiveBufferSize( - connection_id_, static_cast(size)); + server_->server()->SetReceiveBufferSize(connection_id_, + static_cast(size)); callback.Run(MakeNetworkError(net::OK)); } @@ -188,14 +349,7 @@ void HttpConnectionImpl::OnConnectionError() { // |delegate_| has closed the pipe. Although it is set as error handler for // both |binding_| and |delegate_|, it will only be called at most once // because when called it closes/resets |binding_| and |delegate_|. - DCHECK(!EncounteredConnectionError()); - - binding_.Close(); - delegate_.reset(); - - // Don't close the connection until all pending responses are sent. - if (response_body_readers_.empty()) - owner_->server()->Close(connection_id_); + Close(); } void HttpConnectionImpl::OnFinishedReadingResponseBody( @@ -235,10 +389,44 @@ void HttpConnectionImpl::OnFinishedReadingResponseBody( if (body) info.SetBody(*body, content_type); - owner_->server()->SendResponse(connection_id_, info); + server_->server()->SendResponse(connection_id_, info); + + if (IsClosing()) + NotifyOwnerCloseIfAllDone(); +} + +void HttpConnectionImpl::Close() { + DCHECK(!IsClosing()); + + binding_.Close(); + delegate_.reset(); + + if (web_socket_) + web_socket_->Close(); + + NotifyOwnerCloseIfAllDone(); +} + +void HttpConnectionImpl::NotifyOwnerCloseIfAllDone() { + DCHECK(IsClosing()); - if (response_body_readers_.empty() && EncounteredConnectionError()) - owner_->server()->Close(connection_id_); + // Don't close the connection until all pending sends are done. + bool should_wait = !response_body_readers_.empty() || web_socket_; + if (!should_wait) + server_->server()->Close(connection_id_); +} + +void HttpConnectionImpl::OnWebSocketClosed() { + web_socket_.reset(); + + if (IsClosing()) { + // The close operation is initiated by this object. + NotifyOwnerCloseIfAllDone(); + } else { + // The close operation is initiated by |web_socket_|; start closing this + // object. + Close(); + } } } // namespace mojo diff --git a/mojo/services/network/http_connection_impl.h b/mojo/services/network/http_connection_impl.h index 7f952310..79fc72d 100644 --- a/mojo/services/network/http_connection_impl.h +++ b/mojo/services/network/http_connection_impl.h @@ -26,9 +26,9 @@ class HttpServerImpl; class HttpConnectionImpl : public HttpConnection, public ErrorHandler { public: - // |owner| must outlive this object. + // |server| must outlive this object. HttpConnectionImpl(int connection_id, - HttpServerImpl* owner, + HttpServerImpl* server, HttpConnectionDelegatePtr delegate, HttpConnectionPtr* connection); @@ -40,6 +40,7 @@ class HttpConnectionImpl : public HttpConnection, private: class SimpleDataPipeReader; + class WebSocketImpl; // HttpConnection implementation. void SetSendBufferSize(uint32_t size, @@ -55,17 +56,27 @@ class HttpConnectionImpl : public HttpConnection, SimpleDataPipeReader* reader, scoped_ptr body); - bool EncounteredConnectionError() const { - return !binding_.is_bound() || !delegate_; - } + void Close(); + + // Checks whether Close() has been called. + bool IsClosing() const { return !binding_.is_bound(); } + + // Checks whether all wrap-up work has been done during the closing process. + // If yes, notifies the owner, which may result in the destruction of this + // object. + void NotifyOwnerCloseIfAllDone(); + + void OnWebSocketClosed(); const int connection_id_; - HttpServerImpl* const owner_; + HttpServerImpl* const server_; HttpConnectionDelegatePtr delegate_; Binding binding_; // Owns its elements. std::set response_body_readers_; + scoped_ptr web_socket_; + DISALLOW_COPY_AND_ASSIGN(HttpConnectionImpl); }; diff --git a/mojo/services/network/http_server_apptest.cc b/mojo/services/network/http_server_apptest.cc index 80677e5..9652c05a 100644 --- a/mojo/services/network/http_server_apptest.cc +++ b/mojo/services/network/http_server_apptest.cc @@ -2,19 +2,27 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/logging.h" #include "base/macros.h" #include "base/memory/linked_ptr.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/run_loop.h" #include "base/strings/string_util.h" +#include "base/strings/stringprintf.h" #include "mojo/application/public/cpp/application_connection.h" #include "mojo/application/public/cpp/application_impl.h" #include "mojo/application/public/cpp/application_test_base.h" #include "mojo/common/data_pipe_utils.h" #include "mojo/services/network/net_address_type_converters.h" +#include "mojo/services/network/public/cpp/web_socket_read_queue.h" +#include "mojo/services/network/public/cpp/web_socket_write_queue.h" +#include "mojo/services/network/public/interfaces/http_connection.mojom.h" +#include "mojo/services/network/public/interfaces/http_message.mojom.h" #include "mojo/services/network/public/interfaces/http_server.mojom.h" +#include "mojo/services/network/public/interfaces/net_address.mojom.h" #include "mojo/services/network/public/interfaces/network_service.mojom.h" +#include "mojo/services/network/public/interfaces/web_socket.mojom.h" #include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/base/test_completion_callback.h" @@ -261,6 +269,141 @@ class TestHttpClient { DISALLOW_COPY_AND_ASSIGN(TestHttpClient); }; +class WebSocketClientImpl : public WebSocketClient { + public: + explicit WebSocketClientImpl() + : binding_(this, &client_ptr_), + wait_for_message_count_(0), + run_loop_(nullptr) {} + ~WebSocketClientImpl() override {} + + // Establishes a connection from the client side. + void Connect(WebSocketPtr web_socket, const std::string& url) { + web_socket_ = web_socket.Pass(); + + DataPipe data_pipe; + send_stream_ = data_pipe.producer_handle.Pass(); + write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get())); + + web_socket_->Connect(url, Array(0), "http://example.com", + data_pipe.consumer_handle.Pass(), client_ptr_.Pass()); + } + + // Establishes a connection from the server side. + void AcceptConnectRequest( + const HttpConnectionDelegate::OnReceivedWebSocketRequestCallback& + callback) { + InterfaceRequest web_socket_request = GetProxy(&web_socket_); + + DataPipe data_pipe; + send_stream_ = data_pipe.producer_handle.Pass(); + write_send_stream_.reset(new WebSocketWriteQueue(send_stream_.get())); + + callback.Run(web_socket_request.Pass(), data_pipe.consumer_handle.Pass(), + client_ptr_.Pass()); + } + + void WaitForConnectCompletion() { + DCHECK(!run_loop_); + + if (receive_stream_.is_valid()) + return; + + base::RunLoop run_loop; + run_loop_ = &run_loop; + run_loop.Run(); + run_loop_ = nullptr; + } + + void Send(const std::string& message) { + DCHECK(!message.empty()); + + uint32_t size = static_cast(message.size()); + write_send_stream_->Write( + &message[0], size, + base::Bind(&WebSocketClientImpl::OnFinishedWritingSendStream, + base::Unretained(this), size)); + } + + void WaitForMessage(size_t count) { + DCHECK(!run_loop_); + + if (received_messages_.size() >= count) + return; + wait_for_message_count_ = count; + base::RunLoop run_loop; + run_loop_ = &run_loop; + run_loop.Run(); + run_loop_ = nullptr; + } + + std::vector& received_messages() { return received_messages_; } + + private: + // WebSocketClient implementation. + void DidConnect(const String& selected_subprotocol, + const String& extensions, + ScopedDataPipeConsumerHandle receive_stream) override { + receive_stream_ = receive_stream.Pass(); + read_receive_stream_.reset(new WebSocketReadQueue(receive_stream_.get())); + + web_socket_->FlowControl(2048); + if (run_loop_) + run_loop_->Quit(); + } + + void DidReceiveData(bool fin, + WebSocket::MessageType type, + uint32_t num_bytes) override { + DCHECK(num_bytes > 0); + + read_receive_stream_->Read( + num_bytes, + base::Bind(&WebSocketClientImpl::OnFinishedReadingReceiveStream, + base::Unretained(this), num_bytes)); + } + + void DidReceiveFlowControl(int64_t quota) override {} + + void DidFail(const String& message) override {} + + void DidClose(bool was_clean, uint16_t code, const String& reason) override {} + + void OnFinishedWritingSendStream(uint32_t num_bytes, const char* buffer) { + EXPECT_TRUE(buffer); + + web_socket_->Send(true, WebSocket::MESSAGE_TYPE_TEXT, num_bytes); + } + + void OnFinishedReadingReceiveStream(uint32_t num_bytes, const char* data) { + EXPECT_TRUE(data); + + received_messages_.push_back(std::string(data, num_bytes)); + if (run_loop_ && received_messages_.size() >= wait_for_message_count_) { + wait_for_message_count_ = 0; + run_loop_->Quit(); + } + } + + WebSocketClientPtr client_ptr_; + Binding binding_; + WebSocketPtr web_socket_; + + ScopedDataPipeProducerHandle send_stream_; + scoped_ptr write_send_stream_; + + ScopedDataPipeConsumerHandle receive_stream_; + scoped_ptr read_receive_stream_; + + std::vector received_messages_; + size_t wait_for_message_count_; + + // Pointing to a stack-allocated RunLoop instance. + base::RunLoop* run_loop_; + + DISALLOW_COPY_AND_ASSIGN(WebSocketClientImpl); +}; + class HttpConnectionDelegateImpl : public HttpConnectionDelegate { public: struct PendingRequest { @@ -272,8 +415,8 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { InterfaceRequest request) : connection_(connection.Pass()), binding_(this, request.Pass()), - run_loop_(nullptr), - wait_for_request_count_(0) {} + wait_for_request_count_(0), + run_loop_(nullptr) {} ~HttpConnectionDelegateImpl() override {} // HttpConnectionDelegate implementation: @@ -292,7 +435,12 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { void OnReceivedWebSocketRequest( HttpRequestPtr request, const OnReceivedWebSocketRequestCallback& callback) override { - NOTREACHED(); + web_socket_.reset(new WebSocketClientImpl()); + + web_socket_->AcceptConnectRequest(callback); + + if (run_loop_) + run_loop_->Quit(); } void SendResponse(HttpResponsePtr response) { @@ -305,6 +453,9 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { void WaitForRequest(size_t count) { DCHECK(!run_loop_); + if (pending_requests_.size() >= count) + return; + wait_for_request_count_ = count; base::RunLoop run_loop; run_loop_ = &run_loop; @@ -312,17 +463,33 @@ class HttpConnectionDelegateImpl : public HttpConnectionDelegate { run_loop_ = nullptr; } + void WaitForWebSocketRequest() { + DCHECK(!run_loop_); + + if (web_socket_) + return; + + base::RunLoop run_loop; + run_loop_ = &run_loop; + run_loop.Run(); + run_loop_ = nullptr; + } + std::vector>& pending_requests() { return pending_requests_; } + WebSocketClientImpl* web_socket() { return web_socket_.get(); } + private: HttpConnectionPtr connection_; Binding binding_; std::vector> pending_requests_; + size_t wait_for_request_count_; + scoped_ptr web_socket_; + // Pointing to a stack-allocated RunLoop instance. base::RunLoop* run_loop_; - size_t wait_for_request_count_; DISALLOW_COPY_AND_ASSIGN(HttpConnectionDelegateImpl); }; @@ -331,8 +498,8 @@ class HttpServerDelegateImpl : public HttpServerDelegate { public: explicit HttpServerDelegateImpl(HttpServerDelegatePtr* delegate_ptr) : binding_(this, delegate_ptr), - run_loop_(nullptr), - wait_for_connection_count_(0) {} + wait_for_connection_count_(0), + run_loop_(nullptr) {} ~HttpServerDelegateImpl() override {} // HttpServerDelegate implementation. @@ -349,6 +516,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate { void WaitForConnection(size_t count) { DCHECK(!run_loop_); + if (connections_.size() >= count) + return; + wait_for_connection_count_ = count; base::RunLoop run_loop; run_loop_ = &run_loop; @@ -363,9 +533,9 @@ class HttpServerDelegateImpl : public HttpServerDelegate { private: Binding binding_; std::vector> connections_; + size_t wait_for_connection_count_; // Pointing to a stack-allocated RunLoop instance. base::RunLoop* run_loop_; - size_t wait_for_connection_count_; DISALLOW_COPY_AND_ASSIGN(HttpServerDelegateImpl); }; @@ -480,4 +650,42 @@ TEST_F(HttpServerAppTest, HttpRequestResponseWithBody) { CheckResponse(response_data, response_message); } +TEST_F(HttpServerAppTest, WebSocket) { + NetAddressPtr bound_to; + HttpServerDelegatePtr server_delegate_ptr; + HttpServerDelegateImpl server_delegate_impl(&server_delegate_ptr); + CreateHttpServer(server_delegate_ptr.Pass(), &bound_to); + + WebSocketPtr web_socket_ptr; + network_service_->CreateWebSocket(GetProxy(&web_socket_ptr)); + WebSocketClientImpl socket_0; + socket_0.Connect( + web_socket_ptr.Pass(), + base::StringPrintf("ws://127.0.0.1:%d/hello", bound_to->ipv4->port)); + + server_delegate_impl.WaitForConnection(1); + HttpConnectionDelegateImpl& connection = + *server_delegate_impl.connections()[0]; + + connection.WaitForWebSocketRequest(); + WebSocketClientImpl& socket_1 = *connection.web_socket(); + + socket_1.WaitForConnectCompletion(); + socket_0.WaitForConnectCompletion(); + + socket_0.Send("Hello"); + socket_0.Send("world!"); + + socket_1.WaitForMessage(2); + EXPECT_EQ("Hello", socket_1.received_messages()[0]); + EXPECT_EQ("world!", socket_1.received_messages()[1]); + + socket_1.Send("How do"); + socket_1.Send("you do?"); + + socket_0.WaitForMessage(2); + EXPECT_EQ("How do", socket_0.received_messages()[0]); + EXPECT_EQ("you do?", socket_0.received_messages()[1]); +} + } // namespace mojo diff --git a/mojo/services/network/public/interfaces/http_connection.mojom b/mojo/services/network/public/interfaces/http_connection.mojom index 1733ed2..2cb79bd 100644 --- a/mojo/services/network/public/interfaces/http_connection.mojom +++ b/mojo/services/network/public/interfaces/http_connection.mojom @@ -28,6 +28,13 @@ interface HttpConnectionDelegate { // WebSocket should be written to the producer end of the |send_stream|. // |web_socket| will be already connected. There is no need to call Connect() // on it. But |client| will still receive a DidConnect() notification. + // + // NOTE: WebSocket server support is not fully implemented. For now the + // following are not supported: + // - negotiating subprotocol or extension; + // - fragmented or non-text messages; + // - failure or close notification; + // - flow control. OnReceivedWebSocketRequest(HttpRequest request) => (WebSocket&? web_socket, handle? send_stream, -- cgit v1.1