diff options
author | ukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-14 04:13:40 +0000 |
---|---|---|
committer | ukai@chromium.org <ukai@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-14 04:13:40 +0000 |
commit | 65d56aaadc75327749c62400a846a9e2cb12ea73 (patch) | |
tree | aa05833579eb90d65de4d1f232810268f512c97c | |
parent | 495caf3da866d8967176bf4102532dba412a1012 (diff) | |
download | chromium_src-65d56aaadc75327749c62400a846a9e2cb12ea73.zip chromium_src-65d56aaadc75327749c62400a846a9e2cb12ea73.tar.gz chromium_src-65d56aaadc75327749c62400a846a9e2cb12ea73.tar.bz2 |
Refactor SpdyStream.
Split SpdyStream into two parts: base SpdyStream and SpdyHttpStream.
SpdyStream is an interface to SpdySession and provides base implementation
as spdy stream.
SpdyHttpStream is derived class of SpdyStream and used for
[Spdy|Http]NetworkTransaction.
BUG=42320
TEST=none
Review URL: http://codereview.chromium.org/2564001
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@49667 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/http/http_network_transaction.cc | 2 | ||||
-rw-r--r-- | net/http/http_network_transaction.h | 4 | ||||
-rw-r--r-- | net/net.gyp | 4 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream.cc | 226 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream.h | 122 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream_unittest.cc (renamed from net/spdy/spdy_stream_unittest.cc) | 13 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction.cc | 2 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction.h | 4 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 32 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 13 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 19 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 302 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 115 |
13 files changed, 510 insertions, 348 deletions
diff --git a/net/http/http_network_transaction.cc b/net/http/http_network_transaction.cc index 0cd9748..13fbf9d 100644 --- a/net/http/http_network_transaction.cc +++ b/net/http/http_network_transaction.cc @@ -39,9 +39,9 @@ #include "net/socket/socks_client_socket_pool.h" #include "net/socket/ssl_client_socket.h" #include "net/socket/tcp_client_socket_pool.h" +#include "net/spdy/spdy_http_stream.h" #include "net/spdy/spdy_session.h" #include "net/spdy/spdy_session_pool.h" -#include "net/spdy/spdy_stream.h" using base::Time; diff --git a/net/http/http_network_transaction.h b/net/http/http_network_transaction.h index 21f15dc..439da93 100644 --- a/net/http/http_network_transaction.h +++ b/net/http/http_network_transaction.h @@ -31,10 +31,10 @@ namespace net { class ClientSocketFactory; class ClientSocketHandle; -class SpdyStream; class HttpNetworkSession; class HttpRequestHeaders; class HttpStream; +class SpdyHttpStream; class HttpNetworkTransaction : public HttpTransaction { public: @@ -322,7 +322,7 @@ class HttpNetworkTransaction : public HttpTransaction { scoped_ptr<ClientSocketHandle> connection_; scoped_ptr<HttpStream> http_stream_; - scoped_refptr<SpdyStream> spdy_stream_; + scoped_refptr<SpdyHttpStream> spdy_stream_; bool reused_socket_; // True if we've validated the headers that the stream parser has returned. diff --git a/net/net.gyp b/net/net.gyp index e6f9310..543fbb1 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -478,6 +478,8 @@ 'spdy/spdy_frame_builder.h', 'spdy/spdy_framer.cc', 'spdy/spdy_framer.h', + 'spdy/spdy_http_stream.cc', + 'spdy/spdy_http_stream.h', 'spdy/spdy_io_buffer.cc', 'spdy/spdy_io_buffer.h', 'spdy/spdy_network_transaction.cc', @@ -739,10 +741,10 @@ 'socket_stream/socket_stream_metrics_unittest.cc', 'socket_stream/socket_stream_unittest.cc', 'spdy/spdy_framer_test.cc', + 'spdy/spdy_http_stream_unittest.cc', 'spdy/spdy_network_transaction_unittest.cc', 'spdy/spdy_protocol_test.cc', 'spdy/spdy_session_unittest.cc', - 'spdy/spdy_stream_unittest.cc', 'spdy/spdy_test_util.h', 'tools/dump_cache/url_to_filename_encoder.cc', 'tools/dump_cache/url_to_filename_encoder.h', diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc new file mode 100644 index 0000000..a8979d5 --- /dev/null +++ b/net/spdy/spdy_http_stream.cc @@ -0,0 +1,226 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_http_stream.h" + +#include <list> + +#include "base/logging.h" +#include "base/message_loop.h" +#include "net/http/http_request_info.h" +#include "net/http/http_response_info.h" +#include "net/spdy/spdy_session.h" + +namespace net { + +SpdyHttpStream::SpdyHttpStream( + SpdySession* session, spdy::SpdyStreamId stream_id, bool pushed) + : SpdyStream(session, stream_id, pushed), + download_finished_(false), + user_callback_(NULL), + user_buffer_len_(0), + buffered_read_callback_pending_(false), + more_read_data_pending_(false) {} + +SpdyHttpStream::~SpdyHttpStream() { + DLOG(INFO) << "Deleting SpdyHttpStream for stream " << stream_id(); +} + +uint64 SpdyHttpStream::GetUploadProgress() const { + if (!request_body_stream()) + return 0; + + return request_body_stream()->position(); +} + +int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) { + DCHECK(is_idle()); + // Note: The SpdyStream may have already received the response headers, so + // this call may complete synchronously. + CHECK(callback); + + int result = DoReadResponseHeaders(); + if (result == ERR_IO_PENDING) { + CHECK(!user_callback_); + user_callback_ = callback; + } + return result; +} + +int SpdyHttpStream::ReadResponseBody( + IOBuffer* buf, int buf_len, CompletionCallback* callback) { + DCHECK(is_idle()); + CHECK(buf); + CHECK(buf_len); + CHECK(callback); + CHECK(!cancelled()); + + // If we have data buffered, complete the IO immediately. + if (!response_body_.empty()) { + int bytes_read = 0; + while (!response_body_.empty() && buf_len > 0) { + scoped_refptr<IOBufferWithSize> data = response_body_.front(); + const int bytes_to_copy = std::min(buf_len, data->size()); + memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); + buf_len -= bytes_to_copy; + if (bytes_to_copy == data->size()) { + response_body_.pop_front(); + } else { + const int bytes_remaining = data->size() - bytes_to_copy; + IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); + memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), + bytes_remaining); + response_body_.pop_front(); + response_body_.push_front(new_buffer); + } + bytes_read += bytes_to_copy; + } + return bytes_read; + } else if (response_complete()) { + return response_status(); + } + + CHECK(!user_callback_); + CHECK(!user_buffer_); + CHECK_EQ(0, user_buffer_len_); + + user_callback_ = callback; + user_buffer_ = buf; + user_buffer_len_ = buf_len; + return ERR_IO_PENDING; +} + +int SpdyHttpStream::SendRequest(UploadDataStream* upload_data, + HttpResponseInfo* response, + CompletionCallback* callback) { + CHECK(callback); + CHECK(!cancelled()); + CHECK(response); + + int result = DoSendRequest(upload_data, response); + if (result == ERR_IO_PENDING) { + CHECK(!user_callback_); + user_callback_ = callback; + } + return result; +} + +void SpdyHttpStream::Cancel() { + user_callback_ = NULL; + DoCancel(); +} + +int SpdyHttpStream::OnResponseReceived(const HttpResponseInfo& response) { + int rv = DoOnResponseReceived(response); + if (user_callback_) + DoCallback(rv); + return rv; +} + +bool SpdyHttpStream::OnDataReceived(const char* data, int length) { + bool result = DoOnDataReceived(data, length); + // Note that data may be received for a SpdyStream prior to the user calling + // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often + // happen for server initiated streams. + if (result && !response_complete()) { + // Save the received data. + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + response_body_.push_back(io_buffer); + + if (user_buffer_) { + // Handing small chunks of data to the caller creates measurable overhead. + // We buffer data in short time-spans and send a single read notification. + ScheduleBufferedReadCallback(); + } + } + return result; +} + +void SpdyHttpStream::OnWriteComplete(int status) { + DoOnWriteComplete(status); +} + +void SpdyHttpStream::OnClose(int status) { + if (status == net::OK) { + download_finished_ = true; + set_response_complete(true); + + // We need to complete any pending buffered read now. + DoBufferedReadCallback(); + } + DoOnClose(status); + if (user_callback_) + DoCallback(status); +} + +void SpdyHttpStream::ScheduleBufferedReadCallback() { + // If there is already a scheduled DoBufferedReadCallback, don't issue + // another one. Mark that we have received more data and return. + if (buffered_read_callback_pending_) { + more_read_data_pending_ = true; + return; + } + + more_read_data_pending_ = false; + buffered_read_callback_pending_ = true; + const int kBufferTimeMs = 1; + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod( + this, &SpdyHttpStream::DoBufferedReadCallback), kBufferTimeMs); +} + +// Checks to see if we should wait for more buffered data before notifying +// the caller. Returns true if we should wait, false otherwise. +bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const { + // If the response is complete, there is no point in waiting. + if (response_complete()) + return false; + + int bytes_buffered = 0; + std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it; + for (it = response_body_.begin(); + it != response_body_.end() && bytes_buffered < user_buffer_len_; + ++it) + bytes_buffered += (*it)->size(); + + return bytes_buffered < user_buffer_len_; +} + +void SpdyHttpStream::DoBufferedReadCallback() { + buffered_read_callback_pending_ = false; + + // If the transaction is cancelled or errored out, we don't need to complete + // the read. + if (response_status() != OK || cancelled()) + return; + + // When more_read_data_pending_ is true, it means that more data has + // arrived since we started waiting. Wait a little longer and continue + // to buffer. + if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { + ScheduleBufferedReadCallback(); + return; + } + + int rv = 0; + if (user_buffer_) { + rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); + CHECK_NE(rv, ERR_IO_PENDING); + user_buffer_ = NULL; + user_buffer_len_ = 0; + DoCallback(rv); + } +} + +void SpdyHttpStream::DoCallback(int rv) { + CHECK_NE(rv, ERR_IO_PENDING); + CHECK(user_callback_); + + // Since Run may result in being called back, clear user_callback_ in advance. + CompletionCallback* c = user_callback_; + user_callback_ = NULL; + c->Run(rv); +} + +} // namespace net diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h new file mode 100644 index 0000000..d5131e8 --- /dev/null +++ b/net/spdy/spdy_http_stream.h @@ -0,0 +1,122 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_HTTP_STREAM_H_ +#define NET_SPDY_SPDY_HTTP_STREAM_H_ + +#include <list> + +#include "base/basictypes.h" +#include "base/ref_counted.h" +#include "net/base/completion_callback.h" +#include "net/base/net_log.h" +#include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_stream.h" + +namespace net { + +class HttpResponseInfo; +class IOBuffer; +class SpdySession; +class UploadData; +class UploadDataStream; + +// The SpdyHttpStream is a HTTP-specific type of stream known to a SpdySession. +class SpdyHttpStream : public SpdyStream { + public: + // SpdyHttpStream constructor + SpdyHttpStream( + SpdySession* session, spdy::SpdyStreamId stream_id, bool pushed); + + // =================================================== + // Interface for [Http|Spdy]NetworkTransaction to use. + + // Sends the request. If |upload_data| is non-NULL, sends that in the request + // body. |callback| is used when this completes asynchronously. Note that + // the actual SYN_STREAM packet will have already been sent by this point. + // Also note that SpdyStream takes ownership of |upload_data|. + int SendRequest(UploadDataStream* upload_data, + HttpResponseInfo* response, + CompletionCallback* callback); + + // Reads the response headers. Returns a net error code. + int ReadResponseHeaders(CompletionCallback* callback); + + // Reads the response body. Returns a net error code or the number of bytes + // read. + int ReadResponseBody( + IOBuffer* buf, int buf_len, CompletionCallback* callback); + + // Cancels the stream. Note that this does not immediately cause deletion of + // the stream. This function is used to cancel any callbacks from being + // invoked. TODO(willchan): It should also free up any memory associated with + // the stream, such as IOBuffers. + void Cancel(); + + // Returns the number of bytes uploaded. + uint64 GetUploadProgress() const; + + // =================================================== + // Interface for SpdySession to use. + + // Called by the SpdySession when a response (e.g. a SYN_REPLY) has been + // received for this stream. + // SpdyHttpSession calls back |callback| set by SendRequest or + // ReadResponseHeaders. + virtual int OnResponseReceived(const HttpResponseInfo& response); + + // Called by the SpdySession when response data has been received for this + // stream. This callback may be called multiple times as data arrives + // from the network, and will never be called prior to OnResponseReceived. + // SpdyHttpSession schedule to call back |callback| set by ReadResponseBody. + virtual bool OnDataReceived(const char* buffer, int bytes); + + // Called by the SpdySession when a write has completed. This callback + // will be called multiple times for each write which completes. Writes + // include the SYN_STREAM write and also DATA frame writes. + virtual void OnWriteComplete(int status); + + // Called by the SpdySession when the request is finished. This callback + // will always be called at the end of the request and signals to the + // stream that the stream has no more network events. No further callbacks + // to the stream will be made after this call. + // SpdyHttpSession call back |callback| set by SendRequest, + // ReadResponseHeaders or ReadResponseBody. + virtual void OnClose(int status); + + private: + friend class base::RefCounted<SpdyHttpStream>; + virtual ~SpdyHttpStream(); + + // Call the user callback. + void DoCallback(int rv); + + void ScheduleBufferedReadCallback(); + void DoBufferedReadCallback(); + bool ShouldWaitForMoreBufferedData() const; + + bool download_finished_; + + // We buffer the response body as it arrives asynchronously from the stream. + // TODO(mbelshe): is this infinite buffering? + std::list<scoped_refptr<IOBufferWithSize> > response_body_; + + CompletionCallback* user_callback_; + + // User provided buffer for the ReadResponseBody() response. + scoped_refptr<IOBuffer> user_buffer_; + int user_buffer_len_; + + // Is there a scheduled read callback pending. + bool buffered_read_callback_pending_; + // Has more data been received from the network during the wait for the + // scheduled read callback. + bool more_read_data_pending_; + + DISALLOW_COPY_AND_ASSIGN(SpdyHttpStream); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_HTTP_STREAM_H_ diff --git a/net/spdy/spdy_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc index 9f15fae..4057131 100644 --- a/net/spdy/spdy_stream_unittest.cc +++ b/net/spdy/spdy_http_stream_unittest.cc @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "net/spdy/spdy_stream.h" +#include "net/spdy/spdy_http_stream.h" #include "base/ref_counted.h" #include "net/base/mock_host_resolver.h" #include "net/base/net_errors.h" @@ -82,9 +82,9 @@ HttpNetworkSession* CreateSession(SessionDependencies* session_deps) { NULL); } -class SpdyStreamTest : public testing::Test { +class SpdyHttpStreamTest : public testing::Test { protected: - SpdyStreamTest() + SpdyHttpStreamTest() : session_(CreateSession(&session_deps_)), pool_peer_(session_->spdy_session_pool()) {} @@ -106,7 +106,7 @@ class SpdyStreamTest : public testing::Test { }; // Needs fixing, see http://crbug.com/28622 -TEST_F(SpdyStreamTest, SendRequest) { +TEST_F(SpdyHttpStreamTest, SendRequest) { scoped_refptr<SpdySession> session(CreateSpdySession()); HttpRequestInfo request; request.method = "GET"; @@ -114,9 +114,10 @@ TEST_F(SpdyStreamTest, SendRequest) { TestCompletionCallback callback; HttpResponseInfo response; - scoped_refptr<SpdyStream> stream(new SpdyStream(session, 1, false)); + scoped_refptr<SpdyHttpStream> stream(new SpdyHttpStream(session, 1, false)); stream->SetRequestInfo(request); - EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(NULL, &response, &callback)); + EXPECT_EQ(ERR_IO_PENDING, + stream->SendRequest(NULL, &response, &callback)); // Need to manually remove the spdy session since normally it gets removed on // socket close/error, but we aren't communicating over a socket here. diff --git a/net/spdy/spdy_network_transaction.cc b/net/spdy/spdy_network_transaction.cc index 20d833d..e79d889 100644 --- a/net/spdy/spdy_network_transaction.cc +++ b/net/spdy/spdy_network_transaction.cc @@ -19,7 +19,7 @@ #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/socket/tcp_client_socket_pool.h" -#include "net/spdy/spdy_stream.h" +#include "net/spdy/spdy_http_stream.h" using base::Time; diff --git a/net/spdy/spdy_network_transaction.h b/net/spdy/spdy_network_transaction.h index f2eb8d5..eb22781 100644 --- a/net/spdy/spdy_network_transaction.h +++ b/net/spdy/spdy_network_transaction.h @@ -22,7 +22,7 @@ namespace net { class SpdySession; -class SpdyStream; +class SpdyHttpStream; class HttpNetworkSession; class HttpResponseInfo; class IOBuffer; @@ -112,7 +112,7 @@ class SpdyNetworkTransaction : public HttpTransaction { // The next state in the state machine. State next_state_; - scoped_refptr<SpdyStream> stream_; + scoped_refptr<SpdyHttpStream> stream_; DISALLOW_COPY_AND_ASSIGN(SpdyNetworkTransaction); }; diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index ed99077..6cb33f8 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -26,6 +26,7 @@ #include "net/socket/client_socket_factory.h" #include "net/socket/ssl_client_socket.h" #include "net/spdy/spdy_frame_builder.h" +#include "net/spdy/spdy_http_stream.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_settings_storage.h" #include "net/spdy/spdy_stream.h" @@ -342,18 +343,19 @@ net::Error SpdySession::Connect(const std::string& group_name, return static_cast<net::Error>(rv); } -scoped_refptr<SpdyStream> SpdySession::GetOrCreateStream( +scoped_refptr<SpdyHttpStream> SpdySession::GetOrCreateStream( const HttpRequestInfo& request, const UploadDataStream* upload_data, const BoundNetLog& stream_net_log) { const GURL& url = request.url; const std::string& path = url.PathForRequest(); - scoped_refptr<SpdyStream> stream; + scoped_refptr<SpdyHttpStream> stream; // Check if we have a push stream for this path. if (request.method == "GET") { - stream = GetPushStream(path); + // Only HTTP will push a stream. + scoped_refptr<SpdyHttpStream> stream = GetPushStream(path); if (stream) { DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); // Update the request time @@ -385,18 +387,18 @@ scoped_refptr<SpdyStream> SpdySession::GetOrCreateStream( // Server will assign a stream id when the push stream arrives. Use 0 for // now. net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL); - SpdyStream* stream = new SpdyStream(this, 0, true); + stream = new SpdyHttpStream(this, 0, true); stream->SetRequestInfo(request); stream->set_path(path); stream->set_net_log(stream_net_log); it->second = stream; - return it->second; + return stream; } const spdy::SpdyStreamId stream_id = GetNewStreamId(); // If we still don't have a stream, activate one now. - stream = new SpdyStream(this, stream_id, false); + stream = new SpdyHttpStream(this, stream_id, false); stream->SetRequestInfo(request); stream->set_priority(request.priority); stream->set_path(path); @@ -878,9 +880,9 @@ void SpdySession::DeactivateStream(spdy::SpdyStreamId id) { DCHECK(IsStreamActive(id)); // Verify it is not on the pushed_streams_ list. - ActiveStreamList::iterator it; + ActivePushedStreamList::iterator it; for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { - scoped_refptr<SpdyStream> curr = *it; + scoped_refptr<SpdyHttpStream> curr = *it; if (id == curr->stream_id()) { pushed_streams_.erase(it); break; @@ -897,15 +899,16 @@ void SpdySession::RemoveFromPool() { } } -scoped_refptr<SpdyStream> SpdySession::GetPushStream(const std::string& path) { +scoped_refptr<SpdyHttpStream> SpdySession::GetPushStream( + const std::string& path) { static StatsCounter used_push_streams("spdy.claimed_push_streams"); LOG(INFO) << "Looking for push stream: " << path; - scoped_refptr<SpdyStream> stream; + scoped_refptr<SpdyHttpStream> stream; // We just walk a linear list here. - ActiveStreamList::iterator it; + ActivePushedStreamList::iterator it; for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { stream = *it; if (path == stream->path()) { @@ -937,7 +940,7 @@ void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id, const char* data, size_t len) { LOG(INFO) << "Spdy data for stream " << stream_id << ", " << len << " bytes"; - + if (!IsStreamActive(stream_id)) { // NOTE: it may just be that the stream was cancelled. LOG(WARNING) << "Received data frame for invalid stream " << stream_id; @@ -1027,7 +1030,8 @@ void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, return; } - scoped_refptr<SpdyStream> stream; + // Only HTTP push a stream. + scoped_refptr<SpdyHttpStream> stream; // Check if we already have a delegate awaiting this stream. PendingStreamMap::iterator it; @@ -1050,7 +1054,7 @@ void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, stream_id)); } } else { - stream = new SpdyStream(this, stream_id, true); + stream = new SpdyHttpStream(this, stream_id, true); if (net_log_.HasListener()) { net_log_.AddEvent( diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index f9f9e6f..4772c58 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -31,6 +31,7 @@ namespace net { +class SpdyHttpStream; class SpdyStream; class HttpNetworkSession; struct HttpRequestInfo; @@ -64,7 +65,7 @@ class SpdySession : public base::RefCounted<SpdySession>, // might also not have initiated the stream yet, but indicated it will via // X-Associated-Content. // Returns the new or existing stream. Never returns NULL. - scoped_refptr<SpdyStream> GetOrCreateStream( + scoped_refptr<SpdyHttpStream> GetOrCreateStream( const HttpRequestInfo& request, const UploadDataStream* upload_data, const BoundNetLog& stream_net_log); @@ -106,8 +107,10 @@ class SpdySession : public base::RefCounted<SpdySession>, }; typedef std::map<int, scoped_refptr<SpdyStream> > ActiveStreamMap; - typedef std::list<scoped_refptr<SpdyStream> > ActiveStreamList; - typedef std::map<std::string, scoped_refptr<SpdyStream> > PendingStreamMap; + // Only HTTP push a stream. + typedef std::list<scoped_refptr<SpdyHttpStream> > ActivePushedStreamList; + typedef std::map<std::string, scoped_refptr<SpdyHttpStream> > + PendingStreamMap; typedef std::priority_queue<SpdyIOBuffer> OutputQueue; virtual ~SpdySession(); @@ -170,7 +173,7 @@ class SpdySession : public base::RefCounted<SpdySession>, // Check if we have a pending pushed-stream for this url // Returns the stream if found (and returns it from the pending // list), returns NULL otherwise. - scoped_refptr<SpdyStream> GetPushStream(const std::string& url); + scoped_refptr<SpdyHttpStream> GetPushStream(const std::string& url); // Creates an HttpResponseInfo instance, and calls OnResponseReceived(). // Returns true if successful. @@ -218,7 +221,7 @@ class SpdySession : public base::RefCounted<SpdySession>, ActiveStreamMap active_streams_; // List of all the streams that have already started to be pushed by the // server, but do not have consumers yet. - ActiveStreamList pushed_streams_; + ActivePushedStreamList pushed_streams_; // List of streams declared in X-Associated-Content headers, but do not have // consumers yet. // The key is a string representing the path of the URI being pushed. diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index f26e060..bbcf8d1 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -12,6 +12,7 @@ #include "net/http/http_response_info.h" #include "net/proxy/proxy_service.h" #include "net/socket/socket_test_util.h" +#include "net/spdy/spdy_http_stream.h" #include "net/spdy/spdy_session.h" #include "net/spdy/spdy_session_pool.h" #include "net/spdy/spdy_stream.h" @@ -169,7 +170,7 @@ class StreamCanceler { STATE_DONE }; - explicit StreamCanceler(const scoped_refptr<SpdyStream>& stream) + explicit StreamCanceler(const scoped_refptr<SpdyHttpStream>& stream) : stream_(stream), ALLOW_THIS_IN_INITIALIZER_LIST( callback_(this, &StreamCanceler::OnIOComplete)), @@ -214,7 +215,7 @@ class StreamCanceler { } } - const scoped_refptr<SpdyStream> stream_; + const scoped_refptr<SpdyHttpStream> stream_; CompletionCallbackImpl<StreamCanceler> callback_; scoped_refptr<IOBufferWithSize> buf_; State state_; @@ -260,7 +261,7 @@ TEST_F(SpdySessionTest, CancelStreamOnClose) { HttpRequestInfo request; request.url = GURL("http://www.google.com"); - scoped_refptr<SpdyStream> stream = + scoped_refptr<SpdyHttpStream> stream = session->GetOrCreateStream(request, NULL, BoundNetLog()); TCPSocketParams tcp_params(kTestHost, kTestPort, MEDIUM, GURL(), false); int rv = session->Connect(kTestHost, tcp_params, MEDIUM); @@ -331,9 +332,9 @@ TEST_F(SpdySessionTest, GetPushStream) { // No push streams should exist in the beginning. std::string test_push_path = "/foo.js"; - scoped_refptr<SpdyStream> first_stream = session->GetPushStream( + scoped_refptr<SpdyHttpStream> first_stream = session->GetPushStream( test_push_path); - EXPECT_EQ(static_cast<SpdyStream*>(NULL), first_stream.get()); + EXPECT_EQ(static_cast<SpdyHttpStream*>(NULL), first_stream.get()); // Read in the data which contains a server-issued SYN_STREAM. TCPSocketParams tcp_params(test_host_port_pair, MEDIUM, GURL(), false); @@ -342,14 +343,14 @@ TEST_F(SpdySessionTest, GetPushStream) { MessageLoop::current()->RunAllPending(); // An unpushed path should not work. - scoped_refptr<SpdyStream> unpushed_stream = session->GetPushStream( + scoped_refptr<SpdyHttpStream> unpushed_stream = session->GetPushStream( "/unpushed_path"); - EXPECT_EQ(static_cast<SpdyStream*>(NULL), unpushed_stream.get()); + EXPECT_EQ(static_cast<SpdyHttpStream*>(NULL), unpushed_stream.get()); // The pushed path should be found. - scoped_refptr<SpdyStream> second_stream = session->GetPushStream( + scoped_refptr<SpdyHttpStream> second_stream = session->GetPushStream( test_push_path); - ASSERT_NE(static_cast<SpdyStream*>(NULL), second_stream.get()); + ASSERT_NE(static_cast<SpdyHttpStream*>(NULL), second_stream.get()); EXPECT_EQ(test_push_path, second_stream->path()); EXPECT_EQ(2U, second_stream->stream_id()); EXPECT_EQ(0, second_stream->priority()); diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 74ad637..359217e0 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -6,6 +6,7 @@ #include "base/logging.h" #include "base/message_loop.h" +#include "base/singleton.h" #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/spdy/spdy_session.h" @@ -17,24 +18,17 @@ SpdyStream::SpdyStream( : stream_id_(stream_id), priority_(0), pushed_(pushed), - download_finished_(false), metrics_(Singleton<BandwidthMetrics>::get()), session_(session), request_time_(base::Time::Now()), response_(NULL), - request_body_stream_(NULL), response_complete_(false), io_state_(STATE_NONE), response_status_(OK), - user_callback_(NULL), - user_buffer_(NULL), - user_buffer_len_(0), cancelled_(false), send_bytes_(0), recv_bytes_(0), - histograms_recorded_(false), - buffered_read_callback_pending_(false), - more_read_data_pending_(false) {} + histograms_recorded_(false) {} SpdyStream::~SpdyStream() { DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_; @@ -50,13 +44,6 @@ SpdyStream::~SpdyStream() { } } -uint64 SpdyStream::GetUploadProgress() const { - if (!request_body_stream_.get()) - return 0; - - return request_body_stream_->position(); -} - const HttpResponseInfo* SpdyStream::GetResponseInfo() const { return response_; } @@ -90,128 +77,7 @@ void SpdyStream::SetRequestTime(base::Time t) { response_->request_time = request_time_; } -int SpdyStream::ReadResponseHeaders(CompletionCallback* callback) { - // Note: The SpdyStream may have already received the response headers, so - // this call may complete synchronously. - CHECK(callback); - CHECK_EQ(STATE_NONE, io_state_); - CHECK(!cancelled_); - - // The SYN_REPLY has already been received. - if (response_->headers) - return OK; - - io_state_ = STATE_READ_HEADERS; - CHECK(!user_callback_); - user_callback_ = callback; - return ERR_IO_PENDING; -} - -int SpdyStream::ReadResponseBody( - IOBuffer* buf, int buf_len, CompletionCallback* callback) { - DCHECK_EQ(io_state_, STATE_NONE); - CHECK(buf); - CHECK(buf_len); - CHECK(callback); - CHECK(!cancelled_); - - // If we have data buffered, complete the IO immediately. - if (response_body_.size()) { - int bytes_read = 0; - while (response_body_.size() && buf_len > 0) { - scoped_refptr<IOBufferWithSize> data = response_body_.front(); - const int bytes_to_copy = std::min(buf_len, data->size()); - memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); - buf_len -= bytes_to_copy; - if (bytes_to_copy == data->size()) { - response_body_.pop_front(); - } else { - const int bytes_remaining = data->size() - bytes_to_copy; - IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); - memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), - bytes_remaining); - response_body_.pop_front(); - response_body_.push_front(new_buffer); - } - bytes_read += bytes_to_copy; - } - if (bytes_read > 0) - recv_bytes_ += bytes_read; - return bytes_read; - } else if (response_complete_) { - return response_status_; - } - - CHECK(!user_callback_); - CHECK(!user_buffer_); - CHECK_EQ(0, user_buffer_len_); - - user_callback_ = callback; - user_buffer_ = buf; - user_buffer_len_ = buf_len; - return ERR_IO_PENDING; -} - -int SpdyStream::SendRequest(UploadDataStream* upload_data, - HttpResponseInfo* response, - CompletionCallback* callback) { - CHECK(callback); - CHECK(!cancelled_); - CHECK(response); - - // SendRequest can be called in two cases. - // - // a) A client initiated request. In this case, response_ should be NULL - // to start with. - // b) A client request which matches a response that the server has already - // pushed. In this case, the value of |*push_response_| is copied over to - // the new response object |*response|. |push_response_| is cleared - // and |*push_response_| is deleted, and |response_| is reset to - // |response|. - if (push_response_.get()) { - *response = *push_response_; - push_response_.reset(NULL); - response_ = NULL; - } - - DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_); - response_ = response; - - if (upload_data) { - if (upload_data->size()) - request_body_stream_.reset(upload_data); - else - delete upload_data; - } - - send_time_ = base::TimeTicks::Now(); - - DCHECK_EQ(io_state_, STATE_NONE); - if (!pushed_) - io_state_ = STATE_SEND_HEADERS; - else { - if (response_->headers) { - io_state_ = STATE_READ_BODY; - } else { - io_state_ = STATE_READ_HEADERS; - } - } - int result = DoLoop(OK); - if (result == ERR_IO_PENDING) { - CHECK(!user_callback_); - user_callback_ = callback; - } - return result; -} - -void SpdyStream::Cancel() { - cancelled_ = true; - user_callback_ = NULL; - - session_->CancelStream(stream_id_); -} - -int SpdyStream::OnResponseReceived(const HttpResponseInfo& response) { +int SpdyStream::DoOnResponseReceived(const HttpResponseInfo& response) { int rv = OK; metrics_.StartStream(); @@ -241,13 +107,10 @@ int SpdyStream::OnResponseReceived(const HttpResponseInfo& response) { rv = DoLoop(rv); - if (user_callback_) - DoCallback(rv); - return rv; } -bool SpdyStream::OnDataReceived(const char* data, int length) { +bool SpdyStream::DoOnDataReceived(const char* data, int length) { DCHECK_GE(length, 0); LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for " << stream_id_; @@ -265,13 +128,8 @@ bool SpdyStream::OnDataReceived(const char* data, int length) { // A zero-length read means that the stream is being closed. if (!length) { metrics_.StopStream(); - download_finished_ = true; - response_complete_ = true; - - // We need to complete any pending buffered read now. - DoBufferedReadCallback(); - OnClose(net::OK); + UpdateHistograms(); return true; } @@ -280,24 +138,10 @@ bool SpdyStream::OnDataReceived(const char* data, int length) { recv_bytes_ += length; recv_last_byte_time_ = base::TimeTicks::Now(); - // Save the received data. - IOBufferWithSize* io_buffer = new IOBufferWithSize(length); - memcpy(io_buffer->data(), data, length); - response_body_.push_back(io_buffer); - - // Note that data may be received for a SpdyStream prior to the user calling - // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often - // happen for server initiated streams. - if (user_buffer_) { - // Handing small chunks of data to the caller creates measurable overhead. - // We buffer data in short time-spans and send a single read notification. - ScheduleBufferedReadCallback(); - } - return true; } -void SpdyStream::OnWriteComplete(int status) { +void SpdyStream::DoOnWriteComplete(int status) { // TODO(mbelshe): Check for cancellation here. If we're cancelled, we // should discontinue the DoLoop. @@ -311,15 +155,73 @@ void SpdyStream::OnWriteComplete(int status) { DoLoop(status); } -void SpdyStream::OnClose(int status) { +void SpdyStream::DoOnClose(int status) { response_complete_ = true; response_status_ = status; stream_id_ = 0; +} + +void SpdyStream::DoCancel() { + cancelled_ = true; + session_->CancelStream(stream_id_); +} + +int SpdyStream::DoSendRequest(UploadDataStream* upload_data, + HttpResponseInfo* response) { + CHECK(!cancelled_); + CHECK(response); + + // SendRequest can be called in two cases. + // + // a) A client initiated request. In this case, response_ should be NULL + // to start with. + // b) A client request which matches a response that the server has already + // pushed. In this case, the value of |*push_response_| is copied over to + // the new response object |*response|. |push_response_| is cleared + // and |*push_response_| is deleted, and |response_| is reset to + // |response|. + if (push_response_.get()) { + *response = *push_response_; + push_response_.reset(NULL); + response_ = NULL; + } + + DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_); + response_ = response; + + if (upload_data) { + if (upload_data->size()) + request_body_stream_.reset(upload_data); + else + delete upload_data; + } + + send_time_ = base::TimeTicks::Now(); + + DCHECK_EQ(io_state_, STATE_NONE); + if (!pushed_) + io_state_ = STATE_SEND_HEADERS; + else { + if (response_->headers) { + io_state_ = STATE_READ_BODY; + } else { + io_state_ = STATE_READ_HEADERS; + } + } + return DoLoop(OK); +} + +int SpdyStream::DoReadResponseHeaders() { + CHECK_EQ(STATE_NONE, io_state_); + CHECK(!cancelled_); - if (user_callback_) - DoCallback(status); + // The SYN_REPLY has already been received. + if (response_->headers) + return OK; - UpdateHistograms(); + io_state_ = STATE_READ_HEADERS; + // Q: do we need to run DoLoop here? + return ERR_IO_PENDING; } int SpdyStream::DoLoop(int result) { @@ -380,74 +282,6 @@ int SpdyStream::DoLoop(int result) { return result; } -void SpdyStream::ScheduleBufferedReadCallback() { - // If there is already a scheduled DoBufferedReadCallback, don't issue - // another one. Mark that we have received more data and return. - if (buffered_read_callback_pending_) { - more_read_data_pending_ = true; - return; - } - - more_read_data_pending_ = false; - buffered_read_callback_pending_ = true; - const int kBufferTimeMs = 1; - MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod( - this, &SpdyStream::DoBufferedReadCallback), kBufferTimeMs); -} - -// Checks to see if we should wait for more buffered data before notifying -// the caller. Returns true if we should wait, false otherwise. -bool SpdyStream::ShouldWaitForMoreBufferedData() const { - // If the response is complete, there is no point in waiting. - if (response_complete_) - return false; - - int bytes_buffered = 0; - std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it; - for (it = response_body_.begin(); - it != response_body_.end() && bytes_buffered < user_buffer_len_; - ++it) - bytes_buffered += (*it)->size(); - - return bytes_buffered < user_buffer_len_; -} - -void SpdyStream::DoBufferedReadCallback() { - buffered_read_callback_pending_ = false; - - // If the transaction is cancelled or errored out, we don't need to complete - // the read. - if (response_status_ != OK || cancelled_) - return; - - // When more_read_data_pending_ is true, it means that more data has - // arrived since we started waiting. Wait a little longer and continue - // to buffer. - if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { - ScheduleBufferedReadCallback(); - return; - } - - int rv = 0; - if (user_buffer_) { - rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); - CHECK_NE(rv, ERR_IO_PENDING); - user_buffer_ = NULL; - user_buffer_len_ = 0; - DoCallback(rv); - } -} - -void SpdyStream::DoCallback(int rv) { - CHECK_NE(rv, ERR_IO_PENDING); - CHECK(user_callback_); - - // Since Run may result in being called back, clear user_callback_ in advance. - CompletionCallback* c = user_callback_; - user_callback_ = NULL; - c->Run(rv); -} - int SpdyStream::DoSendHeaders() { // The SpdySession will always call us back when the send is complete. // TODO(willchan): This code makes the assumption that for the non-push stream diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index bfd79d0..1792cf1 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -6,29 +6,25 @@ #define NET_SPDY_SPDY_STREAM_H_ #include <string> -#include <list> #include "base/basictypes.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" -#include "base/singleton.h" #include "net/base/bandwidth_metrics.h" -#include "net/base/completion_callback.h" #include "net/base/io_buffer.h" #include "net/base/net_log.h" #include "net/http/http_request_info.h" -#include "net/spdy/spdy_framer.h" #include "net/spdy/spdy_protocol.h" namespace net { class HttpResponseInfo; class SpdySession; -class UploadData; class UploadDataStream; // The SpdyStream is used by the SpdySession to represent each stream known -// on the SpdySession. +// on the SpdySession. This class provides interfaces for SpdySession to use +// and base implementations for the interfaces. // Streams can be created either by the client or by the server. When they // are initiated by the client, both the SpdySession and client object (such as // a SpdyNetworkTransaction) will maintain a reference to the stream. When @@ -39,46 +35,9 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // SpdyStream constructor SpdyStream(SpdySession* session, spdy::SpdyStreamId stream_id, bool pushed); - // Ideally I'd use two abstract classes as interfaces for these two sections, - // but since we're ref counted, I can't make both abstract classes inherit - // from RefCounted or we'll have two separate ref counts for the same object. - // TODO(willchan): Consider using linked_ptr here orcreating proxy wrappers - // for SpdyStream to provide the appropriate interface. - - // =================================================== - // Interface for [Http|Spdy]NetworkTransaction to use. - - // Sends the request. If |upload_data| is non-NULL, sends that in the request - // body. |callback| is used when this completes asynchronously. Note that - // the actual SYN_STREAM packet will have already been sent by this point. - // Also note that SpdyStream takes ownership of |upload_data|. - int SendRequest(UploadDataStream* upload_data, - HttpResponseInfo* response, - CompletionCallback* callback); - - // Reads the response headers. Returns a net error code. - int ReadResponseHeaders(CompletionCallback* callback); - - // Reads the response body. Returns a net error code or the number of bytes - // read. - int ReadResponseBody( - IOBuffer* buf, int buf_len, CompletionCallback* callback); - - // Cancels the stream. Note that this does not immediately cause deletion of - // the stream. This function is used to cancel any callbacks from being - // invoked. TODO(willchan): It should also free up any memory associated with - // the stream, such as IOBuffers. - void Cancel(); - - // Returns the number of bytes uploaded. - uint64 GetUploadProgress() const; - // Is this stream a pushed stream from the server. bool pushed() const { return pushed_; } - // ================================= - // Interface for SpdySession to use. - spdy::SpdyStreamId stream_id() const { return stream_id_; } void set_stream_id(spdy::SpdyStreamId stream_id) { stream_id_ = stream_id; } @@ -102,7 +61,7 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // received for this stream. |path| is the path of the URL for a server // initiated stream, otherwise is empty. // Returns a status code. - int OnResponseReceived(const HttpResponseInfo& response); + virtual int OnResponseReceived(const HttpResponseInfo& response) = 0; // Called by the SpdySession when response data has been received for this // stream. This callback may be called multiple times as data arrives @@ -112,28 +71,63 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // |length| is the number of bytes received or an error. // A zero-length count does not indicate end-of-stream. // Returns true on success and false on error. - bool OnDataReceived(const char* buffer, int bytes); + virtual bool OnDataReceived(const char* buffer, int bytes) = 0; // Called by the SpdySession when a write has completed. This callback // will be called multiple times for each write which completes. Writes // include the SYN_STREAM write and also DATA frame writes. // |result| is the number of bytes written or a net error code. - void OnWriteComplete(int status); + virtual void OnWriteComplete(int status) = 0; // Called by the SpdySession when the request is finished. This callback // will always be called at the end of the request and signals to the // stream that the stream has no more network events. No further callbacks // to the stream will be made after this call. // |status| is an error code or OK. - void OnClose(int status); + virtual void OnClose(int status) = 0; + virtual void Cancel() = 0; bool cancelled() const { return cancelled_; } void SetPushResponse(HttpResponseInfo* response_info); - private: + protected: friend class base::RefCounted<SpdyStream>; + virtual ~SpdyStream(); + + int DoOnResponseReceived(const HttpResponseInfo& response); + bool DoOnDataReceived(const char* buffer,int bytes); + void DoOnWriteComplete(int status); + void DoOnClose(int status); + + void DoCancel(); + + // Sends the request. If |upload_data| is non-NULL, sends that in the + // request body. Note that the actual SYN_STREAM packet will have already + // been sent by this point. + // Note that SpdyStream takes ownership of |upload_data|. + // TODO(ukai): move out HTTP-specific thing to SpdyHttpStream. + int DoSendRequest(UploadDataStream* upload_data, + HttpResponseInfo* response_info); + + // Reads response headers. If the SpdyStream have already received + // the response headers, return OK and response headers filled in + // |response_info| given in SendRequest. + // Otherwise, return ERR_IO_PENDING. + int DoReadResponseHeaders(); + + const UploadDataStream* request_body_stream() const { + return request_body_stream_.get(); + } + + bool is_idle() const { return io_state_ == STATE_NONE; } + bool response_complete() const { return response_complete_; } + void set_response_complete(bool response_complete) { + response_complete_ = response_complete; + } + int response_status() const { return response_status_; } + private: enum State { STATE_NONE, STATE_SEND_HEADERS, @@ -147,18 +141,9 @@ class SpdyStream : public base::RefCounted<SpdyStream> { STATE_DONE }; - ~SpdyStream(); - // Try to make progress sending/receiving the request/response. int DoLoop(int result); - // Call the user callback. - void DoCallback(int rv); - - void ScheduleBufferedReadCallback(); - void DoBufferedReadCallback(); - bool ShouldWaitForMoreBufferedData() const; - // The implementations of each state of the state machine. int DoSendHeaders(); int DoSendHeadersComplete(int result); @@ -177,10 +162,6 @@ class SpdyStream : public base::RefCounted<SpdyStream> { std::string path_; int priority_; const bool pushed_; - // We buffer the response body as it arrives asynchronously from the stream. - // TODO(mbelshe): is this infinite buffering? - std::list<scoped_refptr<IOBufferWithSize> > response_body_; - bool download_finished_; ScopedBandwidthMetrics metrics_; scoped_refptr<SpdySession> session_; @@ -213,12 +194,6 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // Not valid until response_complete_ is true. int response_status_; - CompletionCallback* user_callback_; - - // User provided buffer for the ReadResponseBody() response. - scoped_refptr<IOBuffer> user_buffer_; - int user_buffer_len_; - bool cancelled_; BoundNetLog net_log_; @@ -230,12 +205,6 @@ class SpdyStream : public base::RefCounted<SpdyStream> { int recv_bytes_; bool histograms_recorded_; - // Is there a scheduled read callback pending. - bool buffered_read_callback_pending_; - // Has more data been received from the network during the wait for the - // scheduled read callback. - bool more_read_data_pending_; - DISALLOW_COPY_AND_ASSIGN(SpdyStream); }; |