diff options
author | erikchen@google.com <erikchen@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-24 17:59:46 +0000 |
---|---|---|
committer | erikchen@google.com <erikchen@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2010-06-24 17:59:46 +0000 |
commit | 9be804c808196ed8ae63b4b78b743b212aa993f0 (patch) | |
tree | 1338a7874917e7e24159171b2f68c5b3d6f85d45 | |
parent | 5beca81467eb19e2b7c24ee25d61aae9d294b967 (diff) | |
download | chromium_src-9be804c808196ed8ae63b4b78b743b212aa993f0.zip chromium_src-9be804c808196ed8ae63b4b78b743b212aa993f0.tar.gz chromium_src-9be804c808196ed8ae63b4b78b743b212aa993f0.tar.bz2 |
Fixed bug where streams do not shutdown properly after the user callback deletes the stream.
Added a unit test.
TEST=net_unittests
BUG=46925
Merge branch 'trunk' of http://src.chromium.org/git/chromium into ukai_delegate_fix
Revert "Revert 50215 because of crashes - Refactor SpdyStream to get HTTP specific out of the interface and members."
This reverts commit 8f9bf3b9ba6663aeef7fbdab3edf16aeaa510f84.
Review URL: http://codereview.chromium.org/2810022
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@50739 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | net/http/http_network_transaction.cc | 38 | ||||
-rw-r--r-- | net/http/http_network_transaction.h | 2 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream.cc | 289 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream.h | 67 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream_unittest.cc | 11 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction.cc | 26 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction.h | 8 | ||||
-rw-r--r-- | net/spdy/spdy_network_transaction_unittest.cc | 70 | ||||
-rw-r--r-- | net/spdy/spdy_session.cc | 253 | ||||
-rw-r--r-- | net/spdy/spdy_session.h | 49 | ||||
-rw-r--r-- | net/spdy/spdy_session_unittest.cc | 14 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 161 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 137 |
13 files changed, 680 insertions, 445 deletions
diff --git a/net/http/http_network_transaction.cc b/net/http/http_network_transaction.cc index c2c7cbe..d65a892 100644 --- a/net/http/http_network_transaction.cc +++ b/net/http/http_network_transaction.cc @@ -463,10 +463,10 @@ int HttpNetworkTransaction::Read(IOBuffer* buf, int buf_len, // Are we using SPDY or HTTP? if (using_spdy_) { DCHECK(!http_stream_.get()); - DCHECK(spdy_stream_->GetResponseInfo()->headers); + DCHECK(spdy_http_stream_->GetResponseInfo()->headers); next_state = STATE_SPDY_READ_BODY; } else { - DCHECK(!spdy_stream_.get()); + DCHECK(!spdy_http_stream_.get()); next_state = STATE_READ_BODY; if (!connection_->is_initialized()) @@ -549,8 +549,8 @@ HttpNetworkTransaction::~HttpNetworkTransaction() { if (pac_request_) session_->proxy_service()->CancelPacRequest(pac_request_); - if (spdy_stream_.get()) - spdy_stream_->Cancel(); + if (spdy_http_stream_.get()) + spdy_http_stream_->Cancel(); } void HttpNetworkTransaction::DoCallback(int rv) { @@ -1470,7 +1470,7 @@ int HttpNetworkTransaction::DoDrainBodyForAuthRestartComplete(int result) { int HttpNetworkTransaction::DoSpdySendRequest() { next_state_ = STATE_SPDY_SEND_REQUEST_COMPLETE; - CHECK(!spdy_stream_.get()); + CHECK(!spdy_http_stream_.get()); // First we get a SPDY session. Theoretically, we've just negotiated one, but // if one already exists, then screw it, use the existing one! Otherwise, @@ -1503,9 +1503,25 @@ int HttpNetworkTransaction::DoSpdySendRequest() { return error_code; } headers_valid_ = false; - spdy_stream_ = spdy_session->GetOrCreateStream( - *request_, upload_data, net_log_); - return spdy_stream_->SendRequest(upload_data, &response_, &io_callback_); + scoped_refptr<SpdyStream> spdy_stream; + if (request_->method == "GET") + spdy_stream = spdy_session->GetPushStream(request_->url, net_log_); + if (spdy_stream.get()) { + DCHECK(spdy_stream->pushed()); + CHECK(spdy_stream->GetDelegate() == NULL); + spdy_http_stream_.reset(new SpdyHttpStream(spdy_stream)); + spdy_http_stream_->InitializeRequest(*request_, base::Time::Now(), NULL); + } else { + spdy_stream = spdy_session->CreateStream(request_->url, + request_->priority, + net_log_); + DCHECK(!spdy_stream->pushed()); + CHECK(spdy_stream->GetDelegate() == NULL); + spdy_http_stream_.reset(new SpdyHttpStream(spdy_stream)); + spdy_http_stream_->InitializeRequest( + *request_, base::Time::Now(), upload_data); + } + return spdy_http_stream_->SendRequest(&response_, &io_callback_); } int HttpNetworkTransaction::DoSpdySendRequestComplete(int result) { @@ -1518,7 +1534,7 @@ int HttpNetworkTransaction::DoSpdySendRequestComplete(int result) { int HttpNetworkTransaction::DoSpdyReadHeaders() { next_state_ = STATE_SPDY_READ_HEADERS_COMPLETE; - return spdy_stream_->ReadResponseHeaders(&io_callback_); + return spdy_http_stream_->ReadResponseHeaders(&io_callback_); } int HttpNetworkTransaction::DoSpdyReadHeadersComplete(int result) { @@ -1534,7 +1550,7 @@ int HttpNetworkTransaction::DoSpdyReadHeadersComplete(int result) { int HttpNetworkTransaction::DoSpdyReadBody() { next_state_ = STATE_SPDY_READ_BODY_COMPLETE; - return spdy_stream_->ReadResponseBody( + return spdy_http_stream_->ReadResponseBody( read_buf_, read_buf_len_, &io_callback_); } @@ -1543,7 +1559,7 @@ int HttpNetworkTransaction::DoSpdyReadBodyComplete(int result) { read_buf_len_ = 0; if (result <= 0) - spdy_stream_ = NULL; + spdy_http_stream_.reset() ; return result; } diff --git a/net/http/http_network_transaction.h b/net/http/http_network_transaction.h index f02deb4..a59f908 100644 --- a/net/http/http_network_transaction.h +++ b/net/http/http_network_transaction.h @@ -279,7 +279,7 @@ class HttpNetworkTransaction : public HttpTransaction { scoped_ptr<ClientSocketHandle> connection_; scoped_ptr<HttpStream> http_stream_; - scoped_refptr<SpdyHttpStream> spdy_stream_; + scoped_ptr<SpdyHttpStream> spdy_http_stream_; bool reused_socket_; // True if we've validated the headers that the stream parser has returned. diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index a8979d5..bb22b9d 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -8,39 +8,187 @@ #include "base/logging.h" #include "base/message_loop.h" +#include "base/string_util.h" +#include "net/base/load_flags.h" #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/spdy/spdy_session.h" +namespace { + +// Convert a SpdyHeaderBlock into an HttpResponseInfo. +// |headers| input parameter with the SpdyHeaderBlock. +// |info| output parameter for the HttpResponseInfo. +// Returns true if successfully converted. False if there was a failure +// or if the SpdyHeaderBlock was invalid. +bool SpdyHeadersToHttpResponse(const spdy::SpdyHeaderBlock& headers, + net::HttpResponseInfo* response) { + std::string version; + std::string status; + + // The "status" and "version" headers are required. + spdy::SpdyHeaderBlock::const_iterator it; + it = headers.find("status"); + if (it == headers.end()) { + LOG(ERROR) << "SpdyHeaderBlock without status header."; + return false; + } + status = it->second; + + // Grab the version. If not provided by the server, + it = headers.find("version"); + if (it == headers.end()) { + LOG(ERROR) << "SpdyHeaderBlock without version header."; + return false; + } + version = it->second; + + response->response_time = base::Time::Now(); + + std::string raw_headers(version); + raw_headers.push_back(' '); + raw_headers.append(status); + raw_headers.push_back('\0'); + for (it = headers.begin(); it != headers.end(); ++it) { + // For each value, if the server sends a NUL-separated + // list of values, we separate that back out into + // individual headers for each value in the list. + // e.g. + // Set-Cookie "foo\0bar" + // becomes + // Set-Cookie: foo\0 + // Set-Cookie: bar\0 + std::string value = it->second; + size_t start = 0; + size_t end = 0; + do { + end = value.find('\0', start); + std::string tval; + if (end != value.npos) + tval = value.substr(start, (end - start)); + else + tval = value.substr(start); + raw_headers.append(it->first); + raw_headers.push_back(':'); + raw_headers.append(tval); + raw_headers.push_back('\0'); + start = end + 1; + } while (end != value.npos); + } + + response->headers = new net::HttpResponseHeaders(raw_headers); + response->was_fetched_via_spdy = true; + return true; +} + +// Create a SpdyHeaderBlock for a Spdy SYN_STREAM Frame from +// a HttpRequestInfo block. +void CreateSpdyHeadersFromHttpRequest( + const net::HttpRequestInfo& info, spdy::SpdyHeaderBlock* headers) { + // TODO(willchan): It's not really necessary to convert from + // HttpRequestHeaders to spdy::SpdyHeaderBlock. + + static const char kHttpProtocolVersion[] = "HTTP/1.1"; + + net::HttpRequestHeaders::Iterator it(info.extra_headers); + + while (it.GetNext()) { + std::string name = StringToLowerASCII(it.name()); + if (headers->find(name) == headers->end()) { + (*headers)[name] = it.value(); + } else { + std::string new_value = (*headers)[name]; + new_value.append(1, '\0'); // +=() doesn't append 0's + new_value += it.value(); + (*headers)[name] = new_value; + } + } + + // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc) + // TODO(mbelshe): Add authentication headers here. + + (*headers)["method"] = info.method; + (*headers)["url"] = info.url.spec(); + (*headers)["version"] = kHttpProtocolVersion; + if (!info.referrer.is_empty()) + (*headers)["referer"] = info.referrer.spec(); + + // Honor load flags that impact proxy caches. + if (info.load_flags & net::LOAD_BYPASS_CACHE) { + (*headers)["pragma"] = "no-cache"; + (*headers)["cache-control"] = "no-cache"; + } else if (info.load_flags & net::LOAD_VALIDATE_CACHE) { + (*headers)["cache-control"] = "max-age=0"; + } +} + +} // anonymous namespace + namespace net { -SpdyHttpStream::SpdyHttpStream( - SpdySession* session, spdy::SpdyStreamId stream_id, bool pushed) - : SpdyStream(session, stream_id, pushed), +SpdyHttpStream::SpdyHttpStream(const scoped_refptr<SpdyStream>& stream) + : ALLOW_THIS_IN_INITIALIZER_LIST(read_callback_factory_(this)), + stream_(stream), + response_info_(NULL), download_finished_(false), user_callback_(NULL), user_buffer_len_(0), buffered_read_callback_pending_(false), - more_read_data_pending_(false) {} + more_read_data_pending_(false) { + CHECK(stream_.get()); + stream_->SetDelegate(this); +} SpdyHttpStream::~SpdyHttpStream() { - DLOG(INFO) << "Deleting SpdyHttpStream for stream " << stream_id(); + stream_->DetachDelegate(); +} + +void SpdyHttpStream::InitializeRequest( + const HttpRequestInfo& request_info, + base::Time request_time, + UploadDataStream* upload_data) { + request_info_ = request_info; + linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); + CreateSpdyHeadersFromHttpRequest(request_info_, headers.get()); + stream_->set_spdy_headers(headers); + + stream_->SetRequestTime(request_time); + // This should only get called in the case of a request occuring + // during server push that has already begun but hasn't finished, + // so we set the response's request time to be the actual one + if (response_info_) + response_info_->request_time = request_time; + + CHECK(!request_body_stream_.get()); + if (upload_data) { + if (upload_data->size()) + request_body_stream_.reset(upload_data); + else + delete upload_data; + } +} + +const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const { + return response_info_; } uint64 SpdyHttpStream::GetUploadProgress() const { - if (!request_body_stream()) + if (!request_body_stream_.get()) return 0; - return request_body_stream()->position(); + return request_body_stream_->position(); } int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) { - DCHECK(is_idle()); + DCHECK(stream_->is_idle()); // Note: The SpdyStream may have already received the response headers, so // this call may complete synchronously. CHECK(callback); - int result = DoReadResponseHeaders(); + if (stream_->response_complete()) + return stream_->response_status(); + + int result = stream_->DoReadResponseHeaders(); if (result == ERR_IO_PENDING) { CHECK(!user_callback_); user_callback_ = callback; @@ -50,11 +198,10 @@ int SpdyHttpStream::ReadResponseHeaders(CompletionCallback* callback) { int SpdyHttpStream::ReadResponseBody( IOBuffer* buf, int buf_len, CompletionCallback* callback) { - DCHECK(is_idle()); CHECK(buf); CHECK(buf_len); CHECK(callback); - CHECK(!cancelled()); + DCHECK(stream_->is_idle()); // If we have data buffered, complete the IO immediately. if (!response_body_.empty()) { @@ -77,8 +224,8 @@ int SpdyHttpStream::ReadResponseBody( bytes_read += bytes_to_copy; } return bytes_read; - } else if (response_complete()) { - return response_status(); + } else if (stream_->response_complete()) { + return stream_->response_status(); } CHECK(!user_callback_); @@ -91,14 +238,38 @@ int SpdyHttpStream::ReadResponseBody( return ERR_IO_PENDING; } -int SpdyHttpStream::SendRequest(UploadDataStream* upload_data, - HttpResponseInfo* response, +int SpdyHttpStream::SendRequest(HttpResponseInfo* response, CompletionCallback* callback) { CHECK(callback); - CHECK(!cancelled()); + CHECK(!stream_->cancelled()); CHECK(response); - int result = DoSendRequest(upload_data, response); + if (stream_->response_complete()) { + if (stream_->response_status() == OK) + return ERR_FAILED; + else + return stream_->response_status(); + } + + // SendRequest can be called in two cases. + // + // a) A client initiated request. In this case, |response_info_| should be + // NULL to start with. + // b) A client request which matches a response that the server has already + // pushed. In this case, the value of |*push_response_info_| is copied + // over to the new response object |*response|. |push_response_info_| is + // deleted, and |response_info_| is reset |response|. + if (push_response_info_.get()) { + *response = *push_response_info_; + push_response_info_.reset(); + response_info_ = NULL; + } + + DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_); + response_info_ = response; + + bool has_upload_data = request_body_stream_.get() != NULL; + int result = stream_->DoSendRequest(has_upload_data); if (result == ERR_IO_PENDING) { CHECK(!user_callback_); user_callback_ = callback; @@ -108,22 +279,58 @@ int SpdyHttpStream::SendRequest(UploadDataStream* upload_data, void SpdyHttpStream::Cancel() { user_callback_ = NULL; - DoCancel(); + stream_->Cancel(); +} + +bool SpdyHttpStream::OnSendHeadersComplete(int status) { + return request_body_stream_.get() == NULL; +} + +int SpdyHttpStream::OnSendBody() { + CHECK(request_body_stream_.get()); + int buf_len = static_cast<int>(request_body_stream_->buf_len()); + if (!buf_len) + return OK; + return stream_->WriteStreamData(request_body_stream_->buf(), buf_len); +} + +bool SpdyHttpStream::OnSendBodyComplete(int status) { + CHECK(request_body_stream_.get()); + request_body_stream_->DidConsume(status); + return request_body_stream_->eof(); } -int SpdyHttpStream::OnResponseReceived(const HttpResponseInfo& response) { - int rv = DoOnResponseReceived(response); +int SpdyHttpStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response, + base::Time response_time, + int status) { + if (!response_info_) { + DCHECK(stream_->pushed()); + push_response_info_.reset(new HttpResponseInfo); + response_info_ = push_response_info_.get(); + } + + if (!SpdyHeadersToHttpResponse(response, response_info_)) { + status = ERR_INVALID_RESPONSE; + } else { + stream_->GetSSLInfo(&response_info_->ssl_info, + &response_info_->was_npn_negotiated); + response_info_->request_time = stream_->GetRequestTime(); + response_info_->vary_data.Init(request_info_, *response_info_->headers); + // TODO(ahendrickson): This is recorded after the entire SYN_STREAM control + // frame has been received and processed. Move to framer? + response_info_->response_time = response_time; + } + if (user_callback_) - DoCallback(rv); - return rv; + DoCallback(status); + return status; } -bool SpdyHttpStream::OnDataReceived(const char* data, int length) { - bool result = DoOnDataReceived(data, length); +void SpdyHttpStream::OnDataReceived(const char* data, int length) { // Note that data may be received for a SpdyStream prior to the user calling // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often // happen for server initiated streams. - if (result && !response_complete()) { + if (length > 0 && !stream_->response_complete()) { // Save the received data. IOBufferWithSize* io_buffer = new IOBufferWithSize(length); memcpy(io_buffer->data(), data, length); @@ -135,23 +342,15 @@ bool SpdyHttpStream::OnDataReceived(const char* data, int length) { ScheduleBufferedReadCallback(); } } - return result; -} - -void SpdyHttpStream::OnWriteComplete(int status) { - DoOnWriteComplete(status); } void SpdyHttpStream::OnClose(int status) { + bool invoked_callback = false; if (status == net::OK) { - download_finished_ = true; - set_response_complete(true); - // We need to complete any pending buffered read now. - DoBufferedReadCallback(); + invoked_callback = DoBufferedReadCallback(); } - DoOnClose(status); - if (user_callback_) + if (!invoked_callback && user_callback_) DoCallback(status); } @@ -166,15 +365,16 @@ void SpdyHttpStream::ScheduleBufferedReadCallback() { more_read_data_pending_ = false; buffered_read_callback_pending_ = true; const int kBufferTimeMs = 1; - MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod( - this, &SpdyHttpStream::DoBufferedReadCallback), kBufferTimeMs); + MessageLoop::current()->PostDelayedTask(FROM_HERE, read_callback_factory_. + NewRunnableMethod(&SpdyHttpStream::DoBufferedReadCallback), + kBufferTimeMs); } // Checks to see if we should wait for more buffered data before notifying // the caller. Returns true if we should wait, false otherwise. bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const { // If the response is complete, there is no point in waiting. - if (response_complete()) + if (stream_->response_complete()) return false; int bytes_buffered = 0; @@ -187,20 +387,21 @@ bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const { return bytes_buffered < user_buffer_len_; } -void SpdyHttpStream::DoBufferedReadCallback() { +bool SpdyHttpStream::DoBufferedReadCallback() { + read_callback_factory_.RevokeAll(); buffered_read_callback_pending_ = false; // If the transaction is cancelled or errored out, we don't need to complete // the read. - if (response_status() != OK || cancelled()) - return; + if (!stream_ || stream_->response_status() != OK || stream_->cancelled()) + return false; // When more_read_data_pending_ is true, it means that more data has // arrived since we started waiting. Wait a little longer and continue // to buffer. if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { ScheduleBufferedReadCallback(); - return; + return false; } int rv = 0; @@ -210,7 +411,9 @@ void SpdyHttpStream::DoBufferedReadCallback() { user_buffer_ = NULL; user_buffer_len_ = 0; DoCallback(rv); + return true; } + return false; } void SpdyHttpStream::DoCallback(int rv) { diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h index 11c00e0..495cac9e 100644 --- a/net/spdy/spdy_http_stream.h +++ b/net/spdy/spdy_http_stream.h @@ -9,8 +9,10 @@ #include "base/basictypes.h" #include "base/ref_counted.h" +#include "base/task.h" #include "net/base/completion_callback.h" #include "net/base/net_log.h" +#include "net/http/http_request_info.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_stream.h" @@ -23,21 +25,29 @@ class UploadData; class UploadDataStream; // The SpdyHttpStream is a HTTP-specific type of stream known to a SpdySession. -class SpdyHttpStream : public SpdyStream { +class SpdyHttpStream : public SpdyStream::Delegate { public: // SpdyHttpStream constructor - SpdyHttpStream( - SpdySession* session, spdy::SpdyStreamId stream_id, bool pushed); + explicit SpdyHttpStream(const scoped_refptr<SpdyStream>& stream); + virtual ~SpdyHttpStream(); + + SpdyStream* stream() { return stream_.get(); } + + // Initialize request. Must be called before calling SendRequest(). + // SpdyHttpStream takes ownership of |upload_data|. |upload_data| may be NULL. + void InitializeRequest(const HttpRequestInfo& request_info, + base::Time request_time, + UploadDataStream* upload_data); + + const HttpResponseInfo* GetResponseInfo() const; // =================================================== // Interface for [Http|Spdy]NetworkTransaction to use. - // Sends the request. If |upload_data| is non-NULL, sends that in the request - // body. |callback| is used when this completes asynchronously. Note that - // the actual SYN_STREAM packet will have already been sent by this point. - // Also note that SpdyStream takes ownership of |upload_data|. - int SendRequest(UploadDataStream* upload_data, - HttpResponseInfo* response, + // Sends the request. + // |callback| is used when this completes asynchronously. + // The actual SYN_STREAM packet will be sent if the stream is non-pushed. + int SendRequest(HttpResponseInfo* response, CompletionCallback* callback); // Reads the response headers. Returns a net error code. @@ -55,24 +65,25 @@ class SpdyHttpStream : public SpdyStream { uint64 GetUploadProgress() const; // =================================================== - // Interface for SpdySession to use. + // SpdyStream::Delegate. + + virtual bool OnSendHeadersComplete(int status); + virtual int OnSendBody(); + virtual bool OnSendBodyComplete(int status); // Called by the SpdySession when a response (e.g. a SYN_REPLY) has been // received for this stream. // SpdyHttpSession calls back |callback| set by SendRequest or // ReadResponseHeaders. - virtual int OnResponseReceived(const HttpResponseInfo& response); + virtual int OnResponseReceived(const spdy::SpdyHeaderBlock& response, + base::Time response_time, + int status); // Called by the SpdySession when response data has been received for this // stream. This callback may be called multiple times as data arrives // from the network, and will never be called prior to OnResponseReceived. // SpdyHttpSession schedule to call back |callback| set by ReadResponseBody. - virtual bool OnDataReceived(const char* buffer, int bytes); - - // Called by the SpdySession when a write has completed. This callback - // will be called multiple times for each write which completes. Writes - // include the SYN_STREAM write and also DATA frame writes. - virtual void OnWriteComplete(int status); + virtual void OnDataReceived(const char* buffer, int bytes); // Called by the SpdySession when the request is finished. This callback // will always be called at the end of the request and signals to the @@ -83,16 +94,30 @@ class SpdyHttpStream : public SpdyStream { virtual void OnClose(int status); private: - friend class base::RefCounted<SpdyHttpStream>; - virtual ~SpdyHttpStream(); - // Call the user callback. void DoCallback(int rv); void ScheduleBufferedReadCallback(); - void DoBufferedReadCallback(); + + // Returns true if the callback is invoked. + bool DoBufferedReadCallback(); bool ShouldWaitForMoreBufferedData() const; + ScopedRunnableMethodFactory<SpdyHttpStream> read_callback_factory_; + const scoped_refptr<SpdyStream> stream_; + + // The request to send. + HttpRequestInfo request_info_; + + scoped_ptr<UploadDataStream> request_body_stream_; + + // |response_info_| is the HTTP response data object which is filled in + // when a SYN_REPLY comes in for the stream. + // It is not owned by this stream object, or point to |push_response_info_|. + HttpResponseInfo* response_info_; + + scoped_ptr<HttpResponseInfo> push_response_info_; + bool download_finished_; // We buffer the response body as it arrives asynchronously from the stream. diff --git a/net/spdy/spdy_http_stream_unittest.cc b/net/spdy/spdy_http_stream_unittest.cc index 5d20aa9a..c3b321a 100644 --- a/net/spdy/spdy_http_stream_unittest.cc +++ b/net/spdy/spdy_http_stream_unittest.cc @@ -4,8 +4,10 @@ #include "net/spdy/spdy_http_stream.h" #include "base/ref_counted.h" +#include "base/time.h" #include "net/base/mock_host_resolver.h" #include "net/base/net_errors.h" +#include "net/base/net_log.h" #include "net/base/ssl_config_service.h" #include "net/base/ssl_config_service_defaults.h" #include "net/base/test_completion_callback.h" @@ -115,10 +117,13 @@ TEST_F(SpdyHttpStreamTest, SendRequest) { TestCompletionCallback callback; HttpResponseInfo response; - scoped_refptr<SpdyHttpStream> stream(new SpdyHttpStream(session, 1, false)); - stream->SetRequestInfo(request); + scoped_refptr<SpdyStream> stream( + session->CreateStream(request.url, HIGHEST, BoundNetLog())); + + scoped_ptr<SpdyHttpStream> http_stream(new SpdyHttpStream(stream.get())); + http_stream->InitializeRequest(request, base::Time::Now(), NULL); EXPECT_EQ(ERR_IO_PENDING, - stream->SendRequest(NULL, &response, &callback)); + http_stream->SendRequest(&response, &callback)); // Need to manually remove the spdy session since normally it gets removed on // socket close/error, but we aren't communicating over a socket here. diff --git a/net/spdy/spdy_network_transaction.cc b/net/spdy/spdy_network_transaction.cc index e79d889..5872a38 100644 --- a/net/spdy/spdy_network_transaction.cc +++ b/net/spdy/spdy_network_transaction.cc @@ -256,15 +256,31 @@ int SpdyNetworkTransaction::DoSendRequest() { if (!upload_data) return error_code; } - stream_ = spdy_->GetOrCreateStream(*request_, upload_data, net_log_); - // Release the reference to |spdy_| since we don't need it anymore. + scoped_refptr<SpdyStream> spdy_stream; + if (request_->method == "GET") + spdy_stream = spdy_->GetPushStream(request_->url, net_log_); + if (spdy_stream.get()) { + DCHECK(spdy_stream->pushed()); + CHECK(spdy_stream->GetDelegate() == NULL); + stream_.reset(new SpdyHttpStream(spdy_stream)); + stream_->InitializeRequest(*request_, base::Time::Now(), NULL); + // "vary" field? + } else { + spdy_stream = spdy_->CreateStream(request_->url, + request_->priority, + net_log_); + DCHECK(!spdy_stream->pushed()); + CHECK(spdy_stream->GetDelegate() == NULL); + stream_.reset(new SpdyHttpStream(spdy_stream)); + stream_->InitializeRequest(*request_, base::Time::Now(), upload_data); + } spdy_ = NULL; - return stream_->SendRequest(upload_data, &response_, &io_callback_); + return stream_->SendRequest(&response_, &io_callback_); } int SpdyNetworkTransaction::DoSendRequestComplete(int result) { if (result < 0) { - stream_ = NULL; + stream_.reset() ; return result; } @@ -294,7 +310,7 @@ int SpdyNetworkTransaction::DoReadBodyComplete(int result) { user_buffer_len_ = 0; if (result <= 0) - stream_ = NULL; + stream_.reset(); return result; } diff --git a/net/spdy/spdy_network_transaction.h b/net/spdy/spdy_network_transaction.h index eb22781..12b055c 100644 --- a/net/spdy/spdy_network_transaction.h +++ b/net/spdy/spdy_network_transaction.h @@ -52,12 +52,6 @@ class SpdyNetworkTransaction : public HttpTransaction { virtual LoadState GetLoadState() const; virtual uint64 GetUploadProgress() const; - protected: - friend class SpdyNetworkTransactionTest; - - // Provide access to the session for testing. - SpdySession* GetSpdySession() { return spdy_.get(); } - private: enum State { STATE_INIT_CONNECTION, @@ -112,7 +106,7 @@ class SpdyNetworkTransaction : public HttpTransaction { // The next state in the state machine. State next_state_; - scoped_refptr<SpdyHttpStream> stream_; + scoped_ptr<SpdyHttpStream> stream_; DISALLOW_COPY_AND_ASSIGN(SpdyNetworkTransaction); }; diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index e7e2a3e..810c5e4 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -19,6 +19,7 @@ #include "net/proxy/proxy_config_service_fixed.h" #include "net/socket/socket_test_util.h" #include "net/spdy/spdy_framer.h" +#include "net/spdy/spdy_http_stream.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_test_util.h" #include "testing/platform_test.h" @@ -720,6 +721,75 @@ TEST_F(SpdyNetworkTransactionTest, CancelledTransaction) { MessageLoop::current()->RunAllPending(); } +class DeleteSessionCallback : public CallbackRunner< Tuple1<int> > { + public: + explicit DeleteSessionCallback(SpdyNetworkTransaction* trans1) : + trans(trans1) {} + + // We kill the transaction, which deletes the session and stream. However, the + // memory is still accessible, so we also have to zero out the memory of the + // stream. This is not a well defined operation, and can cause failures. + virtual void RunWithParams(const Tuple1<int>& params) { + delete trans; + } + + private: + const SpdyNetworkTransaction* trans; +}; + +// Verify that the client can correctly deal with the user callback deleting the +// transaction. Failures will usually be valgrind errors. See +// http://crbug.com/46925 +TEST_F(SpdyNetworkTransactionTest, DeleteSessionOnReadCallback) { + MockWrite writes[] = { + MockWrite(true, reinterpret_cast<const char*>(kGetSyn), + arraysize(kGetSyn), 1), + }; + + MockRead reads[] = { + MockRead(true, reinterpret_cast<const char*>(kGetSynReply), + arraysize(kGetSynReply), 2), + MockRead(true, ERR_IO_PENDING, 3), // Force a pause + MockRead(true, reinterpret_cast<const char*>(kGetBodyFrame), + arraysize(kGetBodyFrame), 4), + MockRead(true, 0, 0, 5), // EOF + }; + + HttpRequestInfo request; + request.method = "GET"; + request.url = GURL("http://www.google.com/"); + request.load_flags = 0; + + // We disable SSL for this test. + SpdySession::SetSSLMode(false); + + SessionDependencies session_deps; + SpdyNetworkTransaction * trans = + new SpdyNetworkTransaction(CreateSession(&session_deps)); + scoped_refptr<OrderedSocketData> data( + new OrderedSocketData(reads, arraysize(reads), + writes, arraysize(writes))); + session_deps.socket_factory.AddSocketDataProvider(data); + + // Start the transaction with basic parameters. + TestCompletionCallback callback; + int rv = trans->Start(&request, &callback, BoundNetLog()); + EXPECT_EQ(ERR_IO_PENDING, rv); + rv = callback.WaitForResult(); + + // Setup a user callback which will delete the session, and clear out the + // memory holding the stream object. Note that the callback deletes trans. + DeleteSessionCallback callback2(trans); + const int kSize = 3000; + scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(kSize); + rv = trans->Read(buf, kSize, &callback2); + ASSERT_EQ(ERR_IO_PENDING, rv); + data->CompleteRead(); + + // Finish running rest of tasks. + MessageLoop::current()->RunAllPending(); +} + // Verify that various SynReply headers parse correctly through the // HTTP layer. TEST_F(SpdyNetworkTransactionTest, SynReplyHeaders) { diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 2e771e7..3795bc1 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -19,14 +19,10 @@ #include "net/base/net_log.h" #include "net/base/net_util.h" #include "net/http/http_network_session.h" -#include "net/http/http_request_info.h" -#include "net/http/http_response_headers.h" -#include "net/http/http_response_info.h" #include "net/socket/client_socket.h" #include "net/socket/client_socket_factory.h" #include "net/socket/ssl_client_socket.h" #include "net/spdy/spdy_frame_builder.h" -#include "net/spdy/spdy_http_stream.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_settings_storage.h" #include "net/spdy/spdy_stream.h" @@ -66,112 +62,6 @@ const int kReadBufferSize = 2 * 1024; const int kReadBufferSize = 8 * 1024; #endif -// Convert a SpdyHeaderBlock into an HttpResponseInfo. -// |headers| input parameter with the SpdyHeaderBlock. -// |info| output parameter for the HttpResponseInfo. -// Returns true if successfully converted. False if there was a failure -// or if the SpdyHeaderBlock was invalid. -bool SpdyHeadersToHttpResponse(const spdy::SpdyHeaderBlock& headers, - HttpResponseInfo* response) { - std::string version; - std::string status; - - // The "status" and "version" headers are required. - spdy::SpdyHeaderBlock::const_iterator it; - it = headers.find("status"); - if (it == headers.end()) { - LOG(ERROR) << "SpdyHeaderBlock without status header."; - return false; - } - status = it->second; - - // Grab the version. If not provided by the server, - it = headers.find("version"); - if (it == headers.end()) { - LOG(ERROR) << "SpdyHeaderBlock without version header."; - return false; - } - version = it->second; - - response->response_time = base::Time::Now(); - - std::string raw_headers(version); - raw_headers.push_back(' '); - raw_headers.append(status); - raw_headers.push_back('\0'); - for (it = headers.begin(); it != headers.end(); ++it) { - // For each value, if the server sends a NUL-separated - // list of values, we separate that back out into - // individual headers for each value in the list. - // e.g. - // Set-Cookie "foo\0bar" - // becomes - // Set-Cookie: foo\0 - // Set-Cookie: bar\0 - std::string value = it->second; - size_t start = 0; - size_t end = 0; - do { - end = value.find('\0', start); - std::string tval; - if (end != value.npos) - tval = value.substr(start, (end - start)); - else - tval = value.substr(start); - raw_headers.append(it->first); - raw_headers.push_back(':'); - raw_headers.append(tval); - raw_headers.push_back('\0'); - start = end + 1; - } while (end != value.npos); - } - - response->headers = new HttpResponseHeaders(raw_headers); - response->was_fetched_via_spdy = true; - return true; -} - -// Create a SpdyHeaderBlock for a Spdy SYN_STREAM Frame from -// a HttpRequestInfo block. -void CreateSpdyHeadersFromHttpRequest( - const HttpRequestInfo& info, spdy::SpdyHeaderBlock* headers) { - // TODO(willchan): It's not really necessary to convert from - // HttpRequestHeaders to spdy::SpdyHeaderBlock. - - static const char kHttpProtocolVersion[] = "HTTP/1.1"; - - HttpRequestHeaders::Iterator it(info.extra_headers); - - while (it.GetNext()) { - std::string name = StringToLowerASCII(it.name()); - if (headers->find(name) == headers->end()) { - (*headers)[name] = it.value(); - } else { - std::string new_value = (*headers)[name]; - new_value.append(1, '\0'); // +=() doesn't append 0's - new_value += it.value(); - (*headers)[name] = new_value; - } - } - - // TODO(mbelshe): Add Proxy headers here. (See http_network_transaction.cc) - // TODO(mbelshe): Add authentication headers here. - - (*headers)["method"] = info.method; - (*headers)["url"] = info.url.spec(); - (*headers)["version"] = kHttpProtocolVersion; - if (!info.referrer.is_empty()) - (*headers)["referer"] = info.referrer.spec(); - - // Honor load flags that impact proxy caches. - if (info.load_flags & LOAD_BYPASS_CACHE) { - (*headers)["pragma"] = "no-cache"; - (*headers)["cache-control"] = "no-cache"; - } else if (info.load_flags & LOAD_VALIDATE_CACHE) { - (*headers)["cache-control"] = "max-age=0"; - } -} - void AdjustSocketBufferSizes(ClientSocket* socket) { // Adjust socket buffer sizes. // SPDY uses one socket, and we want a really big buffer. @@ -347,43 +237,21 @@ net::Error SpdySession::Connect(const std::string& group_name, return static_cast<net::Error>(rv); } -scoped_refptr<SpdyHttpStream> SpdySession::GetOrCreateStream( - const HttpRequestInfo& request, - const UploadDataStream* upload_data, +scoped_refptr<SpdyStream> SpdySession::GetPushStream( + const GURL& url, const BoundNetLog& stream_net_log) { CHECK_NE(state_, CLOSED); - const GURL& url = request.url; const std::string& path = url.PathForRequest(); - scoped_refptr<SpdyHttpStream> stream; - - // Check if we have a push stream for this path. - if (request.method == "GET") { - // Only HTTP will push a stream. - scoped_refptr<SpdyHttpStream> stream = GetPushStream(path); - if (stream) { - DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); - // Update the request time - stream->SetRequestTime(base::Time::Now()); - // Change the request info, updating the response's request time too - stream->SetRequestInfo(request); - const HttpResponseInfo* response = stream->GetResponseInfo(); - if (response && response->headers->HasHeader("vary")) { - // TODO(ahendrickson) -- What is the right thing to do if the server - // pushes data with a vary field? - void* iter = NULL; - std::string value; - response->headers->EnumerateHeader(&iter, "vary", &value); - LOG(ERROR) << "SpdyStream: " - << "Received pushed stream ID " << stream->stream_id() - << "with vary field value '" << value << "'"; - } - streams_pushed_and_claimed_count_++; - return stream; - } + scoped_refptr<SpdyStream> stream = GetActivePushStream(path); + if (stream) { + DCHECK(streams_pushed_and_claimed_count_ < streams_pushed_count_); + streams_pushed_and_claimed_count_++; + return stream; } // Check if we have a pending push stream for this url. + // Note that we shouldn't have a pushed stream for non-GET method. PendingStreamMap::iterator it; it = pending_streams_.find(path); if (it != pending_streams_.end()) { @@ -392,63 +260,73 @@ scoped_refptr<SpdyHttpStream> SpdySession::GetOrCreateStream( // Server will assign a stream id when the push stream arrives. Use 0 for // now. net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM, NULL); - stream = new SpdyHttpStream(this, 0, true); - stream->SetRequestInfo(request); + stream = new SpdyStream(this, 0, true); stream->set_path(path); stream->set_net_log(stream_net_log); it->second = stream; return stream; } + return NULL; +} + +const scoped_refptr<SpdyStream>& SpdySession::CreateStream( + const GURL& url, + RequestPriority priority, + const BoundNetLog& stream_net_log) { + const std::string& path = url.PathForRequest(); const spdy::SpdyStreamId stream_id = GetNewStreamId(); - // If we still don't have a stream, activate one now. - stream = new SpdyHttpStream(this, stream_id, false); - stream->SetRequestInfo(request); - stream->set_priority(request.priority); + scoped_refptr<SpdyStream> stream(new SpdyStream(this, stream_id, false)); + + stream->set_priority(priority); stream->set_path(path); stream->set_net_log(stream_net_log); ActivateStream(stream); UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", - static_cast<int>(request.priority), 0, 10, 11); + static_cast<int>(priority), 0, 10, 11); LOG(INFO) << "SpdyStream: Creating stream " << stream_id << " for " << url; - // TODO(mbelshe): Optimize memory allocations - DCHECK(request.priority >= SPDY_PRIORITY_HIGHEST && - request.priority <= SPDY_PRIORITY_LOWEST); + DCHECK(priority >= SPDY_PRIORITY_HIGHEST && + priority <= SPDY_PRIORITY_LOWEST); - // Convert from HttpRequestHeaders to Spdy Headers. - linked_ptr<spdy::SpdyHeaderBlock> headers(new spdy::SpdyHeaderBlock); - CreateSpdyHeadersFromHttpRequest(request, headers.get()); + DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); + return active_streams_[stream_id]; +} - spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE; - if (!request.upload_data || !upload_data->size()) - flags = spdy::CONTROL_FLAG_FIN; +int SpdySession::WriteSynStream( + spdy::SpdyStreamId stream_id, + RequestPriority priority, + spdy::SpdyControlFlags flags, + const linked_ptr<spdy::SpdyHeaderBlock>& headers) { + // Find our stream + if (!IsStreamActive(stream_id)) + return ERR_INVALID_SPDY_STREAM; + const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; + CHECK_EQ(stream->stream_id(), stream_id); - // Create a SYN_STREAM packet and add to the output queue. scoped_ptr<spdy::SpdySynStreamControlFrame> syn_frame( - spdy_framer_.CreateSynStream(stream_id, 0, request.priority, flags, false, + spdy_framer_.CreateSynStream(stream_id, 0, priority, flags, false, headers.get())); - QueueFrame(syn_frame.get(), request.priority, stream); + QueueFrame(syn_frame.get(), priority, stream); static StatsCounter spdy_requests("spdy.requests"); spdy_requests.Increment(); - - LOG(INFO) << "FETCHING: " << request.url.spec(); streams_initiated_count_++; LOG(INFO) << "SPDY SYN_STREAM HEADERS ----------------------------------"; DumpSpdyHeaders(*headers); - if (stream_net_log.HasListener()) { - stream_net_log.AddEvent( + const BoundNetLog& log = stream->net_log(); + if (log.HasListener()) { + log.AddEvent( NetLog::TYPE_SPDY_STREAM_SYN_STREAM, new NetLogSpdySynParameter(headers, flags, stream_id)); } - return stream; + return ERR_IO_PENDING; } int SpdySession::WriteStreamData(spdy::SpdyStreamId stream_id, @@ -637,7 +515,6 @@ void SpdySession::OnWriteComplete(int result) { // We only notify the stream when we've fully written the pending frame. if (!in_flight_write_.buffer()->BytesRemaining()) { - scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); if (stream) { // Report the number of bytes written to the caller, but exclude the // frame size overhead. NOTE: if this frame was compressed the @@ -875,7 +752,7 @@ void SpdySession::DeleteStream(spdy::SpdyStreamId id, int status) { // Remove the stream from pushed_streams_ and active_streams_. ActivePushedStreamList::iterator it; for (it = pushed_streams_.begin(); it != pushed_streams_.end(); ++it) { - scoped_refptr<SpdyHttpStream> curr = *it; + scoped_refptr<SpdyStream> curr = *it; if (id == curr->stream_id()) { pushed_streams_.erase(it); break; @@ -901,13 +778,13 @@ void SpdySession::RemoveFromPool() { } } -scoped_refptr<SpdyHttpStream> SpdySession::GetPushStream( +scoped_refptr<SpdyStream> SpdySession::GetActivePushStream( const std::string& path) { static StatsCounter used_push_streams("spdy.claimed_push_streams"); LOG(INFO) << "Looking for push stream: " << path; - scoped_refptr<SpdyHttpStream> stream; + scoped_refptr<SpdyStream> stream; // We just walk a linear list here. ActivePushedStreamList::iterator it; @@ -925,12 +802,15 @@ scoped_refptr<SpdyHttpStream> SpdySession::GetPushStream( return NULL; } -void SpdySession::GetSSLInfo(SSLInfo* ssl_info) { +bool SpdySession::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) { if (is_secure_) { SSLClientSocket* ssl_socket = reinterpret_cast<SSLClientSocket*>(connection_->socket()); ssl_socket->GetSSLInfo(ssl_info); + *was_npn_negotiated = ssl_socket->wasNpnNegotiated(); + return true; } + return false; } void SpdySession::OnError(spdy::SpdyFramer* framer) { @@ -955,29 +835,9 @@ void SpdySession::OnStreamFrameData(spdy::SpdyStreamId stream_id, bool SpdySession::Respond(const spdy::SpdyHeaderBlock& headers, const scoped_refptr<SpdyStream> stream) { - // TODO(mbelshe): For now we convert from our nice hash map back - // to a string of headers; this is because the HttpResponseInfo - // is a bit rigid for its http (non-spdy) design. - HttpResponseInfo response; - // TODO(ahendrickson): This is recorded after the entire SYN_STREAM control - // frame has been received and processed. Move to framer? - response.response_time = base::Time::Now(); int rv = OK; - if (SpdyHeadersToHttpResponse(headers, &response)) { - GetSSLInfo(&response.ssl_info); - response.request_time = stream->GetRequestTime(); - response.vary_data.Init(*stream->GetRequestInfo(), *response.headers); - if (is_secure_) { - SSLClientSocket* ssl_socket = - reinterpret_cast<SSLClientSocket*>(connection_->socket()); - response.was_npn_negotiated = ssl_socket->wasNpnNegotiated(); - } - rv = stream->OnResponseReceived(response); - } else { - rv = ERR_INVALID_RESPONSE; - } - + rv = stream->OnResponseReceived(headers); if (rv < 0) { DCHECK_NE(rv, ERR_IO_PENDING); const spdy::SpdyStreamId stream_id = stream->stream_id(); @@ -1024,7 +884,7 @@ void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, } // Only HTTP push a stream. - scoped_refptr<SpdyHttpStream> stream; + scoped_refptr<SpdyStream> stream; // Check if we already have a delegate awaiting this stream. PendingStreamMap::iterator it; @@ -1047,7 +907,7 @@ void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, stream_id)); } } else { - stream = new SpdyHttpStream(this, stream_id, true); + stream = new SpdyStream(this, stream_id, true); if (net_log_.HasListener()) { net_log_.AddEvent( @@ -1056,15 +916,6 @@ void SpdySession::OnSyn(const spdy::SpdySynStreamControlFrame& frame, headers, static_cast<spdy::SpdyControlFlags>(frame.flags()), stream_id)); } - - // A new HttpResponseInfo object needs to be generated so the call to - // OnResponseReceived below has something to fill in. - // When a SpdyNetworkTransaction is created for this resource, the - // response_info is copied over and this version is destroyed. - // - // TODO(cbentzel): Minimize allocations and copies of HttpResponseInfo - // object. Should it just be part of SpdyStream? - stream->SetPushResponse(new HttpResponseInfo()); } pushed_streams_.push_back(stream); diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index 588b7fb..a46dad4 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -31,11 +31,8 @@ namespace net { -class SpdyHttpStream; class SpdyStream; class HttpNetworkSession; -struct HttpRequestInfo; -class HttpResponseInfo; class BoundNetLog; class SSLInfo; @@ -59,21 +56,33 @@ class SpdySession : public base::RefCounted<SpdySession>, const TCPSocketParams& destination, RequestPriority priority); - // Get a stream for a given |request|. In the typical case, this will involve - // the creation of a new stream (and will send the SYN frame). If the server - // initiates a stream, it might already exist for a given path. The server - // might also not have initiated the stream yet, but indicated it will via - // X-Associated-Content. - // Returns the new or existing stream. Never returns NULL. - scoped_refptr<SpdyHttpStream> GetOrCreateStream( - const HttpRequestInfo& request, - const UploadDataStream* upload_data, + // Get a pushed stream for a given |url|. + // If the server initiates a stream, it might already exist for a given path. + // The server might also not have initiated the stream yet, but indicated it + // will via X-Associated-Content. + // Returns existing stream or NULL. + scoped_refptr<SpdyStream> GetPushStream( + const GURL& url, + const BoundNetLog& stream_net_log); + + // Create a new stream for a given |url|. + // Returns the new stream. Never returns NULL. + const scoped_refptr<SpdyStream>& CreateStream( + const GURL& url, + RequestPriority priority, const BoundNetLog& stream_net_log); // Used by SpdySessionPool to initialize with a pre-existing SSL socket. // Returns OK on success, or an error on failure. net::Error InitializeWithSSLSocket(ClientSocketHandle* connection); + // Send the SYN frame for |stream_id|. + int WriteSynStream( + spdy::SpdyStreamId stream_id, + RequestPriority priority, + spdy::SpdyControlFlags flags, + const linked_ptr<spdy::SpdyHeaderBlock>& headers); + // Write a data frame to the stream. // Used to create and queue a data frame for the given stream. int WriteStreamData(spdy::SpdyStreamId stream_id, net::IOBuffer* data, @@ -92,13 +101,16 @@ class SpdySession : public base::RefCounted<SpdySession>, // Closes all streams. Used as part of shutdown. void CloseAllStreams(net::Error status); + // Fills SSL info in |ssl_info| and returns true when SSL is in use. + bool GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated); + // Enable or disable SSL. static void SetSSLMode(bool enable) { use_ssl_ = enable; } static bool SSLMode() { return use_ssl_; } private: friend class base::RefCounted<SpdySession>; - FRIEND_TEST(SpdySessionTest, GetPushStream); + FRIEND_TEST(SpdySessionTest, GetActivePushStream); enum State { IDLE, @@ -109,9 +121,8 @@ class SpdySession : public base::RefCounted<SpdySession>, typedef std::map<int, scoped_refptr<SpdyStream> > ActiveStreamMap; // Only HTTP push a stream. - typedef std::list<scoped_refptr<SpdyHttpStream> > ActivePushedStreamList; - typedef std::map<std::string, scoped_refptr<SpdyHttpStream> > - PendingStreamMap; + typedef std::list<scoped_refptr<SpdyStream> > ActivePushedStreamList; + typedef std::map<std::string, scoped_refptr<SpdyStream> > PendingStreamMap; typedef std::priority_queue<SpdyIOBuffer> OutputQueue; virtual ~SpdySession(); @@ -175,15 +186,13 @@ class SpdySession : public base::RefCounted<SpdySession>, // Check if we have a pending pushed-stream for this url // Returns the stream if found (and returns it from the pending // list), returns NULL otherwise. - scoped_refptr<SpdyHttpStream> GetPushStream(const std::string& url); + scoped_refptr<SpdyStream> GetActivePushStream(const std::string& url); - // Creates an HttpResponseInfo instance, and calls OnResponseReceived(). + // Calls OnResponseReceived(). // Returns true if successful. bool Respond(const spdy::SpdyHeaderBlock& headers, const scoped_refptr<SpdyStream> stream); - void GetSSLInfo(SSLInfo* ssl_info); - void RecordHistograms(); // Callbacks for the Spdy session. diff --git a/net/spdy/spdy_session_unittest.cc b/net/spdy/spdy_session_unittest.cc index 1cefb49..67dcb94 100644 --- a/net/spdy/spdy_session_unittest.cc +++ b/net/spdy/spdy_session_unittest.cc @@ -179,7 +179,7 @@ static const uint8 kPush[] = { } // namespace -TEST_F(SpdySessionTest, GetPushStream) { +TEST_F(SpdySessionTest, GetActivePushStream) { SpdySessionTest::TurnOffCompression(); SessionDependencies session_deps; @@ -216,9 +216,9 @@ TEST_F(SpdySessionTest, GetPushStream) { // No push streams should exist in the beginning. std::string test_push_path = "/foo.js"; - scoped_refptr<SpdyHttpStream> first_stream = session->GetPushStream( + scoped_refptr<SpdyStream> first_stream = session->GetActivePushStream( test_push_path); - EXPECT_EQ(static_cast<SpdyHttpStream*>(NULL), first_stream.get()); + EXPECT_EQ(static_cast<SpdyStream*>(NULL), first_stream.get()); // Read in the data which contains a server-issued SYN_STREAM. TCPSocketParams tcp_params(test_host_port_pair, MEDIUM, GURL(), false); @@ -227,14 +227,14 @@ TEST_F(SpdySessionTest, GetPushStream) { MessageLoop::current()->RunAllPending(); // An unpushed path should not work. - scoped_refptr<SpdyHttpStream> unpushed_stream = session->GetPushStream( + scoped_refptr<SpdyStream> unpushed_stream = session->GetActivePushStream( "/unpushed_path"); - EXPECT_EQ(static_cast<SpdyHttpStream*>(NULL), unpushed_stream.get()); + EXPECT_EQ(static_cast<SpdyStream*>(NULL), unpushed_stream.get()); // The pushed path should be found. - scoped_refptr<SpdyHttpStream> second_stream = session->GetPushStream( + scoped_refptr<SpdyStream> second_stream = session->GetActivePushStream( test_push_path); - ASSERT_NE(static_cast<SpdyHttpStream*>(NULL), second_stream.get()); + ASSERT_NE(static_cast<SpdyStream*>(NULL), second_stream.get()); EXPECT_EQ(test_push_path, second_stream->path()); EXPECT_EQ(2U, second_stream->stream_id()); EXPECT_EQ(0, second_stream->priority()); diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index baff2b9..3fbde08 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -7,8 +7,6 @@ #include "base/logging.h" #include "base/message_loop.h" #include "base/singleton.h" -#include "net/http/http_request_info.h" -#include "net/http/http_response_info.h" #include "net/spdy/spdy_session.h" namespace net { @@ -20,8 +18,9 @@ SpdyStream::SpdyStream( pushed_(pushed), metrics_(Singleton<BandwidthMetrics>::get()), session_(session), + delegate_(NULL), request_time_(base::Time::Now()), - response_(NULL), + response_(new spdy::SpdyHeaderBlock), response_complete_(false), io_state_(STATE_NONE), response_status_(OK), @@ -39,23 +38,36 @@ SpdyStream::~SpdyStream() { DCHECK(response_complete_); } -const HttpResponseInfo* SpdyStream::GetResponseInfo() const { - return response_; +void SpdyStream::SetDelegate(Delegate* delegate) { + CHECK(delegate); + delegate_ = delegate; + + if (!response_->empty()) { + // The stream already got response. + delegate_->OnResponseReceived(*response_, response_time_, OK); + } + + std::vector<scoped_refptr<IOBufferWithSize> > buffers; + buffers.swap(pending_buffers_); + for (size_t i = 0; i < buffers.size(); ++i) { + if (delegate_) + delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); + } } -void SpdyStream::SetPushResponse(HttpResponseInfo* response_info) { - DCHECK(!response_); - DCHECK(!push_response_.get()); - push_response_.reset(response_info); - response_ = response_info; +void SpdyStream::DetachDelegate() { + delegate_ = NULL; + if (!cancelled()) + Cancel(); } -const HttpRequestInfo* SpdyStream::GetRequestInfo() const { - return &request_; +const linked_ptr<spdy::SpdyHeaderBlock>& SpdyStream::spdy_headers() const { + return request_; } -void SpdyStream::SetRequestInfo(const HttpRequestInfo& request) { - request_ = request; +void SpdyStream::set_spdy_headers( + const linked_ptr<spdy::SpdyHeaderBlock>& headers) { + request_ = headers; } base::Time SpdyStream::GetRequestTime() const { @@ -64,25 +76,20 @@ base::Time SpdyStream::GetRequestTime() const { void SpdyStream::SetRequestTime(base::Time t) { request_time_ = t; - - // This should only get called in the case of a request occuring - // during server push that has already begun but hasn't finished, - // so we set the response's request time to be the actual one - if (response_) - response_->request_time = request_time_; } -int SpdyStream::DoOnResponseReceived(const HttpResponseInfo& response) { +int SpdyStream::OnResponseReceived(const spdy::SpdyHeaderBlock& response) { int rv = OK; + LOG(INFO) << "OnResponseReceived"; metrics_.StartStream(); - CHECK(!response_->headers); - - *response_ = response; // TODO(mbelshe): avoid copy. - DCHECK(response_->headers); + DCHECK(response_->empty()); + *response_ = response; // TODO(ukai): avoid copy. + DCHECK(!response_->empty()); recv_first_byte_time_ = base::TimeTicks::Now(); + response_time_ = base::Time::Now(); if (io_state_ == STATE_NONE) { CHECK(pushed_); @@ -101,11 +108,15 @@ int SpdyStream::DoOnResponseReceived(const HttpResponseInfo& response) { } rv = DoLoop(rv); + if (delegate_) + rv = delegate_->OnResponseReceived(*response_, response_time_, rv); + // if delegate_ is not yet attached, we'll return response when delegate + // gets attached to the stream. return rv; } -bool SpdyStream::DoOnDataReceived(const char* data, int length) { +void SpdyStream::OnDataReceived(const char* data, int length) { DCHECK_GE(length, 0); LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for " << stream_id_; @@ -115,9 +126,9 @@ bool SpdyStream::DoOnDataReceived(const char* data, int length) { // If we don't have a response, then the SYN_REPLY did not come through. // We cannot pass data up to the caller unless the reply headers have been // received. - if (!response_->headers) { + if (response_->empty()) { session_->CloseStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); - return false; + return; } // A zero-length read means that the stream is being closed. @@ -126,7 +137,7 @@ bool SpdyStream::DoOnDataReceived(const char* data, int length) { scoped_refptr<SpdyStream> self(this); session_->CloseStream(stream_id_, net::OK); UpdateHistograms(); - return true; + return; } // Track our bandwidth. @@ -134,10 +145,19 @@ bool SpdyStream::DoOnDataReceived(const char* data, int length) { recv_bytes_ += length; recv_last_byte_time_ = base::TimeTicks::Now(); - return true; + if (!delegate_) { + // It should be valid for this to happen in the server push case. + // We'll return received data when delegate gets attached to the stream. + IOBufferWithSize* buf = new IOBufferWithSize(length); + memcpy(buf->data(), data, length); + pending_buffers_.push_back(buf); + return; + } + + delegate_->OnDataReceived(data, length); } -void SpdyStream::DoOnWriteComplete(int status) { +void SpdyStream::OnWriteComplete(int status) { // TODO(mbelshe): Check for cancellation here. If we're cancelled, we // should discontinue the DoLoop. @@ -151,45 +171,35 @@ void SpdyStream::DoOnWriteComplete(int status) { DoLoop(status); } -void SpdyStream::DoOnClose(int status) { +void SpdyStream::OnClose(int status) { response_complete_ = true; response_status_ = status; stream_id_ = 0; + Delegate* delegate = delegate_; + delegate_ = NULL; + if (delegate) + delegate->OnClose(status); } -void SpdyStream::DoCancel() { +void SpdyStream::Cancel() { cancelled_ = true; session_->CloseStream(stream_id_, ERR_ABORTED); } -int SpdyStream::DoSendRequest(UploadDataStream* upload_data, - HttpResponseInfo* response) { +int SpdyStream::DoSendRequest(bool has_upload_data) { CHECK(!cancelled_); - CHECK(response); - - // SendRequest can be called in two cases. - // - // a) A client initiated request. In this case, response_ should be NULL - // to start with. - // b) A client request which matches a response that the server has already - // pushed. In this case, the value of |*push_response_| is copied over to - // the new response object |*response|. |push_response_| is cleared - // and |*push_response_| is deleted, and |response_| is reset to - // |response|. - if (push_response_.get()) { - *response = *push_response_; - push_response_.reset(NULL); - response_ = NULL; - } - - DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_); - response_ = response; - if (upload_data) { - if (upload_data->size()) - request_body_stream_.reset(upload_data); - else - delete upload_data; + if (!pushed_) { + spdy::SpdyControlFlags flags = spdy::CONTROL_FLAG_NONE; + if (!has_upload_data) + flags = spdy::CONTROL_FLAG_FIN; + + CHECK(request_.get()); + int result = session_->WriteSynStream( + stream_id_, static_cast<RequestPriority>(priority_), flags, + request_); + if (result != ERR_IO_PENDING) + return result; } send_time_ = base::TimeTicks::Now(); @@ -198,7 +208,7 @@ int SpdyStream::DoSendRequest(UploadDataStream* upload_data, if (!pushed_) io_state_ = STATE_SEND_HEADERS; else { - if (response_->headers) { + if (!response_->empty()) { io_state_ = STATE_READ_BODY; } else { io_state_ = STATE_READ_HEADERS; @@ -212,7 +222,7 @@ int SpdyStream::DoReadResponseHeaders() { CHECK(!cancelled_); // The SYN_REPLY has already been received. - if (response_->headers) + if (!response_->empty()) return OK; io_state_ = STATE_READ_HEADERS; @@ -220,6 +230,14 @@ int SpdyStream::DoReadResponseHeaders() { return ERR_IO_PENDING; } +int SpdyStream::WriteStreamData(IOBuffer* data, int length) { + return session_->WriteStreamData(stream_id_, data, length); +} + +bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated) { + return session_->GetSSLInfo(ssl_info, was_npn_negotiated); +} + int SpdyStream::DoLoop(int result) { do { State state = io_state_; @@ -295,8 +313,11 @@ int SpdyStream::DoSendHeadersComplete(int result) { CHECK_GT(result, 0); + if (!delegate_) + return ERR_UNEXPECTED; + // There is no body, skip that state. - if (!request_body_stream_.get()) { + if (delegate_->OnSendHeadersComplete(result)) { io_state_ = STATE_READ_HEADERS; return OK; } @@ -314,12 +335,9 @@ int SpdyStream::DoSendBody() { // the number of bytes in the frame that were written, only consume the // data portion, of course. io_state_ = STATE_SEND_BODY_COMPLETE; - int buf_len = static_cast<int>(request_body_stream_->buf_len()); - if (!buf_len) - return OK; - return session_->WriteStreamData(stream_id_, - request_body_stream_->buf(), - buf_len); + if (!delegate_) + return ERR_UNEXPECTED; + return delegate_->OnSendBody(); } int SpdyStream::DoSendBodyComplete(int result) { @@ -328,9 +346,10 @@ int SpdyStream::DoSendBodyComplete(int result) { CHECK_NE(result, 0); - request_body_stream_->DidConsume(result); + if (!delegate_) + return ERR_UNEXPECTED; - if (!request_body_stream_->eof()) + if (!delegate_->OnSendBodyComplete(result)) io_state_ = STATE_SEND_BODY; else io_state_ = STATE_READ_HEADERS; @@ -340,7 +359,7 @@ int SpdyStream::DoSendBodyComplete(int result) { int SpdyStream::DoReadHeaders() { io_state_ = STATE_READ_HEADERS_COMPLETE; - return response_->headers ? OK : ERR_IO_PENDING; + return !response_->empty() ? OK : ERR_IO_PENDING; } int SpdyStream::DoReadHeadersComplete(int result) { diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 1792cf1..919bca8 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -6,25 +6,25 @@ #define NET_SPDY_SPDY_STREAM_H_ #include <string> +#include <vector> #include "base/basictypes.h" +#include "base/linked_ptr.h" #include "base/ref_counted.h" #include "base/scoped_ptr.h" #include "net/base/bandwidth_metrics.h" #include "net/base/io_buffer.h" #include "net/base/net_log.h" -#include "net/http/http_request_info.h" +#include "net/spdy/spdy_framer.h" #include "net/spdy/spdy_protocol.h" namespace net { -class HttpResponseInfo; class SpdySession; -class UploadDataStream; +class SSLInfo; // The SpdyStream is used by the SpdySession to represent each stream known -// on the SpdySession. This class provides interfaces for SpdySession to use -// and base implementations for the interfaces. +// on the SpdySession. This class provides interfaces for SpdySession to use. // Streams can be created either by the client or by the server. When they // are initiated by the client, both the SpdySession and client object (such as // a SpdyNetworkTransaction) will maintain a reference to the stream. When @@ -32,9 +32,58 @@ class UploadDataStream; // until such a time as a client object requests a stream for the path. class SpdyStream : public base::RefCounted<SpdyStream> { public: + // Delegate handles protocol specific behavior of spdy stream. + class Delegate { + public: + Delegate() {} + + // Called when SYN frame has been sent. + // Returns true if no more data to be sent after SYN frame. + virtual bool OnSendHeadersComplete(int status) = 0; + + // Called when stream is ready to send data. + // Returns network error code. OK when it successfully sent data. + virtual int OnSendBody() = 0; + + // Called when data has been sent. |status| indicates network error + // or number of bytes has been sent. + // Returns true if no more data to be sent. + virtual bool OnSendBodyComplete(int status) = 0; + + // Called when SYN_REPLY received. |status| indicates network error. + // Returns network error code. + virtual int OnResponseReceived(const spdy::SpdyHeaderBlock& response, + base::Time response_time, + int status) = 0; + + // Called when data is received. + // Returns true if data is successfully processed. + virtual void OnDataReceived(const char* data, int length) = 0; + + // Called when SpdyStream is closed. + virtual void OnClose(int status) = 0; + + protected: + friend class base::RefCounted<Delegate>; + virtual ~Delegate() {} + + private: + DISALLOW_COPY_AND_ASSIGN(Delegate); + }; + // SpdyStream constructor SpdyStream(SpdySession* session, spdy::SpdyStreamId stream_id, bool pushed); + // Set new |delegate|. |delegate| must not be NULL. + // If it already received SYN_REPLY or data, OnResponseReceived() or + // OnDataReceived() will be called. + void SetDelegate(Delegate* delegate); + Delegate* GetDelegate() { return delegate_; } + + // Detach delegate from the stream. It will cancel the stream if it was not + // cancelled yet. It is safe to call multiple times. + void DetachDelegate(); + // Is this stream a pushed stream from the server. bool pushed() const { return pushed_; } @@ -51,9 +100,8 @@ class SpdyStream : public base::RefCounted<SpdyStream> { const BoundNetLog& net_log() const { return net_log_; } void set_net_log(const BoundNetLog& log) { net_log_ = log; } - const HttpResponseInfo* GetResponseInfo() const; - const HttpRequestInfo* GetRequestInfo() const; - void SetRequestInfo(const HttpRequestInfo& request); + const linked_ptr<spdy::SpdyHeaderBlock>& spdy_headers() const; + void set_spdy_headers(const linked_ptr<spdy::SpdyHeaderBlock>& headers); base::Time GetRequestTime() const; void SetRequestTime(base::Time t); @@ -61,7 +109,7 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // received for this stream. |path| is the path of the URL for a server // initiated stream, otherwise is empty. // Returns a status code. - virtual int OnResponseReceived(const HttpResponseInfo& response) = 0; + int OnResponseReceived(const spdy::SpdyHeaderBlock& response); // Called by the SpdySession when response data has been received for this // stream. This callback may be called multiple times as data arrives @@ -70,61 +118,43 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // from this buffer before returning from this callback. // |length| is the number of bytes received or an error. // A zero-length count does not indicate end-of-stream. - // Returns true on success and false on error. - virtual bool OnDataReceived(const char* buffer, int bytes) = 0; + void OnDataReceived(const char* buffer, int bytes); // Called by the SpdySession when a write has completed. This callback // will be called multiple times for each write which completes. Writes // include the SYN_STREAM write and also DATA frame writes. // |result| is the number of bytes written or a net error code. - virtual void OnWriteComplete(int status) = 0; + void OnWriteComplete(int status); // Called by the SpdySession when the request is finished. This callback // will always be called at the end of the request and signals to the // stream that the stream has no more network events. No further callbacks // to the stream will be made after this call. // |status| is an error code or OK. - virtual void OnClose(int status) = 0; + void OnClose(int status); - virtual void Cancel() = 0; + void Cancel(); bool cancelled() const { return cancelled_; } - void SetPushResponse(HttpResponseInfo* response_info); - - protected: - friend class base::RefCounted<SpdyStream>; - virtual ~SpdyStream(); - - int DoOnResponseReceived(const HttpResponseInfo& response); - bool DoOnDataReceived(const char* buffer,int bytes); - void DoOnWriteComplete(int status); - void DoOnClose(int status); + // Interface for Spdy[Http|WebSocket]Stream to use. - void DoCancel(); - - // Sends the request. If |upload_data| is non-NULL, sends that in the - // request body. Note that the actual SYN_STREAM packet will have already - // been sent by this point. - // Note that SpdyStream takes ownership of |upload_data|. - // TODO(ukai): move out HTTP-specific thing to SpdyHttpStream. - int DoSendRequest(UploadDataStream* upload_data, - HttpResponseInfo* response_info); + // Sends the request. + // For non push stream, it will send SYN_STREAM frame. + int DoSendRequest(bool has_upload_data); // Reads response headers. If the SpdyStream have already received // the response headers, return OK and response headers filled in - // |response_info| given in SendRequest. - // Otherwise, return ERR_IO_PENDING. + // |response| given in SendRequest. + // Otherwise, return ERR_IO_PENDING and OnResponseReceived() will be called. int DoReadResponseHeaders(); - const UploadDataStream* request_body_stream() const { - return request_body_stream_.get(); - } + // Sends DATA frame. + int WriteStreamData(IOBuffer* data, int length); + + bool GetSSLInfo(SSLInfo* ssl_info, bool* was_npn_negotiated); bool is_idle() const { return io_state_ == STATE_NONE; } bool response_complete() const { return response_complete_; } - void set_response_complete(bool response_complete) { - response_complete_ = response_complete; - } int response_status() const { return response_status_; } private: @@ -141,6 +171,9 @@ class SpdyStream : public base::RefCounted<SpdyStream> { STATE_DONE }; + friend class base::RefCounted<SpdyStream>; + virtual ~SpdyStream(); + // Try to make progress sending/receiving the request/response. int DoLoop(int result); @@ -166,26 +199,18 @@ class SpdyStream : public base::RefCounted<SpdyStream> { scoped_refptr<SpdySession> session_; + // The transaction should own the delegate. + SpdyStream::Delegate* delegate_; + // The request to send. - HttpRequestInfo request_; + linked_ptr<spdy::SpdyHeaderBlock> request_; // The time at which the request was made that resulted in this response. // For cached responses, this time could be "far" in the past. base::Time request_time_; - // The push_response_ is the HTTP response data which is part of - // a server-initiated SYN_STREAM. If a client request comes in - // which matches the push stream, the data in push_response_ will - // be copied over to the response_ object owned by the caller - // of the request. - scoped_ptr<HttpResponseInfo> push_response_; - - // response_ is the HTTP response data object which is filled in - // when a SYN_REPLY comes in for the stream. It is not owned by this - // stream object. - HttpResponseInfo* response_; - - scoped_ptr<UploadDataStream> request_body_stream_; + linked_ptr<spdy::SpdyHeaderBlock> response_; + base::Time response_time_; bool response_complete_; // TODO(mbelshe): fold this into the io_state. State io_state_; @@ -204,6 +229,8 @@ class SpdyStream : public base::RefCounted<SpdyStream> { int send_bytes_; int recv_bytes_; bool histograms_recorded_; + // Data received before delegate is attached. + std::vector<scoped_refptr<IOBufferWithSize> > pending_buffers_; DISALLOW_COPY_AND_ASSIGN(SpdyStream); }; |