diff options
author | toyoshim@chromium.org <toyoshim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-06-27 11:50:29 +0000 |
---|---|---|
committer | toyoshim@chromium.org <toyoshim@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-06-27 11:50:29 +0000 |
commit | 675fa7d9f1c979b04a4606afcc5ad947a64440a7 (patch) | |
tree | c296c526db9a221f11e40f2a1af3c2868cca57be /net | |
parent | fe487862e3d76fa9d07bc9b9bd645b40e20a289c (diff) | |
download | chromium_src-675fa7d9f1c979b04a4606afcc5ad947a64440a7.zip chromium_src-675fa7d9f1c979b04a4606afcc5ad947a64440a7.tar.gz chromium_src-675fa7d9f1c979b04a4606afcc5ad947a64440a7.tar.bz2 |
WebSocket over SPDY: SpdyWebSocketStream
SpdyWebSocketStream implementation.
This class will be used by WebSocketJob to enable WebSocket over SPDY support.
BUG=42320
TEST=net_unittest --gtest_filter=SpdyWebSocket\*
Review URL: http://codereview.chromium.org/7089001
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@90555 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r-- | net/net.gyp | 3 | ||||
-rw-r--r-- | net/spdy/spdy_framer.h | 2 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 6 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 1 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.cc | 145 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.h | 99 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream_unittest.cc | 683 |
7 files changed, 938 insertions, 1 deletions
diff --git a/net/net.gyp b/net/net.gyp index d94f528..4f2226f 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -567,6 +567,8 @@ 'spdy/spdy_settings_storage.h', 'spdy/spdy_stream.cc', 'spdy/spdy_stream.h', + 'spdy/spdy_websocket_stream.cc', + 'spdy/spdy_websocket_stream.h', 'third_party/mozilla_security_manager/nsKeygenHandler.cpp', 'third_party/mozilla_security_manager/nsKeygenHandler.h', 'third_party/mozilla_security_manager/nsNSSCertificateDB.cpp', @@ -987,6 +989,7 @@ 'spdy/spdy_stream_unittest.cc', 'spdy/spdy_test_util.cc', 'spdy/spdy_test_util.h', + 'spdy/spdy_websocket_stream_unittest.cc', 'test/python_utils_unittest.cc', 'tools/dump_cache/url_to_filename_encoder.cc', 'tools/dump_cache/url_to_filename_encoder.h', diff --git a/net/spdy/spdy_framer.h b/net/spdy/spdy_framer.h index 62b48d4..6bfe3865 100644 --- a/net/spdy/spdy_framer.h +++ b/net/spdy/spdy_framer.h @@ -29,6 +29,7 @@ class SpdyNetworkTransactionTest; class SpdyProxyClientSocketTest; class SpdySessionTest; class SpdyStreamTest; +class SpdyWebSocketStreamTest; } namespace spdy { @@ -268,6 +269,7 @@ class NET_TEST SpdyFramer { friend class net::SpdyProxyClientSocketTest; friend class net::SpdySessionTest; friend class net::SpdyStreamTest; + friend class net::SpdyWebSocketStreamTest; friend class test::TestSpdyVisitor; friend void test::FramerSetEnableCompressionHelper(SpdyFramer* framer, bool compress); diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 5808907..00e66e4 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Copyright (c) 2011 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -381,6 +381,10 @@ void SpdyStream::Cancel() { session_->ResetStream(stream_id_, spdy::CANCEL); } +void SpdyStream::Close() { + session_->CloseStream(stream_id_, net::OK); +} + int SpdyStream::SendRequest(bool has_upload_data) { if (delegate_) delegate_->set_chunk_callback(this); diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index f8b115e..8bbee32 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -199,6 +199,7 @@ class NET_TEST SpdyStream void OnClose(int status); void Cancel(); + void Close(); bool cancelled() const { return cancelled_; } bool closed() const { return io_state_ == STATE_DONE; } diff --git a/net/spdy/spdy_websocket_stream.cc b/net/spdy/spdy_websocket_stream.cc new file mode 100644 index 0000000..7ae35c5 --- /dev/null +++ b/net/spdy/spdy_websocket_stream.cc @@ -0,0 +1,145 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_websocket_stream.h" + +#include "googleurl/src/gurl.h" +#include "net/base/net_errors.h" +#include "net/base/io_buffer.h" +#include "net/spdy/spdy_framer.h" +#include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_session.h" +#include "net/spdy/spdy_stream.h" + +namespace net { + +SpdyWebSocketStream::SpdyWebSocketStream( + SpdySession* spdy_session, Delegate* delegate) + : stream_(NULL), + spdy_session_(spdy_session), + delegate_(delegate), + ALLOW_THIS_IN_INITIALIZER_LIST(spdy_stream_created_callback_( + this, &SpdyWebSocketStream::OnSpdyStreamCreated)) { + DCHECK(spdy_session_); + DCHECK(delegate_); +} + +SpdyWebSocketStream::~SpdyWebSocketStream() { + if (stream_) { + // If Close() has not already been called, DetachDelegate() will send a + // SPDY RST_STREAM. Deleting SpdyWebSocketStream is good enough to initiate + // graceful shutdown, so we call Close() to avoid sending a RST_STREAM. + // For safe, we should eliminate |delegate_| for OnClose() calback. + delegate_ = NULL; + stream_->Close(); + } +} + +int SpdyWebSocketStream::InitializeStream(const GURL& url, + RequestPriority request_priority, + const BoundNetLog& net_log) { + if (spdy_session_->IsClosed()) + return ERR_SOCKET_NOT_CONNECTED; + + int result = spdy_session_->CreateStream( + url, request_priority, &stream_, net_log, + &spdy_stream_created_callback_); + + if (result == OK) { + DCHECK(stream_); + stream_->SetDelegate(this); + } + return result; +} + +int SpdyWebSocketStream::SendRequest( + const linked_ptr<spdy::SpdyHeaderBlock>& headers) { + if (!stream_) { + NOTREACHED(); + return ERR_UNEXPECTED; + } + stream_->set_spdy_headers(headers); + int result = stream_->SendRequest(true); + if (result < OK && result != ERR_IO_PENDING) + Close(); + return result; +} + +int SpdyWebSocketStream::SendData(const char* data, int length) { + if (!stream_) { + NOTREACHED(); + return ERR_UNEXPECTED; + } + IOBuffer* buf(new IOBuffer(length)); + memcpy(buf->data(), data, length); + return stream_->WriteStreamData(buf, length, spdy::DATA_FLAG_NONE); +} + +void SpdyWebSocketStream::Close() { + if (spdy_session_) + spdy_session_->CancelPendingCreateStreams(&stream_); + if (stream_) + stream_->Close(); +} + +bool SpdyWebSocketStream::OnSendHeadersComplete(int status) { + DCHECK(delegate_); + delegate_->OnSentSpdyHeaders(status); + return true; +} + +int SpdyWebSocketStream::OnSendBody() { + NOTREACHED(); + return ERR_UNEXPECTED; +} + +int SpdyWebSocketStream::OnSendBodyComplete(int status, bool* eof) { + NOTREACHED(); + *eof = true; + return ERR_UNEXPECTED; +} + +int SpdyWebSocketStream::OnResponseReceived( + const spdy::SpdyHeaderBlock& response, + base::Time response_time, int status) { + DCHECK(delegate_); + return delegate_->OnReceivedSpdyResponseHeader(response, status); +} + +void SpdyWebSocketStream::OnDataReceived(const char* data, int length) { + DCHECK(delegate_); + delegate_->OnReceivedSpdyData(data, length); +} + +void SpdyWebSocketStream::OnDataSent(int length) { + DCHECK(delegate_); + delegate_->OnSentSpdyData(length); +} + +void SpdyWebSocketStream::OnClose(int status) { + stream_ = NULL; + + // Destruction without Close() call OnClose() with delegate_ being NULL. + if (!delegate_) + return; + Delegate* delegate = delegate_; + delegate_ = NULL; + delegate->OnCloseSpdyStream(); +} + +void SpdyWebSocketStream::set_chunk_callback(ChunkCallback* callback) { + // Do nothing. SpdyWebSocketStream doesn't send any chunked data. +} + +void SpdyWebSocketStream::OnSpdyStreamCreated(int result) { + DCHECK_NE(ERR_IO_PENDING, result); + if (result == OK) { + DCHECK(stream_); + stream_->SetDelegate(this); + } + DCHECK(delegate_); + delegate_->OnCreatedSpdyStream(result); +} + +} // namespace net diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h new file mode 100644 index 0000000..59610aa --- /dev/null +++ b/net/spdy/spdy_websocket_stream.h @@ -0,0 +1,99 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_WEBSOCKET_STREAM_H_ +#define NET_SPDY_SPDY_WEBSOCKET_STREAM_H_ +#pragma once + +#include "base/basictypes.h" +#include "base/gtest_prod_util.h" +#include "base/memory/linked_ptr.h" +#include "base/memory/ref_counted.h" +#include "base/time.h" +#include "net/base/completion_callback.h" +#include "net/base/request_priority.h" +#include "net/spdy/spdy_framer.h" +#include "net/spdy/spdy_stream.h" + +namespace net { + +// The SpdyWebSocketStream is a WebSocket-specific type of stream known to a +// SpdySession. WebSocket's opening handshake is converted to SPDY's +// SYN_STREAM/SYN_REPLY. WebSocket frames are encapsulated as SPDY data frames. +class NET_TEST SpdyWebSocketStream + : public SpdyStream::Delegate { + public: + // Delegate handles asynchronous events. + class NET_TEST Delegate { + public: + // Called when InitializeStream() finishes asynchronously. This delegate is + // called if InitializeStream() returns ERR_IO_PENDING. |status| indicates + // network error. + virtual void OnCreatedSpdyStream(int status) = 0; + + // Called on corresponding to OnSendHeadersComplete() or SPDY's SYN frame + // has been sent. + virtual void OnSentSpdyHeaders(int status) = 0; + + // Called on corresponding to OnResponseReceived() or SPDY's SYN_STREAM, + // SYN_REPLY, or HEADERS frames are received. This callback may be called + // multiple times as SPDY's delegate does. + virtual int OnReceivedSpdyResponseHeader( + const spdy::SpdyHeaderBlock& headers, + int status) = 0; + + // Called when data is sent. + virtual void OnSentSpdyData(int amount_sent) = 0; + + // Called when data is received. + virtual void OnReceivedSpdyData(const char* data, int length) = 0; + + // Called when SpdyStream is closed. + virtual void OnCloseSpdyStream() = 0; + }; + + SpdyWebSocketStream(SpdySession* spdy_session, Delegate* delegate); + virtual ~SpdyWebSocketStream(); + + // Initializes SPDY stream for the WebSocket. + // It might create SPDY stream asynchronously. In this case, this method + // returns ERR_IO_PENDING and call OnCreatedSpdyStream delegate with result + // after completion. In other cases, delegate does not be called. + int InitializeStream(const GURL& url, + RequestPriority request_priority, + const BoundNetLog& stream_net_log); + + int SendRequest(const linked_ptr<spdy::SpdyHeaderBlock>& headers); + int SendData(const char* data, int length); + void Close(); + + // SpdyStream::Delegate + virtual bool OnSendHeadersComplete(int status); + virtual int OnSendBody(); + virtual int OnSendBodyComplete(int status, bool* eof); + virtual int OnResponseReceived(const spdy::SpdyHeaderBlock& response, + base::Time response_time, + int status); + virtual void OnDataReceived(const char* data, int length); + virtual void OnDataSent(int length); + virtual void OnClose(int status); + virtual void set_chunk_callback(ChunkCallback* callback); + + private: + FRIEND_TEST_ALL_PREFIXES(SpdyWebSocketStreamTest, Basic); + + void OnSpdyStreamCreated(int status); + + scoped_refptr<SpdyStream> stream_; + scoped_refptr<SpdySession> spdy_session_; + Delegate* delegate_; + + CompletionCallbackImpl<SpdyWebSocketStream> spdy_stream_created_callback_; + + DISALLOW_COPY_AND_ASSIGN(SpdyWebSocketStream); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_WEBSOCKET_STREAM_H_ diff --git a/net/spdy/spdy_websocket_stream_unittest.cc b/net/spdy/spdy_websocket_stream_unittest.cc new file mode 100644 index 0000000..310c533 --- /dev/null +++ b/net/spdy/spdy_websocket_stream_unittest.cc @@ -0,0 +1,683 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_websocket_stream.h" + +#include <string> +#include <vector> + +#include "net/base/completion_callback.h" +#include "net/proxy/proxy_server.h" +#include "net/spdy/spdy_http_utils.h" +#include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_session.h" +#include "net/spdy/spdy_test_util.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace { + +spdy::SpdyFrame* ConstructSpdyWebSocketHandshakeReq( + const char* const url, + const char* const origin, + const char* const protocol, + bool compressed, + spdy::SpdyStreamId stream_id, + net::RequestPriority request_priority) { + const net::SpdyHeaderInfo kSynStreamHeader = { + spdy::SYN_STREAM, + stream_id, + 0, // Associated stream ID + net::ConvertRequestPriorityToSpdyPriority(request_priority), + spdy::CONTROL_FLAG_NONE, + compressed, + spdy::INVALID, // Status + NULL, // Data, + 0, // Length + spdy::DATA_FLAG_NONE + }; + + const char* const headers[] = { + "url", + url, + "origin", + origin, + "protocol", + protocol, + }; + int header_size = arraysize(headers) / 2; + if (protocol == NULL) + header_size -= 1; + + return ConstructSpdyPacket( + kSynStreamHeader, + NULL, + 0, + headers, + header_size); +} + +spdy::SpdyFrame* ConstructSpdyWebSocketHandshakeResp( + const char* const url, + const char* const origin, + const char* const protocol, + bool compressed, + spdy::SpdyStreamId stream_id, + net::RequestPriority request_priority) { + const net::SpdyHeaderInfo kSynReplyHeader = { + spdy::SYN_REPLY, + stream_id, + 0, // Associated stream ID + net::ConvertRequestPriorityToSpdyPriority(request_priority), + spdy::CONTROL_FLAG_NONE, + false, + spdy::INVALID, // Status + NULL, // Data + 0, // Length + spdy::DATA_FLAG_NONE + }; + + const char* const headers[] = { + "sec-websocket-location", + url, + "sec-websocket-origin", + origin, + "sec-websocket-protocol", + protocol, + }; + int header_size = arraysize(headers) / 2; + if (protocol == NULL) + header_size -= 1; + + return ConstructSpdyPacket( + kSynReplyHeader, + NULL, + 0, + headers, + header_size); +} + +spdy::SpdyFrame* ConstructSpdyWebSocketFrame( + const char* data, + int len, + spdy::SpdyStreamId stream_id, + bool fin) { + spdy::SpdyFramer framer; + return framer.CreateDataFrame( + stream_id, data, len, + fin ? spdy::DATA_FLAG_FIN : spdy::DATA_FLAG_NONE); +} + +struct SpdyWebSocketStreamEvent { + enum EventType { + EVENT_CREATED, + EVENT_SENT_HEADERS, + EVENT_RECEIVED_HEADER, + EVENT_SENT_DATA, + EVENT_RECEIVED_DATA, + EVENT_CLOSE, + }; + SpdyWebSocketStreamEvent(EventType type, + const spdy::SpdyHeaderBlock& headers, + int result, + const std::string& data) + : event_type(type), + headers(headers), + result(result), + data(data) {} + + EventType event_type; + spdy::SpdyHeaderBlock headers; + int result; + std::string data; +}; + +} // namespace + +namespace net { + +class SpdyWebSocketStreamEventRecorder : public SpdyWebSocketStream::Delegate { + public: + explicit SpdyWebSocketStreamEventRecorder(CompletionCallback* callback) + : callback_(callback) {} + virtual ~SpdyWebSocketStreamEventRecorder() {} + + void SetOnCreated(Callback1<SpdyWebSocketStreamEvent*>::Type* callback) { + on_created_.reset(callback); + } + void SetOnSentHeaders(Callback1<SpdyWebSocketStreamEvent*>::Type* callback) { + on_sent_headers_.reset(callback); + } + void SetOnReceivedHeader( + Callback1<SpdyWebSocketStreamEvent*>::Type* callback) { + on_received_header_.reset(callback); + } + void SetOnSentData(Callback1<SpdyWebSocketStreamEvent*>::Type* callback) { + on_sent_data_.reset(callback); + } + void SetOnReceivedData( + Callback1<SpdyWebSocketStreamEvent*>::Type* callback) { + on_received_data_.reset(callback); + } + void SetOnClose(Callback1<SpdyWebSocketStreamEvent*>::Type* callback) { + on_close_.reset(callback); + } + + virtual void OnCreatedSpdyStream(int result) { + events_.push_back( + SpdyWebSocketStreamEvent(SpdyWebSocketStreamEvent::EVENT_CREATED, + spdy::SpdyHeaderBlock(), + result, + std::string())); + if (on_created_.get()) + on_created_->Run(&events_.back()); + } + virtual void OnSentSpdyHeaders(int result) { + events_.push_back( + SpdyWebSocketStreamEvent(SpdyWebSocketStreamEvent::EVENT_SENT_HEADERS, + spdy::SpdyHeaderBlock(), + result, + std::string())); + if (on_sent_data_.get()) + on_sent_data_->Run(&events_.back()); + } + virtual int OnReceivedSpdyResponseHeader( + const spdy::SpdyHeaderBlock& headers, int status) { + events_.push_back( + SpdyWebSocketStreamEvent( + SpdyWebSocketStreamEvent::EVENT_RECEIVED_HEADER, + headers, + status, + std::string())); + if (on_received_header_.get()) + on_received_header_->Run(&events_.back()); + return status; + } + virtual void OnSentSpdyData(int amount_sent) { + events_.push_back( + SpdyWebSocketStreamEvent( + SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + spdy::SpdyHeaderBlock(), + amount_sent, + std::string())); + if (on_sent_data_.get()) + on_sent_data_->Run(&events_.back()); + } + virtual void OnReceivedSpdyData(const char* data, int length) { + events_.push_back( + SpdyWebSocketStreamEvent( + SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + spdy::SpdyHeaderBlock(), + length, + std::string(data, length))); + if (on_received_data_.get()) + on_received_data_->Run(&events_.back()); + } + virtual void OnCloseSpdyStream() { + events_.push_back( + SpdyWebSocketStreamEvent( + SpdyWebSocketStreamEvent::EVENT_CLOSE, + spdy::SpdyHeaderBlock(), + OK, + std::string())); + if (on_close_.get()) + on_close_->Run(&events_.back()); + if (callback_) + callback_->Run(OK); + } + + const std::vector<SpdyWebSocketStreamEvent>& GetSeenEvents() const { + return events_; + } + + private: + std::vector<SpdyWebSocketStreamEvent> events_; + scoped_ptr<Callback1<SpdyWebSocketStreamEvent*>::Type> on_created_; + scoped_ptr<Callback1<SpdyWebSocketStreamEvent*>::Type> on_sent_headers_; + scoped_ptr<Callback1<SpdyWebSocketStreamEvent*>::Type> on_received_header_; + scoped_ptr<Callback1<SpdyWebSocketStreamEvent*>::Type> on_sent_data_; + scoped_ptr<Callback1<SpdyWebSocketStreamEvent*>::Type> on_received_data_; + scoped_ptr<Callback1<SpdyWebSocketStreamEvent*>::Type> on_close_; + CompletionCallback* callback_; + + DISALLOW_COPY_AND_ASSIGN(SpdyWebSocketStreamEventRecorder); +}; + +class SpdyWebSocketStreamTest : public testing::Test { + public: + OrderedSocketData* data() { return data_; } + + void DoSendHelloFrame(SpdyWebSocketStreamEvent* event) { + websocket_stream_->SendData(kMessageFrame, kMessageFrameLength); + } + + void DoSendClosingFrame(SpdyWebSocketStreamEvent* event) { + websocket_stream_->SendData(kClosingFrame, kClosingFrameLength); + } + + void DoClose(SpdyWebSocketStreamEvent* event) { + websocket_stream_->Close(); + } + + void DoSync(SpdyWebSocketStreamEvent* event) { + sync_callback_.Run(OK); + } + + protected: + SpdyWebSocketStreamTest() {} + virtual ~SpdyWebSocketStreamTest() {} + + virtual void SetUp() { + EnableCompression(false); + SpdySession::SetSSLMode(false); + + host_port_pair_.set_host("example.com"); + host_port_pair_.set_port(80); + host_port_proxy_pair_.first = host_port_pair_; + host_port_proxy_pair_.second = ProxyServer::Direct(); + + const size_t max_concurrent_streams = 1; + spdy::SettingsFlagsAndId id(0); + id.set_id(spdy::SETTINGS_MAX_CONCURRENT_STREAMS); + + id.set_flags(spdy::SETTINGS_FLAG_PLEASE_PERSIST); + spdy_settings_to_set_.push_back( + spdy::SpdySetting(id, max_concurrent_streams)); + + id.set_flags(spdy::SETTINGS_FLAG_PERSISTED); + spdy_settings_to_send_.push_back( + spdy::SpdySetting(id, max_concurrent_streams)); + } + virtual void TearDown() { + MessageLoop::current()->RunAllPending(); + } + + void EnableCompression(bool enabled) { + spdy::SpdyFramer::set_enable_compression_default(enabled); + } + void Prepare(spdy::SpdyStreamId stream_id) { + stream_id_ = stream_id; + + request_frame_.reset(ConstructSpdyWebSocketHandshakeReq( + "ws://example.com/echo", + "http://example.com/wsdemo", + NULL, + false, + stream_id_, + HIGHEST)); + response_frame_.reset(ConstructSpdyWebSocketHandshakeResp( + "ws://example.com/echo", + "http://example.com/wsdemo", + NULL, + false, + stream_id_, + HIGHEST)); + + message_frame_.reset(ConstructSpdyWebSocketFrame( + kMessageFrame, kMessageFrameLength, stream_id_, false)); + + closing_frame_.reset(ConstructSpdyWebSocketFrame( + kClosingFrame, kClosingFrameLength, stream_id_, false)); + } + int InitSession(MockRead* reads, size_t reads_count, + MockWrite* writes, size_t writes_count, + bool throttling) { + data_ = new OrderedSocketData(reads, reads_count, writes, writes_count); + session_deps_.socket_factory->AddSocketDataProvider(data_.get()); + http_session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_); + SpdySessionPool* spdy_session_pool(http_session_->spdy_session_pool()); + + if (throttling) { + // Set max concurrent streams to 1. + spdy_session_pool->mutable_spdy_settings()->Set( + host_port_pair_, spdy_settings_to_set_); + } + + EXPECT_FALSE(spdy_session_pool->HasSession(host_port_proxy_pair_)); + session_ = spdy_session_pool->Get(host_port_proxy_pair_, BoundNetLog()); + EXPECT_TRUE(spdy_session_pool->HasSession(host_port_proxy_pair_)); + transport_params_ = new TransportSocketParams(host_port_pair_, MEDIUM, + GURL(), false, false); + TestCompletionCallback callback; + scoped_ptr<ClientSocketHandle> connection(new ClientSocketHandle); + EXPECT_EQ(ERR_IO_PENDING, + connection->Init(host_port_pair_.ToString(), transport_params_, + MEDIUM, &callback, + http_session_->transport_socket_pool(), + BoundNetLog())); + EXPECT_EQ(OK, callback.WaitForResult()); + return session_->InitializeWithSocket(connection.release(), false, OK); + } + void SendRequest() { + linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); + (*headers)["url"] = "ws://example.com/echo"; + (*headers)["origin"] = "http://example.com/wsdemo"; + + websocket_stream_->SendRequest(headers); + } + + spdy::SpdySettings spdy_settings_to_set_; + spdy::SpdySettings spdy_settings_to_send_; + SpdySessionDependencies session_deps_; + scoped_refptr<OrderedSocketData> data_; + scoped_refptr<HttpNetworkSession> http_session_; + scoped_refptr<SpdySession> session_; + scoped_refptr<TransportSocketParams> transport_params_; + scoped_ptr<SpdyWebSocketStream> websocket_stream_; + spdy::SpdyStreamId stream_id_; + scoped_ptr<spdy::SpdyFrame> request_frame_; + scoped_ptr<spdy::SpdyFrame> response_frame_; + scoped_ptr<spdy::SpdyFrame> message_frame_; + scoped_ptr<spdy::SpdyFrame> closing_frame_; + HostPortPair host_port_pair_; + HostPortProxyPair host_port_proxy_pair_; + TestCompletionCallback completion_callback_; + TestCompletionCallback sync_callback_; + + static const char kMessageFrame[]; + static const char kClosingFrame[]; + static const size_t kMessageFrameLength; + static const size_t kClosingFrameLength; +}; + +const char SpdyWebSocketStreamTest::kMessageFrame[] = "\0hello\xff"; +const char SpdyWebSocketStreamTest::kClosingFrame[] = "\xff\0"; +const size_t SpdyWebSocketStreamTest::kMessageFrameLength = + arraysize(SpdyWebSocketStreamTest::kMessageFrame) - 1; +const size_t SpdyWebSocketStreamTest::kClosingFrameLength = + arraysize(SpdyWebSocketStreamTest::kClosingFrame) - 1; + +TEST_F(SpdyWebSocketStreamTest, Basic) { + Prepare(1); + MockWrite writes[] = { + CreateMockWrite(*request_frame_.get(), 1), + CreateMockWrite(*message_frame_.get(), 3), + CreateMockWrite(*closing_frame_.get(), 5) + }; + + MockRead reads[] = { + CreateMockRead(*response_frame_.get(), 2), + CreateMockRead(*message_frame_.get(), 4), + // Skip sequence 6 to notify closing has been sent. + CreateMockRead(*closing_frame_.get(), 7), + MockRead(false, 0, 8) // EOF cause OnCloseSpdyStream event. + }; + + EXPECT_EQ(OK, InitSession(reads, arraysize(reads), + writes, arraysize(writes), false)); + + SpdyWebSocketStreamEventRecorder delegate(&completion_callback_); + SpdyWebSocketStreamTest* test = this; // Necessary for NewCallback. + delegate.SetOnReceivedHeader( + NewCallback(test, &SpdyWebSocketStreamTest::DoSendHelloFrame)); + delegate.SetOnReceivedData( + NewCallback(test, &SpdyWebSocketStreamTest::DoSendClosingFrame)); + + websocket_stream_.reset(new SpdyWebSocketStream(session_, &delegate)); + + BoundNetLog net_log; + GURL url("ws://example.com/echo"); + ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); + + ASSERT_TRUE(websocket_stream_->stream_); + EXPECT_EQ(stream_id_, websocket_stream_->stream_->stream_id()); + + SendRequest(); + + completion_callback_.WaitForResult(); + + websocket_stream_.reset(); + + const std::vector<SpdyWebSocketStreamEvent>& events = + delegate.GetSeenEvents(); + ASSERT_EQ(7U, events.size()); + + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_HEADERS, + events[0].event_type); + EXPECT_LT(0, events[0].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_HEADER, + events[1].event_type); + EXPECT_EQ(OK, events[1].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + events[2].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[2].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + events[3].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[3].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + events[4].event_type); + EXPECT_EQ(static_cast<int>(kClosingFrameLength), events[4].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + events[5].event_type); + EXPECT_EQ(static_cast<int>(kClosingFrameLength), events[5].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_CLOSE, + events[6].event_type); + EXPECT_EQ(OK, events[6].result); + + // EOF close SPDY session. + EXPECT_TRUE(!http_session_->spdy_session_pool()->HasSession( + host_port_proxy_pair_)); + EXPECT_TRUE(data()->at_read_eof()); + EXPECT_TRUE(data()->at_write_eof()); +} + +TEST_F(SpdyWebSocketStreamTest, DestructionBeforeClose) { + Prepare(1); + MockWrite writes[] = { + CreateMockWrite(*request_frame_.get(), 1), + CreateMockWrite(*message_frame_.get(), 3) + }; + + MockRead reads[] = { + CreateMockRead(*response_frame_.get(), 2), + CreateMockRead(*message_frame_.get(), 4), + MockRead(true, ERR_IO_PENDING, 5) + }; + + EXPECT_EQ(OK, InitSession(reads, arraysize(reads), + writes, arraysize(writes), false)); + + SpdyWebSocketStreamEventRecorder delegate(&completion_callback_); + SpdyWebSocketStreamTest* test = this; // Necessary for NewCallback. + delegate.SetOnReceivedHeader( + NewCallback(test, &SpdyWebSocketStreamTest::DoSendHelloFrame)); + delegate.SetOnReceivedData( + NewCallback(test, &SpdyWebSocketStreamTest::DoSync)); + + websocket_stream_.reset(new SpdyWebSocketStream(session_, &delegate)); + + BoundNetLog net_log; + GURL url("ws://example.com/echo"); + ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); + + SendRequest(); + + sync_callback_.WaitForResult(); + + // WebSocketStream destruction remove its SPDY stream from the session. + EXPECT_TRUE(session_->IsStreamActive(stream_id_)); + websocket_stream_.reset(); + EXPECT_FALSE(session_->IsStreamActive(stream_id_)); + + const std::vector<SpdyWebSocketStreamEvent>& events = + delegate.GetSeenEvents(); + ASSERT_GE(4U, events.size()); + + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_HEADERS, + events[0].event_type); + EXPECT_LT(0, events[0].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_HEADER, + events[1].event_type); + EXPECT_EQ(OK, events[1].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + events[2].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[2].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + events[3].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[3].result); + + EXPECT_TRUE(http_session_->spdy_session_pool()->HasSession( + host_port_proxy_pair_)); + EXPECT_TRUE(data()->at_read_eof()); + EXPECT_TRUE(data()->at_write_eof()); +} + +TEST_F(SpdyWebSocketStreamTest, DestructionAfterExplicitClose) { + Prepare(1); + MockWrite writes[] = { + CreateMockWrite(*request_frame_.get(), 1), + CreateMockWrite(*message_frame_.get(), 3), + CreateMockWrite(*closing_frame_.get(), 5) + }; + + MockRead reads[] = { + CreateMockRead(*response_frame_.get(), 2), + CreateMockRead(*message_frame_.get(), 4), + MockRead(true, ERR_IO_PENDING, 6) + }; + + EXPECT_EQ(OK, InitSession(reads, arraysize(reads), + writes, arraysize(writes), false)); + + SpdyWebSocketStreamEventRecorder delegate(&completion_callback_); + SpdyWebSocketStreamTest* test = this; // Necessary for NewCallback. + delegate.SetOnReceivedHeader( + NewCallback(test, &SpdyWebSocketStreamTest::DoSendHelloFrame)); + delegate.SetOnReceivedData( + NewCallback(test, &SpdyWebSocketStreamTest::DoClose)); + + websocket_stream_.reset(new SpdyWebSocketStream(session_, &delegate)); + + BoundNetLog net_log; + GURL url("ws://example.com/echo"); + ASSERT_EQ(OK, websocket_stream_->InitializeStream(url, HIGHEST, net_log)); + + SendRequest(); + + completion_callback_.WaitForResult(); + + // SPDY stream has already been removed from the session by Close(). + EXPECT_FALSE(session_->IsStreamActive(stream_id_)); + websocket_stream_.reset(); + + const std::vector<SpdyWebSocketStreamEvent>& events = + delegate.GetSeenEvents(); + ASSERT_EQ(5U, events.size()); + + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_HEADERS, + events[0].event_type); + EXPECT_LT(0, events[0].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_HEADER, + events[1].event_type); + EXPECT_EQ(OK, events[1].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + events[2].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[2].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + events[3].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[3].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_CLOSE, events[4].event_type); + + EXPECT_TRUE(http_session_->spdy_session_pool()->HasSession( + host_port_proxy_pair_)); +} + +TEST_F(SpdyWebSocketStreamTest, IOPending) { + Prepare(3); + scoped_ptr<spdy::SpdyFrame> settings_frame( + ConstructSpdySettings(spdy_settings_to_send_)); + MockWrite writes[] = { + // Setting throttling make SpdySession send settings frame automatically. + CreateMockWrite(*settings_frame), + CreateMockWrite(*request_frame_.get(), 2), + CreateMockWrite(*message_frame_.get(), 4), + CreateMockWrite(*closing_frame_.get(), 6) + }; + + MockRead reads[] = { + CreateMockRead(*response_frame_.get(), 3), + CreateMockRead(*message_frame_.get(), 5), + // Skip sequence 7 to notify closing has been sent. + CreateMockRead(*closing_frame_.get(), 8), + MockRead(false, 0, 9) // EOF cause OnCloseSpdyStream event. + }; + + EXPECT_EQ(OK, InitSession(reads, arraysize(reads), + writes, arraysize(writes), true)); + + // Create a dummy WebSocketStream which cause ERR_IO_PENDING to another + // WebSocketStream under test. + SpdyWebSocketStreamTest* test = this; // Necessary for NewCallback. + SpdyWebSocketStreamEventRecorder block_delegate(NULL); + + scoped_ptr<SpdyWebSocketStream> block_stream( + new SpdyWebSocketStream(session_, &block_delegate)); + BoundNetLog block_net_log; + GURL block_url("ws://example.com/block"); + ASSERT_EQ(OK, + block_stream->InitializeStream(block_url, HIGHEST, block_net_log)); + + // Create a WebSocketStream under test. + SpdyWebSocketStreamEventRecorder delegate(&completion_callback_); + delegate.SetOnCreated(NewCallback(test, &SpdyWebSocketStreamTest::DoSync)); + delegate.SetOnReceivedHeader( + NewCallback(test, &SpdyWebSocketStreamTest::DoSendHelloFrame)); + delegate.SetOnReceivedData( + NewCallback(test, &SpdyWebSocketStreamTest::DoSendClosingFrame)); + + websocket_stream_.reset(new SpdyWebSocketStream(session_, &delegate)); + BoundNetLog net_log; + GURL url("ws://example.com/echo"); + ASSERT_EQ(ERR_IO_PENDING, websocket_stream_->InitializeStream( + url, HIGHEST, net_log)); + + // Delete the fist stream to allow create the second stream. + block_stream.reset(); + ASSERT_EQ(OK, sync_callback_.WaitForResult()); + + SendRequest(); + + completion_callback_.WaitForResult(); + + websocket_stream_.reset(); + + const std::vector<SpdyWebSocketStreamEvent>& block_events = + block_delegate.GetSeenEvents(); + ASSERT_EQ(0U, block_events.size()); + + const std::vector<SpdyWebSocketStreamEvent>& events = + delegate.GetSeenEvents(); + ASSERT_EQ(8U, events.size()); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_CREATED, + events[0].event_type); + EXPECT_EQ(0, events[0].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_HEADERS, + events[1].event_type); + EXPECT_LT(0, events[1].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_HEADER, + events[2].event_type); + EXPECT_EQ(OK, events[2].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + events[3].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[3].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + events[4].event_type); + EXPECT_EQ(static_cast<int>(kMessageFrameLength), events[4].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_SENT_DATA, + events[5].event_type); + EXPECT_EQ(static_cast<int>(kClosingFrameLength), events[5].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, + events[6].event_type); + EXPECT_EQ(static_cast<int>(kClosingFrameLength), events[6].result); + EXPECT_EQ(SpdyWebSocketStreamEvent::EVENT_CLOSE, + events[7].event_type); + EXPECT_EQ(OK, events[7].result); + + // EOF close SPDY session. + EXPECT_TRUE(!http_session_->spdy_session_pool()->HasSession( + host_port_proxy_pair_)); + EXPECT_TRUE(data()->at_read_eof()); + EXPECT_TRUE(data()->at_write_eof()); +} + +} // namespace net |