diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-23 16:41:38 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-05-23 16:41:38 +0000 |
commit | aa19cfca991c78d1322f25aef3d62f48bc519f9f (patch) | |
tree | a9bcfec0de6b561d56d8c44d647d9a26e2548aa3 /net/spdy | |
parent | 595ec5160ffd04c3afcdacea05ef054faf49e5e6 (diff) | |
download | chromium_src-aa19cfca991c78d1322f25aef3d62f48bc519f9f.zip chromium_src-aa19cfca991c78d1322f25aef3d62f48bc519f9f.tar.gz chromium_src-aa19cfca991c78d1322f25aef3d62f48bc519f9f.tar.bz2 |
[SPDY] Change SpdyStream::QueueStreamData() To SendStreamData()
Change its semantics so that it holds a reference to the passed-in data
and calls back to the delegate only when all the data has been sent.
Fix bug where flow control wouldn't work for non-HTTP streams. Now it
just queues up the next data frame (which must exist, since it must
have been trying to send something when it got stalled).
Remove now-redundant bytes_sent parameters from SpdyStream::Delegate
methods.
Remove now-redundant DrainableIOBuffer member variable in SpdyHttpStream.
Remove now-redundant logic in SpdyProxyClientSocket to split up data to
send into frames.
Add SpdyStream tests for sending large blobs of data.
Enable bidirectional flow control tests for SpdyStream.
BUG=242288
Review URL: https://chromiumcodereview.appspot.com/15740018
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@201828 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/spdy')
-rw-r--r-- | net/spdy/spdy_http_stream.cc | 37 | ||||
-rw-r--r-- | net/spdy/spdy_http_stream.h | 7 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket.cc | 56 | ||||
-rw-r--r-- | net/spdy/spdy_proxy_client_socket.h | 6 | ||||
-rw-r--r-- | net/spdy/spdy_session_spdy3_unittest.cc | 24 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 117 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 45 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy2_unittest.cc | 134 | ||||
-rw-r--r-- | net/spdy/spdy_stream_spdy3_unittest.cc | 146 | ||||
-rw-r--r-- | net/spdy/spdy_stream_test_util.cc | 44 | ||||
-rw-r--r-- | net/spdy/spdy_stream_test_util.h | 23 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.cc | 12 | ||||
-rw-r--r-- | net/spdy/spdy_websocket_stream.h | 5 |
13 files changed, 444 insertions, 212 deletions
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index 3129412..0545773 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -38,6 +38,7 @@ SpdyHttpStream::SpdyHttpStream(SpdySession* spdy_session, response_info_(NULL), response_headers_received_(false), user_buffer_len_(0), + raw_request_body_buf_size_(0), buffered_read_callback_pending_(false), more_read_data_pending_(false), direct_(direct) {} @@ -236,7 +237,7 @@ int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers, // body data is written with this size at a time. raw_request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize); // The request body buffer is empty at first. - request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_, 0); + raw_request_body_buf_size_ = 0; } CHECK(!callback.is_null()); @@ -288,7 +289,7 @@ SpdySendStatus SpdyHttpStream::OnSendHeadersComplete() { void SpdyHttpStream::OnSendBody() { CHECK(request_info_ && request_info_->upload_data_stream); - if (request_body_buf_->BytesRemaining() > 0) { + if (raw_request_body_buf_size_ > 0) { SendRequestBodyData(); } else { // We shouldn't be called if there's no more data to read. @@ -297,20 +298,14 @@ void SpdyHttpStream::OnSendBody() { } } -SpdySendStatus SpdyHttpStream::OnSendBodyComplete(size_t bytes_sent) { +SpdySendStatus SpdyHttpStream::OnSendBodyComplete() { // |status| is the number of bytes written to the SPDY stream. CHECK(request_info_ && request_info_->upload_data_stream); - DCHECK_GE(static_cast<int>(bytes_sent), 0); - DCHECK_LE(static_cast<int>(bytes_sent), request_body_buf_->BytesRemaining()); - - request_body_buf_->DidConsume(static_cast<int>(bytes_sent)); + raw_request_body_buf_size_ = 0; // Check for more data to send. - if (!request_info_->upload_data_stream->IsEOF() || - (request_body_buf_->BytesRemaining() > 0)) - return MORE_DATA_TO_SEND; - - return NO_MORE_DATA_TO_SEND; + return request_info_->upload_data_stream->IsEOF() ? + NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND; } int SpdyHttpStream::OnResponseReceived(const SpdyHeaderBlock& response, @@ -402,10 +397,10 @@ int SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { return OK; } -void SpdyHttpStream::OnDataSent(size_t /*bytes_sent*/) { +void SpdyHttpStream::OnDataSent() { // For HTTP streams, no data is sent from the client while in the OPEN state, // so it is never called. - NOTREACHED(); + CHECK(false); } void SpdyHttpStream::OnClose(int status) { @@ -437,7 +432,7 @@ void SpdyHttpStream::OnStreamCreated( void SpdyHttpStream::ReadAndSendRequestBodyData() { CHECK(request_info_ && request_info_->upload_data_stream); - CHECK_EQ(0, request_body_buf_->BytesRemaining()); + CHECK_EQ(raw_request_body_buf_size_, 0); // Read the data from the request body stream. const int rv = request_info_->upload_data_stream->Read( @@ -455,20 +450,20 @@ void SpdyHttpStream::ReadAndSendRequestBodyData() { void SpdyHttpStream::OnRequestBodyReadCompleted(int status) { CHECK_GE(status, 0); - request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_, status); + raw_request_body_buf_size_ = status; SendRequestBodyData(); } void SpdyHttpStream::SendRequestBodyData() { const bool eof = request_info_->upload_data_stream->IsEOF(); if (eof) { - CHECK_GE(request_body_buf_->BytesRemaining(), 0); + CHECK_GE(raw_request_body_buf_size_, 0); } else { - CHECK_GT(request_body_buf_->BytesRemaining(), 0); + CHECK_GT(raw_request_body_buf_size_, 0); } - stream_->QueueStreamData(request_body_buf_, - request_body_buf_->BytesRemaining(), - eof ? DATA_FLAG_FIN : DATA_FLAG_NONE); + stream_->SendStreamData(raw_request_body_buf_, + raw_request_body_buf_size_, + eof ? DATA_FLAG_FIN : DATA_FLAG_NONE); } void SpdyHttpStream::ScheduleBufferedReadCallback() { diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h index 5f405d3..1fdc3cb 100644 --- a/net/spdy/spdy_http_stream.h +++ b/net/spdy/spdy_http_stream.h @@ -86,13 +86,13 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate, // SpdyStream::Delegate implementation. virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE; virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; virtual int OnResponseReceived(const SpdyHeaderBlock& response, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; - virtual void OnDataSent(size_t bytes_sent) OVERRIDE; + virtual void OnDataSent() OVERRIDE; virtual void OnClose(int status) OVERRIDE; private: @@ -157,8 +157,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate, // Temporary buffer used to read the request body from UploadDataStream. scoped_refptr<IOBufferWithSize> raw_request_body_buf_; - // Wraps raw_request_body_buf_ to read the remaining data progressively. - scoped_refptr<DrainableIOBuffer> request_body_buf_; + int raw_request_body_buf_size_; // Is there a scheduled read callback pending. bool buffered_read_callback_pending_; diff --git a/net/spdy/spdy_proxy_client_socket.cc b/net/spdy/spdy_proxy_client_socket.cc index 5bc617c..43ec5b2 100644 --- a/net/spdy/spdy_proxy_client_socket.cc +++ b/net/spdy/spdy_proxy_client_socket.cc @@ -8,6 +8,7 @@ #include "base/bind.h" #include "base/bind_helpers.h" +#include "base/callback_helpers.h" #include "base/logging.h" #include "base/string_util.h" #include "base/values.h" @@ -41,7 +42,6 @@ SpdyProxyClientSocket::SpdyProxyClientSocket( auth_handler_factory)), user_buffer_len_(0), write_buffer_len_(0), - write_bytes_outstanding_(0), weak_factory_(this), net_log_(BoundNetLog::Make(spdy_stream->net_log().net_log(), NetLog::SOURCE_PROXY_CLIENT_SOCKET)) { @@ -133,7 +133,6 @@ void SpdyProxyClientSocket::Disconnect() { read_callback_.Reset(); write_buffer_len_ = 0; - write_bytes_outstanding_ = 0; write_callback_.Reset(); next_state_ = STATE_DISCONNECTED; @@ -227,33 +226,12 @@ int SpdyProxyClientSocket::Write(IOBuffer* buf, int buf_len, return ERR_SOCKET_NOT_CONNECTED; DCHECK(spdy_stream_); - write_bytes_outstanding_= buf_len; - if (buf_len <= kMaxSpdyFrameChunkSize) { - spdy_stream_->QueueStreamData(buf, buf_len, DATA_FLAG_NONE); - net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, - buf_len, buf->data()); - write_callback_ = callback; - write_buffer_len_ = buf_len; - return ERR_IO_PENDING; - } - - // Since a SPDY Data frame can only include kMaxSpdyFrameChunkSize bytes - // we need to send multiple data frames - for (int i = 0; i < buf_len; i += kMaxSpdyFrameChunkSize) { - int len = std::min(kMaxSpdyFrameChunkSize, buf_len - i); - scoped_refptr<DrainableIOBuffer> iobuf(new DrainableIOBuffer(buf, i + len)); - iobuf->SetOffset(i); - spdy_stream_->QueueStreamData(iobuf, len, DATA_FLAG_NONE); - net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, - len, buf->data()); - } - if (write_bytes_outstanding_ > 0) { - write_callback_ = callback; - write_buffer_len_ = buf_len; - return ERR_IO_PENDING; - } else { - return buf_len; - } + spdy_stream_->SendStreamData(buf, buf_len, DATA_FLAG_NONE); + net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT, + buf_len, buf->data()); + write_callback_ = callback; + write_buffer_len_ = buf_len; + return ERR_IO_PENDING; } bool SpdyProxyClientSocket::SetReceiveBufferSize(int32 size) { @@ -474,8 +452,7 @@ void SpdyProxyClientSocket::OnSendBody() { CHECK(false); } -SpdySendStatus SpdyProxyClientSocket::OnSendBodyComplete( - size_t /*bytes_sent*/) { +SpdySendStatus SpdyProxyClientSocket::OnSendBodyComplete() { // Because we use |spdy_stream_| via STATE_OPEN (ala WebSockets) // OnSendBodyComplete() must never be called. CHECK(false); @@ -528,20 +505,12 @@ int SpdyProxyClientSocket::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { return OK; } -void SpdyProxyClientSocket::OnDataSent(size_t bytes_sent) { +void SpdyProxyClientSocket::OnDataSent() { DCHECK(!write_callback_.is_null()); - DCHECK_LE(static_cast<int>(bytes_sent), write_bytes_outstanding_); - write_bytes_outstanding_ -= static_cast<int>(bytes_sent); - - if (write_bytes_outstanding_ == 0) { - int rv = write_buffer_len_; - write_buffer_len_ = 0; - write_bytes_outstanding_ = 0; - CompletionCallback c = write_callback_; - write_callback_.Reset(); - c.Run(rv); - } + int rv = write_buffer_len_; + write_buffer_len_ = 0; + ResetAndReturn(&write_callback_).Run(rv); } void SpdyProxyClientSocket::OnClose(int status) { @@ -559,7 +528,6 @@ void SpdyProxyClientSocket::OnClose(int status) { CompletionCallback write_callback = write_callback_; write_callback_.Reset(); write_buffer_len_ = 0; - write_bytes_outstanding_ = 0; // If we're in the middle of connecting, we need to make sure // we invoke the connect callback. diff --git a/net/spdy/spdy_proxy_client_socket.h b/net/spdy/spdy_proxy_client_socket.h index ee187d7..0e5f777 100644 --- a/net/spdy/spdy_proxy_client_socket.h +++ b/net/spdy/spdy_proxy_client_socket.h @@ -93,13 +93,13 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket, // SpdyStream::Delegate implementation. virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE; virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; virtual int OnResponseReceived(const SpdyHeaderBlock& response, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; - virtual void OnDataSent(size_t bytes_sent) OVERRIDE; + virtual void OnDataSent() OVERRIDE; virtual void OnClose(int status) OVERRIDE; private: @@ -158,8 +158,6 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket, // User specified number of bytes to be written. int write_buffer_len_; - // Number of bytes written which have not been confirmed - int write_bytes_outstanding_; // True if the transport socket has ever sent data. bool was_ever_used_; diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index d3abc2c..17c525f 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -3007,7 +3007,7 @@ void SpdySessionSpdy3Test::RunResumeAfterUnstallTest31( EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate.body_data_sent()); + EXPECT_TRUE(data.at_write_eof()); } // Run the resume-after-unstall test with all possible stall and @@ -3190,13 +3190,13 @@ TEST_F(SpdySessionSpdy3Test, ResumeByPriorityAfterSendWindowSizeIncrease31) { EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(), delegate1.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(), delegate2.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent()); + + EXPECT_TRUE(data.at_write_eof()); } // Delegate that closes a given stream after sending its body. @@ -3212,12 +3212,13 @@ class StreamClosingDelegate : public test::StreamDelegateWithBody { stream_to_close_ = stream_to_close; } - virtual void OnSendBody() OVERRIDE { - test::StreamDelegateWithBody::OnSendBody(); + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE { + SpdySendStatus status = test::StreamDelegateWithBody::OnSendBodyComplete(); if (stream_to_close_) { stream_to_close_->Close(); EXPECT_EQ(NULL, stream_to_close_.get()); } + return status; } private: @@ -3354,6 +3355,8 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) { // Unstall stream2, which should then close stream3. delegate2.set_stream_to_close(stream3); UnstallSessionSend(session, kBodyDataSize); + + data.RunFor(1); EXPECT_EQ(NULL, stream3.get()); EXPECT_FALSE(stream2->send_stalled_by_flow_control()); @@ -3361,7 +3364,7 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) { EXPECT_TRUE(session->IsStreamActive(stream_id2)); EXPECT_FALSE(session->IsStreamActive(stream_id3)); - data.RunFor(3); + data.RunFor(2); EXPECT_EQ(NULL, stream2.get()); EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate1.WaitForClose()); @@ -3370,17 +3373,16 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) { EXPECT_TRUE(delegate1.send_headers_completed()); EXPECT_EQ(std::string(), delegate1.TakeReceivedData()); - EXPECT_EQ(0, delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(), delegate2.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent()); EXPECT_TRUE(delegate3.send_headers_completed()); EXPECT_EQ(std::string(), delegate3.TakeReceivedData()); - EXPECT_EQ(0, delegate3.body_data_sent()); + + EXPECT_TRUE(data.at_write_eof()); } // Cause a stall by reducing the flow control send window to @@ -3489,11 +3491,11 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedSession31) { EXPECT_TRUE(delegate1.send_headers_completed()); EXPECT_EQ(std::string(), delegate1.TakeReceivedData()); - EXPECT_EQ(0, delegate1.body_data_sent()); EXPECT_TRUE(delegate2.send_headers_completed()); EXPECT_EQ(std::string(), delegate2.TakeReceivedData()); - EXPECT_EQ(0, delegate2.body_data_sent()); + + EXPECT_TRUE(data.at_write_eof()); } // Tests the case of a non-SPDY request closing an idle SPDY session when no diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 936fdf9..fd63726 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -129,6 +129,7 @@ SpdyStream::SpdyStream(SpdySession* session, response_received_(false), session_(session), delegate_(NULL), + pending_send_flags_(DATA_FLAG_NONE), request_time_(base::Time::Now()), response_(new SpdyHeaderBlock), io_state_(STATE_NONE), @@ -625,7 +626,7 @@ int SpdyStream::SendRequest(bool has_upload_data) { return DoLoop(OK); } -void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { +void SpdyStream::SendHeaders(scoped_ptr<SpdyHeaderBlock> headers) { // Until the first headers by SYN_STREAM have been completely sent, we can // not be sure that our stream_id is correct. DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); @@ -637,39 +638,13 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { new HeaderBufferProducer(GetWeakPtr(), headers.Pass()))); } -void SpdyStream::QueueStreamData(IOBuffer* data, - int length, - SpdyDataFlags flags) { - // Until the headers have been completely sent, we can not be sure - // that our stream_id is correct. - DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); - CHECK_GT(stream_id_, 0u); - - scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer( - stream_id_, data, length, flags)); - // We'll get called again by PossiblyResumeIfSendStalled(). - if (!data_buffer) - return; - - if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { - DCHECK_GE(data_buffer->GetRemainingSize(), - session_->GetDataFrameMinimumSize()); - size_t payload_size = - data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize(); - DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload()); - DecreaseSendWindowSize(static_cast<int32>(payload_size)); - // This currently isn't strictly needed, since write frames are - // discarded only if the stream is about to be closed. But have it - // here anyway just in case this changes. - data_buffer->AddConsumeCallback( - base::Bind(&SpdyStream::OnWriteBufferConsumed, - GetWeakPtr(), payload_size)); - } - - session_->EnqueueStreamWrite( - GetWeakPtr(), DATA, - scoped_ptr<SpdyBufferProducer>( - new SimpleBufferProducer(data_buffer.Pass()))); +void SpdyStream::SendStreamData(IOBuffer* data, + int length, + SpdyDataFlags flags) { + CHECK(!pending_send_data_); + pending_send_data_ = new DrainableIOBuffer(data, length); + pending_send_flags_ = flags; + QueueNextDataFrame(); } bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, @@ -692,8 +667,7 @@ void SpdyStream::PossiblyResumeIfSendStalled() { NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED, NetLog::IntegerCallback("stream_id", stream_id_)); send_stalled_by_flow_control_ = false; - io_state_ = STATE_SEND_BODY; - DoLoop(OK); + QueueNextDataFrame(); } } @@ -903,11 +877,6 @@ int SpdyStream::DoSendHeadersComplete() { // DoSendBody is called to send the optional body for the request. This call // will also be called as each write of a chunk of the body completes. int SpdyStream::DoSendBody() { - // If we're already in the STATE_SEND_BODY state, then we've already - // sent a portion of the body. In that case, we need to first consume - // the bytes written in the body stream. Note that the bytes written is - // the number of bytes in the frame that were written, only consume the - // data portion, of course. io_state_ = STATE_SEND_BODY_COMPLETE; CHECK(delegate_); delegate_->OnSendBody(); @@ -935,15 +904,25 @@ int SpdyStream::DoSendBodyComplete(int result) { return ERR_UNEXPECTED; } + send_bytes_ += frame_payload_size; + + pending_send_data_->DidConsume(frame_payload_size); + if (pending_send_data_->BytesRemaining() > 0) { + io_state_ = STATE_SEND_BODY_COMPLETE; + QueueNextDataFrame(); + return ERR_IO_PENDING; + } + + pending_send_data_ = NULL; + pending_send_flags_ = DATA_FLAG_NONE; + if (!delegate_) { NOTREACHED(); return ERR_UNEXPECTED; } - send_bytes_ += frame_payload_size; - io_state_ = - (delegate_->OnSendBodyComplete(frame_payload_size) == MORE_DATA_TO_SEND) ? + (delegate_->OnSendBodyComplete() == MORE_DATA_TO_SEND) ? STATE_SEND_BODY : STATE_WAITING_FOR_RESPONSE; return OK; @@ -967,8 +946,19 @@ int SpdyStream::DoOpen() { } send_bytes_ += frame_payload_size; + + pending_send_data_->DidConsume(frame_payload_size); + if (pending_send_data_->BytesRemaining() > 0) { + QueueNextDataFrame(); + return ERR_IO_PENDING; + } + + pending_send_data_ = NULL; + pending_send_flags_ = DATA_FLAG_NONE; + if (delegate_) - delegate_->OnDataSent(frame_payload_size); + delegate_->OnDataSent(); + break; } @@ -1002,4 +992,41 @@ void SpdyStream::UpdateHistograms() { UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_); } +void SpdyStream::QueueNextDataFrame() { + // Until the headers have been completely sent, we can not be sure + // that our stream_id is correct. + DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); + CHECK_GT(stream_id_, 0u); + CHECK(pending_send_data_); + CHECK_GT(pending_send_data_->BytesRemaining(), 0); + + scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer( + stream_id_, + pending_send_data_, pending_send_data_->BytesRemaining(), + pending_send_flags_)); + // We'll get called again by PossiblyResumeIfSendStalled(). + if (!data_buffer) + return; + + if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) { + DCHECK_GE(data_buffer->GetRemainingSize(), + session_->GetDataFrameMinimumSize()); + size_t payload_size = + data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize(); + DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload()); + DecreaseSendWindowSize(static_cast<int32>(payload_size)); + // This currently isn't strictly needed, since write frames are + // discarded only if the stream is about to be closed. But have it + // here anyway just in case this changes. + data_buffer->AddConsumeCallback( + base::Bind(&SpdyStream::OnWriteBufferConsumed, + GetWeakPtr(), payload_size)); + } + + session_->EnqueueStreamWrite( + GetWeakPtr(), DATA, + scoped_ptr<SpdyBufferProducer>( + new SimpleBufferProducer(data_buffer.Pass()))); +} + } // namespace net diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index b9ecfe9..5f14f87 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -56,12 +56,12 @@ class NET_EXPORT_PRIVATE SpdyStream { public: Delegate() {} - // Called when SYN frame has been sent. - // Returns true if no more data to be sent after SYN frame. + // Called when SYN frame has been sent. Must return whether + // there's body data to send. virtual SpdySendStatus OnSendHeadersComplete() = 0; // Called when the stream is ready to send body data. The - // delegate must call QueueStreamData() on the stream, either + // delegate must call SendStreamData() on the stream, either // immediately or asynchronously (e.g., if the data to be send has // to be read asynchronously). // @@ -69,10 +69,9 @@ class NET_EXPORT_PRIVATE SpdyStream { // OnSendBodyComplete() returns MORE_DATA_TO_SEND. virtual void OnSendBody() = 0; - // Called when body data has been sent. |bytes_sent| is the number - // of bytes that has been sent (may be zero). Must return whether + // Called when body data has been sent. Must return whether // there's more body data to send. - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) = 0; + virtual SpdySendStatus OnSendBodyComplete() = 0; // Called when the SYN_STREAM, SYN_REPLY, or HEADERS frames are received. // Normal streams will receive a SYN_REPLY and optional HEADERS frames. @@ -93,7 +92,7 @@ class NET_EXPORT_PRIVATE SpdyStream { virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) = 0; // Called when data is sent. - virtual void OnDataSent(size_t bytes_sent) = 0; + virtual void OnDataSent() = 0; // Called when SpdyStream is closed. No other delegate functions // will be called after this is called, and the delegate must not @@ -288,19 +287,28 @@ class NET_EXPORT_PRIVATE SpdyStream { // this once crbug.com/113107 is addressed. bool body_sent() const { return io_state_ > STATE_SEND_BODY_COMPLETE; } - // Interface for Spdy[Http|WebSocket]Stream to use. + // Interface for the delegate to use. + // + // TODO(akalin): Mandate that only one send can be in flight at one + // time. // Sends the request. // For non push stream, it will send SYN_STREAM frame. int SendRequest(bool has_upload_data); - // Queues a HEADERS frame to be sent. - void QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers); + // Sends a HEADERS frame. The delegate will be notified via + // OnHeadersSent() when the send is complete. + void SendHeaders(scoped_ptr<SpdyHeaderBlock> headers); - // Queues a DATA frame to be sent. May not queue all the data that - // is given (or even any of it) depending on flow control. - void QueueStreamData(IOBuffer* data, int length, - SpdyDataFlags flags); + // Sends a DATA frame. The delegate will be notified via + // OnSendBodyComplete() (if the response hasn't been received yet) + // or OnDataSent() (if the response has been received) when the send + // is complete. Only one data send can be in flight at one time. + // + // |flags| must be DATA_FLAG_NONE except for the last piece of data + // for a request in a request/response stream, where it should be + // DATA_FLAG_FIN. + void SendStreamData(IOBuffer* data, int length, SpdyDataFlags flags); // Fills SSL info in |ssl_info| and returns true when SSL is in use. bool GetSSLInfo(SSLInfo* ssl_info, @@ -390,6 +398,11 @@ class NET_EXPORT_PRIVATE SpdyStream { scoped_ptr<SpdyFrame> ProduceHeaderFrame( scoped_ptr<SpdyHeaderBlock> header_block); + // Queues the send for next frame of the remaining data in + // |pending_send_data_|. Must be called only when + // |pending_send_data_| and |pending_send_flags_| are set. + void QueueNextDataFrame(); + base::WeakPtrFactory<SpdyStream> weak_ptr_factory_; // Sentinel variable used to make sure we don't get destroyed by a @@ -424,6 +437,10 @@ class NET_EXPORT_PRIVATE SpdyStream { // The request to send. scoped_ptr<SpdyHeaderBlock> request_; + // The data waiting to be sent. + scoped_refptr<DrainableIOBuffer> pending_send_data_; + SpdyDataFlags pending_send_flags_; + // The time at which the request was made that resulted in this response. // For cached responses, this time could be "far" in the past. base::Time request_time_; diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc index d8d0208..43dc11b 100644 --- a/net/spdy/spdy_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_stream_spdy2_unittest.cc @@ -141,7 +141,7 @@ TEST_F(SpdyStreamSpdy2Test, SendDataAfterOpen) { EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version")); EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); } TEST_F(SpdyStreamSpdy2Test, SendHeaderAndDataAfterOpen) { @@ -216,9 +216,8 @@ TEST_F(SpdyStreamSpdy2Test, SendHeaderAndDataAfterOpen) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("101", delegate.GetResponseHeaderValue("status")); - EXPECT_EQ(1, delegate.headers_sent()); EXPECT_EQ(std::string(), delegate.TakeReceivedData()); - EXPECT_EQ(6, delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); } TEST_F(SpdyStreamSpdy2Test, PushedStream) { @@ -334,7 +333,7 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version")); EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); // Check that the NetLog was filled reasonably. net::CapturingNetLog::CapturedEntryList entries; @@ -352,6 +351,133 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) { EXPECT_EQ(static_cast<int>(stream_id), stream_id2); } +// Make sure that large blocks of data are properly split up into +// frame-sized chunks for a request/response (i.e., an HTTP-like) +// stream. +TEST_F(SpdyStreamSpdy2Test, SendLargeDataAfterOpenRequestResponse) { + GURL url(kStreamUrl); + + session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_); + + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, kPostBodyLength, NULL, 0)); + std::string chunk_data(kMaxSpdyFrameChunkSize, 'x'); + scoped_ptr<SpdyFrame> chunk( + ConstructSpdyBodyFrame( + 1, chunk_data.data(), chunk_data.length(), false)); + MockWrite writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*chunk, 1), + CreateMockWrite(*chunk, 2), + CreateMockWrite(*chunk, 3), + }; + + scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0)); + MockRead reads[] = { + CreateMockRead(*resp, 4), + MockRead(ASYNC, 0, 0, 5), // EOF + }; + + OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes)); + MockConnect connect_data(SYNCHRONOUS, OK); + data.set_connect_data(connect_data); + + session_deps_.socket_factory->AddSocketDataProvider(&data); + + scoped_refptr<SpdySession> session(CreateSpdySession()); + + InitializeSpdySession(session, host_port_pair_); + + base::WeakPtr<SpdyStream> stream = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream.get() != NULL); + + std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x'); + StreamDelegateWithBody delegate(stream, body_data); + stream->SetDelegate(&delegate); + + EXPECT_FALSE(stream->HasUrl()); + + stream->set_spdy_headers( + spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose()); + + EXPECT_TRUE(delegate.send_headers_completed()); + EXPECT_EQ("200", delegate.GetResponseHeaderValue("status")); + EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version")); + EXPECT_EQ(std::string(), delegate.TakeReceivedData()); + EXPECT_TRUE(data.at_write_eof()); +} + +// Make sure that large blocks of data are properly split up into +// frame-sized chunks for a bidirectional (i.e., non-HTTP-like) +// stream. +TEST_F(SpdyStreamSpdy2Test, SendLargeDataAfterOpenBidirectional) { + GURL url(kStreamUrl); + + session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_); + + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, kPostBodyLength, NULL, 0)); + std::string chunk_data(kMaxSpdyFrameChunkSize, 'x'); + scoped_ptr<SpdyFrame> chunk( + ConstructSpdyBodyFrame( + 1, chunk_data.data(), chunk_data.length(), false)); + MockWrite writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*chunk, 2), + CreateMockWrite(*chunk, 3), + CreateMockWrite(*chunk, 4), + }; + + scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0)); + MockRead reads[] = { + CreateMockRead(*resp, 1), + MockRead(ASYNC, 0, 0, 5), // EOF + }; + + OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes)); + MockConnect connect_data(SYNCHRONOUS, OK); + data.set_connect_data(connect_data); + + session_deps_.socket_factory->AddSocketDataProvider(&data); + + scoped_refptr<SpdySession> session(CreateSpdySession()); + + InitializeSpdySession(session, host_port_pair_); + + base::WeakPtr<SpdyStream> stream = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream.get() != NULL); + + std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x'); + StreamDelegateSendImmediate delegate( + stream, scoped_ptr<SpdyHeaderBlock>(), body_data); + stream->SetDelegate(&delegate); + + EXPECT_FALSE(stream->HasUrl()); + + stream->set_spdy_headers( + spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose()); + + EXPECT_TRUE(delegate.send_headers_completed()); + EXPECT_EQ("200", delegate.GetResponseHeaderValue("status")); + EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version")); + EXPECT_EQ(std::string(), delegate.TakeReceivedData()); + EXPECT_TRUE(data.at_write_eof()); +} + } // namespace } // namespace test diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc index b08dcd1..db8f612 100644 --- a/net/spdy/spdy_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_stream_spdy3_unittest.cc @@ -150,7 +150,7 @@ TEST_F(SpdyStreamSpdy3Test, SendDataAfterOpen) { EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); } TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) { @@ -226,9 +226,8 @@ TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) { EXPECT_TRUE(delegate.send_headers_completed()); EXPECT_EQ("101", delegate.GetResponseHeaderValue(":status")); - EXPECT_EQ(1, delegate.headers_sent()); EXPECT_EQ(std::string(), delegate.TakeReceivedData()); - EXPECT_EQ(6, delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); } TEST_F(SpdyStreamSpdy3Test, PushedStream) { @@ -347,7 +346,7 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); // Check that the NetLog was filled reasonably. net::CapturingNetLog::CapturedEntryList entries; @@ -365,6 +364,133 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) { EXPECT_EQ(static_cast<int>(stream_id), stream_id2); } +// Make sure that large blocks of data are properly split up into +// frame-sized chunks for a request/response (i.e., an HTTP-like) +// stream. +TEST_F(SpdyStreamSpdy3Test, SendLargeDataAfterOpenRequestResponse) { + GURL url(kStreamUrl); + + session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_); + + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, 1, kPostBodyLength, LOWEST, NULL, 0)); + std::string chunk_data(kMaxSpdyFrameChunkSize, 'x'); + scoped_ptr<SpdyFrame> chunk( + ConstructSpdyBodyFrame( + 1, chunk_data.data(), chunk_data.length(), false)); + MockWrite writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*chunk, 1), + CreateMockWrite(*chunk, 2), + CreateMockWrite(*chunk, 3), + }; + + scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0)); + MockRead reads[] = { + CreateMockRead(*resp, 4), + MockRead(ASYNC, 0, 0, 5), // EOF + }; + + OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes)); + MockConnect connect_data(SYNCHRONOUS, OK); + data.set_connect_data(connect_data); + + session_deps_.socket_factory->AddSocketDataProvider(&data); + + scoped_refptr<SpdySession> session(CreateSpdySession()); + + InitializeSpdySession(session, host_port_pair_); + + base::WeakPtr<SpdyStream> stream = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream.get() != NULL); + + std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x'); + StreamDelegateWithBody delegate(stream, body_data); + stream->SetDelegate(&delegate); + + EXPECT_FALSE(stream->HasUrl()); + + stream->set_spdy_headers( + spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose()); + + EXPECT_TRUE(delegate.send_headers_completed()); + EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); + EXPECT_EQ(std::string(), delegate.TakeReceivedData()); + EXPECT_TRUE(data.at_write_eof()); +} + +// Make sure that large blocks of data are properly split up into +// frame-sized chunks for a bidirectional (i.e., non-HTTP-like) +// stream. +TEST_F(SpdyStreamSpdy3Test, SendLargeDataAfterOpenBidirectional) { + GURL url(kStreamUrl); + + session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_); + + scoped_ptr<SpdyFrame> req( + ConstructSpdyPost(kStreamUrl, 1, kPostBodyLength, LOWEST, NULL, 0)); + std::string chunk_data(kMaxSpdyFrameChunkSize, 'x'); + scoped_ptr<SpdyFrame> chunk( + ConstructSpdyBodyFrame( + 1, chunk_data.data(), chunk_data.length(), false)); + MockWrite writes[] = { + CreateMockWrite(*req, 0), + CreateMockWrite(*chunk, 2), + CreateMockWrite(*chunk, 3), + CreateMockWrite(*chunk, 4), + }; + + scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0)); + MockRead reads[] = { + CreateMockRead(*resp, 1), + MockRead(ASYNC, 0, 0, 5), // EOF + }; + + OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes)); + MockConnect connect_data(SYNCHRONOUS, OK); + data.set_connect_data(connect_data); + + session_deps_.socket_factory->AddSocketDataProvider(&data); + + scoped_refptr<SpdySession> session(CreateSpdySession()); + + InitializeSpdySession(session, host_port_pair_); + + base::WeakPtr<SpdyStream> stream = + CreateStreamSynchronously(session, url, LOWEST, BoundNetLog()); + ASSERT_TRUE(stream.get() != NULL); + + std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x'); + StreamDelegateSendImmediate delegate( + stream, scoped_ptr<SpdyHeaderBlock>(), body_data); + stream->SetDelegate(&delegate); + + EXPECT_FALSE(stream->HasUrl()); + + stream->set_spdy_headers( + spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength)); + EXPECT_TRUE(stream->HasUrl()); + EXPECT_EQ(kStreamUrl, stream->GetUrl().spec()); + + EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true)); + + EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose()); + + EXPECT_TRUE(delegate.send_headers_completed()); + EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); + EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); + EXPECT_EQ(std::string(), delegate.TakeReceivedData()); + EXPECT_TRUE(data.at_write_eof()); +} + // Call IncreaseSendWindowSize on a stream with a large enough delta // to overflow an int32. The SpdyStream should handle that case // gracefully. @@ -530,7 +656,7 @@ void SpdyStreamSpdy3Test::RunResumeAfterUnstallRequestResponseTest( EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status")); EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.body_data_sent()); + EXPECT_TRUE(data.at_write_eof()); } TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncreaseRequestResponse) { @@ -622,19 +748,15 @@ void SpdyStreamSpdy3Test::RunResumeAfterUnstallBidirectionalTest( EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version")); EXPECT_EQ(std::string(kPostBody, kPostBodyLength), delegate.TakeReceivedData()); - EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent()); + EXPECT_TRUE(data.at_write_eof()); } -// TODO(akalin): Re-enable these when http://crbug.com/242288 is -// fixed. -TEST_F(SpdyStreamSpdy3Test, - DISABLED_ResumeAfterSendWindowSizeIncreaseBidirectional) { +TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncreaseBidirectional) { RunResumeAfterUnstallBidirectionalTest( base::Bind(&IncreaseStreamSendWindowSize)); } -TEST_F(SpdyStreamSpdy3Test, - DISABLED_ResumeAfterSendWindowSizeAdjustBidirectional) { +TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjustBidirectional) { RunResumeAfterUnstallBidirectionalTest( base::Bind(&AdjustStreamSendWindowSize)); } diff --git a/net/spdy/spdy_stream_test_util.cc b/net/spdy/spdy_stream_test_util.cc index c9a06fd..3055267 100644 --- a/net/spdy/spdy_stream_test_util.cc +++ b/net/spdy/spdy_stream_test_util.cc @@ -28,7 +28,7 @@ void ClosingDelegate::OnSendBody() { ADD_FAILURE() << "OnSendBody should not be called"; } -SpdySendStatus ClosingDelegate::OnSendBodyComplete(size_t /*bytes_sent*/) { +SpdySendStatus ClosingDelegate::OnSendBodyComplete() { return NO_MORE_DATA_TO_SEND; } @@ -44,7 +44,7 @@ int ClosingDelegate::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { return OK; } -void ClosingDelegate::OnDataSent(size_t bytes_sent) {} +void ClosingDelegate::OnDataSent() {} void ClosingDelegate::OnClose(int status) { DCHECK(stream_); @@ -56,9 +56,7 @@ StreamDelegateBase::StreamDelegateBase( const base::WeakPtr<SpdyStream>& stream) : stream_(stream), stream_id_(0), - send_headers_completed_(false), - headers_sent_(0), - data_sent_(0) { + send_headers_completed_(false) { } StreamDelegateBase::~StreamDelegateBase() { @@ -79,9 +77,7 @@ int StreamDelegateBase::OnResponseReceived(const SpdyHeaderBlock& response, return status; } -void StreamDelegateBase::OnHeadersSent() { - headers_sent_++; -} +void StreamDelegateBase::OnHeadersSent() {} int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { if (buffer) @@ -89,9 +85,7 @@ int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { return OK; } -void StreamDelegateBase::OnDataSent(size_t bytes_sent) { - data_sent_ += bytes_sent; -} +void StreamDelegateBase::OnDataSent() {} void StreamDelegateBase::OnClose(int status) { if (!stream_) @@ -135,8 +129,7 @@ void StreamDelegateDoNothing::OnSendBody() { ADD_FAILURE() << "OnSendBody should not be called"; } -SpdySendStatus StreamDelegateDoNothing::OnSendBodyComplete( - size_t /*bytes_sent*/) { +SpdySendStatus StreamDelegateDoNothing::OnSendBodyComplete() { return NO_MORE_DATA_TO_SEND; } @@ -155,8 +148,7 @@ void StreamDelegateSendImmediate::OnSendBody() { ADD_FAILURE() << "OnSendBody should not be called"; } -SpdySendStatus StreamDelegateSendImmediate::OnSendBodyComplete( - size_t /*bytes_sent*/) { +SpdySendStatus StreamDelegateSendImmediate::OnSendBodyComplete() { ADD_FAILURE() << "OnSendBodyComplete should not be called"; return NO_MORE_DATA_TO_SEND; } @@ -168,11 +160,11 @@ int StreamDelegateSendImmediate::OnResponseReceived( status = StreamDelegateBase::OnResponseReceived(response, response_time, status); if (headers_.get()) { - stream()->QueueHeaders(headers_.Pass()); + stream()->SendHeaders(headers_.Pass()); } if (data_.data()) { scoped_refptr<StringIOBuffer> buf(new StringIOBuffer(data_.as_string())); - stream()->QueueStreamData(buf, buf->size(), DATA_FLAG_NONE); + stream()->SendStreamData(buf, buf->size(), DATA_FLAG_NONE); } return status; } @@ -181,9 +173,7 @@ StreamDelegateWithBody::StreamDelegateWithBody( const base::WeakPtr<SpdyStream>& stream, base::StringPiece data) : StreamDelegateBase(stream), - buf_(new DrainableIOBuffer(new StringIOBuffer(data.as_string()), - data.size())), - body_data_sent_(0) {} + buf_(new StringIOBuffer(data.as_string())) {} StreamDelegateWithBody::~StreamDelegateWithBody() { } @@ -194,20 +184,10 @@ SpdySendStatus StreamDelegateWithBody::OnSendHeadersComplete() { } void StreamDelegateWithBody::OnSendBody() { - stream()->QueueStreamData(buf_.get(), buf_->BytesRemaining(), - DATA_FLAG_NONE); + stream()->SendStreamData(buf_.get(), buf_->size(), DATA_FLAG_NONE); } -SpdySendStatus StreamDelegateWithBody::OnSendBodyComplete(size_t bytes_sent) { - EXPECT_GT(bytes_sent, 0u); - - buf_->DidConsume(bytes_sent); - body_data_sent_ += bytes_sent; - if (buf_->BytesRemaining() > 0) { - // Go back to OnSendBody() to send the remaining data. - return MORE_DATA_TO_SEND; - } - +SpdySendStatus StreamDelegateWithBody::OnSendBodyComplete() { return NO_MORE_DATA_TO_SEND; } diff --git a/net/spdy/spdy_stream_test_util.h b/net/spdy/spdy_stream_test_util.h index 334f1eb..d99c967 100644 --- a/net/spdy/spdy_stream_test_util.h +++ b/net/spdy/spdy_stream_test_util.h @@ -28,13 +28,13 @@ class ClosingDelegate : public SpdyStream::Delegate { // SpdyStream::Delegate implementation. virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE; virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; virtual int OnResponseReceived(const SpdyHeaderBlock& response, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; - virtual void OnDataSent(size_t bytes_sent) OVERRIDE; + virtual void OnDataSent() OVERRIDE; virtual void OnClose(int status) OVERRIDE; // Returns whether or not the stream is closed. @@ -53,13 +53,13 @@ class StreamDelegateBase : public SpdyStream::Delegate { virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE; virtual void OnSendBody() = 0; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) = 0; + virtual SpdySendStatus OnSendBodyComplete() = 0; virtual int OnResponseReceived(const SpdyHeaderBlock& response, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; - virtual void OnDataSent(size_t bytes_sent) OVERRIDE; + virtual void OnDataSent() OVERRIDE; virtual void OnClose(int status) OVERRIDE; // Waits for the stream to be closed and returns the status passed @@ -79,8 +79,6 @@ class StreamDelegateBase : public SpdyStream::Delegate { std::string GetResponseHeaderValue(const std::string& name) const; bool send_headers_completed() const { return send_headers_completed_; } - int headers_sent() const { return headers_sent_; } - int data_sent() const { return data_sent_; } protected: const base::WeakPtr<SpdyStream>& stream() { return stream_; } @@ -92,8 +90,6 @@ class StreamDelegateBase : public SpdyStream::Delegate { bool send_headers_completed_; SpdyHeaderBlock response_; SpdyReadQueue received_data_queue_; - int headers_sent_; - int data_sent_; }; // Test delegate that does nothing. Used to capture data about the @@ -104,7 +100,7 @@ class StreamDelegateDoNothing : public StreamDelegateBase { virtual ~StreamDelegateDoNothing(); virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; }; // Test delegate that sends data immediately in OnResponseReceived(). @@ -117,7 +113,7 @@ class StreamDelegateSendImmediate : public StreamDelegateBase { virtual ~StreamDelegateSendImmediate(); virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; virtual int OnResponseReceived(const SpdyHeaderBlock& response, base::Time response_time, int status) OVERRIDE; @@ -136,13 +132,10 @@ class StreamDelegateWithBody : public StreamDelegateBase { virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE; virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; - - int body_data_sent() const { return body_data_sent_; } + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; private: - scoped_refptr<DrainableIOBuffer> buf_; - int body_data_sent_; + scoped_refptr<StringIOBuffer> buf_; }; } // namespace test diff --git a/net/spdy/spdy_websocket_stream.cc b/net/spdy/spdy_websocket_stream.cc index 8dc2f4a..aa75978 100644 --- a/net/spdy/spdy_websocket_stream.cc +++ b/net/spdy/spdy_websocket_stream.cc @@ -21,6 +21,7 @@ SpdyWebSocketStream::SpdyWebSocketStream( SpdySession* spdy_session, Delegate* delegate) : weak_ptr_factory_(this), spdy_session_(spdy_session), + pending_send_data_length_(0), delegate_(delegate) { DCHECK(spdy_session_); DCHECK(delegate_); @@ -67,9 +68,11 @@ int SpdyWebSocketStream::SendData(const char* data, int length) { NOTREACHED(); return ERR_UNEXPECTED; } + DCHECK_GE(length, 0); + pending_send_data_length_ = static_cast<size_t>(length); scoped_refptr<IOBuffer> buf(new IOBuffer(length)); memcpy(buf->data(), data, length); - stream_->QueueStreamData(buf.get(), length, DATA_FLAG_NONE); + stream_->SendStreamData(buf.get(), length, DATA_FLAG_NONE); return ERR_IO_PENDING; } @@ -90,7 +93,7 @@ void SpdyWebSocketStream::OnSendBody() { CHECK(false); } -SpdySendStatus SpdyWebSocketStream::OnSendBodyComplete(size_t bytes_sent) { +SpdySendStatus SpdyWebSocketStream::OnSendBodyComplete() { CHECK(false); return NO_MORE_DATA_TO_SEND; } @@ -113,9 +116,10 @@ int SpdyWebSocketStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { return OK; } -void SpdyWebSocketStream::OnDataSent(size_t bytes_sent) { +void SpdyWebSocketStream::OnDataSent() { DCHECK(delegate_); - delegate_->OnSentSpdyData(bytes_sent); + delegate_->OnSentSpdyData(pending_send_data_length_); + pending_send_data_length_ = 0; } void SpdyWebSocketStream::OnClose(int status) { diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h index 77fe1ed..d3290404 100644 --- a/net/spdy/spdy_websocket_stream.h +++ b/net/spdy/spdy_websocket_stream.h @@ -76,13 +76,13 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream // SpdyStream::Delegate virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE; virtual void OnSendBody() OVERRIDE; - virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE; + virtual SpdySendStatus OnSendBodyComplete() OVERRIDE; virtual int OnResponseReceived(const SpdyHeaderBlock& response, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; - virtual void OnDataSent(size_t bytes_sent) OVERRIDE; + virtual void OnDataSent() OVERRIDE; virtual void OnClose(int status) OVERRIDE; private: @@ -97,6 +97,7 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream SpdyStreamRequest stream_request_; base::WeakPtr<SpdyStream> stream_; scoped_refptr<SpdySession> spdy_session_; + size_t pending_send_data_length_; Delegate* delegate_; DISALLOW_COPY_AND_ASSIGN(SpdyWebSocketStream); |