summaryrefslogtreecommitdiffstats
path: root/net/spdy
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-23 16:41:38 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-05-23 16:41:38 +0000
commitaa19cfca991c78d1322f25aef3d62f48bc519f9f (patch)
treea9bcfec0de6b561d56d8c44d647d9a26e2548aa3 /net/spdy
parent595ec5160ffd04c3afcdacea05ef054faf49e5e6 (diff)
downloadchromium_src-aa19cfca991c78d1322f25aef3d62f48bc519f9f.zip
chromium_src-aa19cfca991c78d1322f25aef3d62f48bc519f9f.tar.gz
chromium_src-aa19cfca991c78d1322f25aef3d62f48bc519f9f.tar.bz2
[SPDY] Change SpdyStream::QueueStreamData() To SendStreamData()
Change its semantics so that it holds a reference to the passed-in data and calls back to the delegate only when all the data has been sent. Fix bug where flow control wouldn't work for non-HTTP streams. Now it just queues up the next data frame (which must exist, since it must have been trying to send something when it got stalled). Remove now-redundant bytes_sent parameters from SpdyStream::Delegate methods. Remove now-redundant DrainableIOBuffer member variable in SpdyHttpStream. Remove now-redundant logic in SpdyProxyClientSocket to split up data to send into frames. Add SpdyStream tests for sending large blobs of data. Enable bidirectional flow control tests for SpdyStream. BUG=242288 Review URL: https://chromiumcodereview.appspot.com/15740018 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@201828 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net/spdy')
-rw-r--r--net/spdy/spdy_http_stream.cc37
-rw-r--r--net/spdy/spdy_http_stream.h7
-rw-r--r--net/spdy/spdy_proxy_client_socket.cc56
-rw-r--r--net/spdy/spdy_proxy_client_socket.h6
-rw-r--r--net/spdy/spdy_session_spdy3_unittest.cc24
-rw-r--r--net/spdy/spdy_stream.cc117
-rw-r--r--net/spdy/spdy_stream.h45
-rw-r--r--net/spdy/spdy_stream_spdy2_unittest.cc134
-rw-r--r--net/spdy/spdy_stream_spdy3_unittest.cc146
-rw-r--r--net/spdy/spdy_stream_test_util.cc44
-rw-r--r--net/spdy/spdy_stream_test_util.h23
-rw-r--r--net/spdy/spdy_websocket_stream.cc12
-rw-r--r--net/spdy/spdy_websocket_stream.h5
13 files changed, 444 insertions, 212 deletions
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc
index 3129412..0545773 100644
--- a/net/spdy/spdy_http_stream.cc
+++ b/net/spdy/spdy_http_stream.cc
@@ -38,6 +38,7 @@ SpdyHttpStream::SpdyHttpStream(SpdySession* spdy_session,
response_info_(NULL),
response_headers_received_(false),
user_buffer_len_(0),
+ raw_request_body_buf_size_(0),
buffered_read_callback_pending_(false),
more_read_data_pending_(false),
direct_(direct) {}
@@ -236,7 +237,7 @@ int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
// body data is written with this size at a time.
raw_request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
// The request body buffer is empty at first.
- request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_, 0);
+ raw_request_body_buf_size_ = 0;
}
CHECK(!callback.is_null());
@@ -288,7 +289,7 @@ SpdySendStatus SpdyHttpStream::OnSendHeadersComplete() {
void SpdyHttpStream::OnSendBody() {
CHECK(request_info_ && request_info_->upload_data_stream);
- if (request_body_buf_->BytesRemaining() > 0) {
+ if (raw_request_body_buf_size_ > 0) {
SendRequestBodyData();
} else {
// We shouldn't be called if there's no more data to read.
@@ -297,20 +298,14 @@ void SpdyHttpStream::OnSendBody() {
}
}
-SpdySendStatus SpdyHttpStream::OnSendBodyComplete(size_t bytes_sent) {
+SpdySendStatus SpdyHttpStream::OnSendBodyComplete() {
// |status| is the number of bytes written to the SPDY stream.
CHECK(request_info_ && request_info_->upload_data_stream);
- DCHECK_GE(static_cast<int>(bytes_sent), 0);
- DCHECK_LE(static_cast<int>(bytes_sent), request_body_buf_->BytesRemaining());
-
- request_body_buf_->DidConsume(static_cast<int>(bytes_sent));
+ raw_request_body_buf_size_ = 0;
// Check for more data to send.
- if (!request_info_->upload_data_stream->IsEOF() ||
- (request_body_buf_->BytesRemaining() > 0))
- return MORE_DATA_TO_SEND;
-
- return NO_MORE_DATA_TO_SEND;
+ return request_info_->upload_data_stream->IsEOF() ?
+ NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND;
}
int SpdyHttpStream::OnResponseReceived(const SpdyHeaderBlock& response,
@@ -402,10 +397,10 @@ int SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
return OK;
}
-void SpdyHttpStream::OnDataSent(size_t /*bytes_sent*/) {
+void SpdyHttpStream::OnDataSent() {
// For HTTP streams, no data is sent from the client while in the OPEN state,
// so it is never called.
- NOTREACHED();
+ CHECK(false);
}
void SpdyHttpStream::OnClose(int status) {
@@ -437,7 +432,7 @@ void SpdyHttpStream::OnStreamCreated(
void SpdyHttpStream::ReadAndSendRequestBodyData() {
CHECK(request_info_ && request_info_->upload_data_stream);
- CHECK_EQ(0, request_body_buf_->BytesRemaining());
+ CHECK_EQ(raw_request_body_buf_size_, 0);
// Read the data from the request body stream.
const int rv = request_info_->upload_data_stream->Read(
@@ -455,20 +450,20 @@ void SpdyHttpStream::ReadAndSendRequestBodyData() {
void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
CHECK_GE(status, 0);
- request_body_buf_ = new DrainableIOBuffer(raw_request_body_buf_, status);
+ raw_request_body_buf_size_ = status;
SendRequestBodyData();
}
void SpdyHttpStream::SendRequestBodyData() {
const bool eof = request_info_->upload_data_stream->IsEOF();
if (eof) {
- CHECK_GE(request_body_buf_->BytesRemaining(), 0);
+ CHECK_GE(raw_request_body_buf_size_, 0);
} else {
- CHECK_GT(request_body_buf_->BytesRemaining(), 0);
+ CHECK_GT(raw_request_body_buf_size_, 0);
}
- stream_->QueueStreamData(request_body_buf_,
- request_body_buf_->BytesRemaining(),
- eof ? DATA_FLAG_FIN : DATA_FLAG_NONE);
+ stream_->SendStreamData(raw_request_body_buf_,
+ raw_request_body_buf_size_,
+ eof ? DATA_FLAG_FIN : DATA_FLAG_NONE);
}
void SpdyHttpStream::ScheduleBufferedReadCallback() {
diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h
index 5f405d3..1fdc3cb 100644
--- a/net/spdy/spdy_http_stream.h
+++ b/net/spdy/spdy_http_stream.h
@@ -86,13 +86,13 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate,
// SpdyStream::Delegate implementation.
virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE;
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
virtual int OnResponseReceived(const SpdyHeaderBlock& response,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
- virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
+ virtual void OnDataSent() OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
private:
@@ -157,8 +157,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate,
// Temporary buffer used to read the request body from UploadDataStream.
scoped_refptr<IOBufferWithSize> raw_request_body_buf_;
- // Wraps raw_request_body_buf_ to read the remaining data progressively.
- scoped_refptr<DrainableIOBuffer> request_body_buf_;
+ int raw_request_body_buf_size_;
// Is there a scheduled read callback pending.
bool buffered_read_callback_pending_;
diff --git a/net/spdy/spdy_proxy_client_socket.cc b/net/spdy/spdy_proxy_client_socket.cc
index 5bc617c..43ec5b2 100644
--- a/net/spdy/spdy_proxy_client_socket.cc
+++ b/net/spdy/spdy_proxy_client_socket.cc
@@ -8,6 +8,7 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
+#include "base/callback_helpers.h"
#include "base/logging.h"
#include "base/string_util.h"
#include "base/values.h"
@@ -41,7 +42,6 @@ SpdyProxyClientSocket::SpdyProxyClientSocket(
auth_handler_factory)),
user_buffer_len_(0),
write_buffer_len_(0),
- write_bytes_outstanding_(0),
weak_factory_(this),
net_log_(BoundNetLog::Make(spdy_stream->net_log().net_log(),
NetLog::SOURCE_PROXY_CLIENT_SOCKET)) {
@@ -133,7 +133,6 @@ void SpdyProxyClientSocket::Disconnect() {
read_callback_.Reset();
write_buffer_len_ = 0;
- write_bytes_outstanding_ = 0;
write_callback_.Reset();
next_state_ = STATE_DISCONNECTED;
@@ -227,33 +226,12 @@ int SpdyProxyClientSocket::Write(IOBuffer* buf, int buf_len,
return ERR_SOCKET_NOT_CONNECTED;
DCHECK(spdy_stream_);
- write_bytes_outstanding_= buf_len;
- if (buf_len <= kMaxSpdyFrameChunkSize) {
- spdy_stream_->QueueStreamData(buf, buf_len, DATA_FLAG_NONE);
- net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT,
- buf_len, buf->data());
- write_callback_ = callback;
- write_buffer_len_ = buf_len;
- return ERR_IO_PENDING;
- }
-
- // Since a SPDY Data frame can only include kMaxSpdyFrameChunkSize bytes
- // we need to send multiple data frames
- for (int i = 0; i < buf_len; i += kMaxSpdyFrameChunkSize) {
- int len = std::min(kMaxSpdyFrameChunkSize, buf_len - i);
- scoped_refptr<DrainableIOBuffer> iobuf(new DrainableIOBuffer(buf, i + len));
- iobuf->SetOffset(i);
- spdy_stream_->QueueStreamData(iobuf, len, DATA_FLAG_NONE);
- net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT,
- len, buf->data());
- }
- if (write_bytes_outstanding_ > 0) {
- write_callback_ = callback;
- write_buffer_len_ = buf_len;
- return ERR_IO_PENDING;
- } else {
- return buf_len;
- }
+ spdy_stream_->SendStreamData(buf, buf_len, DATA_FLAG_NONE);
+ net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_SENT,
+ buf_len, buf->data());
+ write_callback_ = callback;
+ write_buffer_len_ = buf_len;
+ return ERR_IO_PENDING;
}
bool SpdyProxyClientSocket::SetReceiveBufferSize(int32 size) {
@@ -474,8 +452,7 @@ void SpdyProxyClientSocket::OnSendBody() {
CHECK(false);
}
-SpdySendStatus SpdyProxyClientSocket::OnSendBodyComplete(
- size_t /*bytes_sent*/) {
+SpdySendStatus SpdyProxyClientSocket::OnSendBodyComplete() {
// Because we use |spdy_stream_| via STATE_OPEN (ala WebSockets)
// OnSendBodyComplete() must never be called.
CHECK(false);
@@ -528,20 +505,12 @@ int SpdyProxyClientSocket::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
return OK;
}
-void SpdyProxyClientSocket::OnDataSent(size_t bytes_sent) {
+void SpdyProxyClientSocket::OnDataSent() {
DCHECK(!write_callback_.is_null());
- DCHECK_LE(static_cast<int>(bytes_sent), write_bytes_outstanding_);
- write_bytes_outstanding_ -= static_cast<int>(bytes_sent);
-
- if (write_bytes_outstanding_ == 0) {
- int rv = write_buffer_len_;
- write_buffer_len_ = 0;
- write_bytes_outstanding_ = 0;
- CompletionCallback c = write_callback_;
- write_callback_.Reset();
- c.Run(rv);
- }
+ int rv = write_buffer_len_;
+ write_buffer_len_ = 0;
+ ResetAndReturn(&write_callback_).Run(rv);
}
void SpdyProxyClientSocket::OnClose(int status) {
@@ -559,7 +528,6 @@ void SpdyProxyClientSocket::OnClose(int status) {
CompletionCallback write_callback = write_callback_;
write_callback_.Reset();
write_buffer_len_ = 0;
- write_bytes_outstanding_ = 0;
// If we're in the middle of connecting, we need to make sure
// we invoke the connect callback.
diff --git a/net/spdy/spdy_proxy_client_socket.h b/net/spdy/spdy_proxy_client_socket.h
index ee187d7..0e5f777 100644
--- a/net/spdy/spdy_proxy_client_socket.h
+++ b/net/spdy/spdy_proxy_client_socket.h
@@ -93,13 +93,13 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket,
// SpdyStream::Delegate implementation.
virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE;
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
virtual int OnResponseReceived(const SpdyHeaderBlock& response,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
- virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
+ virtual void OnDataSent() OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
private:
@@ -158,8 +158,6 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket,
// User specified number of bytes to be written.
int write_buffer_len_;
- // Number of bytes written which have not been confirmed
- int write_bytes_outstanding_;
// True if the transport socket has ever sent data.
bool was_ever_used_;
diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc
index d3abc2c..17c525f 100644
--- a/net/spdy/spdy_session_spdy3_unittest.cc
+++ b/net/spdy/spdy_session_spdy3_unittest.cc
@@ -3007,7 +3007,7 @@ void SpdySessionSpdy3Test::RunResumeAfterUnstallTest31(
EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status"));
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(), delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate.body_data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
// Run the resume-after-unstall test with all possible stall and
@@ -3190,13 +3190,13 @@ TEST_F(SpdySessionSpdy3Test, ResumeByPriorityAfterSendWindowSizeIncrease31) {
EXPECT_EQ("200", delegate1.GetResponseHeaderValue(":status"));
EXPECT_EQ("HTTP/1.1", delegate1.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(), delegate1.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate1.body_data_sent());
EXPECT_TRUE(delegate2.send_headers_completed());
EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status"));
EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(), delegate2.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent());
+
+ EXPECT_TRUE(data.at_write_eof());
}
// Delegate that closes a given stream after sending its body.
@@ -3212,12 +3212,13 @@ class StreamClosingDelegate : public test::StreamDelegateWithBody {
stream_to_close_ = stream_to_close;
}
- virtual void OnSendBody() OVERRIDE {
- test::StreamDelegateWithBody::OnSendBody();
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE {
+ SpdySendStatus status = test::StreamDelegateWithBody::OnSendBodyComplete();
if (stream_to_close_) {
stream_to_close_->Close();
EXPECT_EQ(NULL, stream_to_close_.get());
}
+ return status;
}
private:
@@ -3354,6 +3355,8 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) {
// Unstall stream2, which should then close stream3.
delegate2.set_stream_to_close(stream3);
UnstallSessionSend(session, kBodyDataSize);
+
+ data.RunFor(1);
EXPECT_EQ(NULL, stream3.get());
EXPECT_FALSE(stream2->send_stalled_by_flow_control());
@@ -3361,7 +3364,7 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) {
EXPECT_TRUE(session->IsStreamActive(stream_id2));
EXPECT_FALSE(session->IsStreamActive(stream_id3));
- data.RunFor(3);
+ data.RunFor(2);
EXPECT_EQ(NULL, stream2.get());
EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate1.WaitForClose());
@@ -3370,17 +3373,16 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedStreams31) {
EXPECT_TRUE(delegate1.send_headers_completed());
EXPECT_EQ(std::string(), delegate1.TakeReceivedData());
- EXPECT_EQ(0, delegate1.body_data_sent());
EXPECT_TRUE(delegate2.send_headers_completed());
EXPECT_EQ("200", delegate2.GetResponseHeaderValue(":status"));
EXPECT_EQ("HTTP/1.1", delegate2.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(), delegate2.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kBodyDataSize), delegate2.body_data_sent());
EXPECT_TRUE(delegate3.send_headers_completed());
EXPECT_EQ(std::string(), delegate3.TakeReceivedData());
- EXPECT_EQ(0, delegate3.body_data_sent());
+
+ EXPECT_TRUE(data.at_write_eof());
}
// Cause a stall by reducing the flow control send window to
@@ -3489,11 +3491,11 @@ TEST_F(SpdySessionSpdy3Test, SendWindowSizeIncreaseWithDeletedSession31) {
EXPECT_TRUE(delegate1.send_headers_completed());
EXPECT_EQ(std::string(), delegate1.TakeReceivedData());
- EXPECT_EQ(0, delegate1.body_data_sent());
EXPECT_TRUE(delegate2.send_headers_completed());
EXPECT_EQ(std::string(), delegate2.TakeReceivedData());
- EXPECT_EQ(0, delegate2.body_data_sent());
+
+ EXPECT_TRUE(data.at_write_eof());
}
// Tests the case of a non-SPDY request closing an idle SPDY session when no
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index 936fdf9..fd63726 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -129,6 +129,7 @@ SpdyStream::SpdyStream(SpdySession* session,
response_received_(false),
session_(session),
delegate_(NULL),
+ pending_send_flags_(DATA_FLAG_NONE),
request_time_(base::Time::Now()),
response_(new SpdyHeaderBlock),
io_state_(STATE_NONE),
@@ -625,7 +626,7 @@ int SpdyStream::SendRequest(bool has_upload_data) {
return DoLoop(OK);
}
-void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
+void SpdyStream::SendHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
// Until the first headers by SYN_STREAM have been completely sent, we can
// not be sure that our stream_id is correct.
DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
@@ -637,39 +638,13 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
new HeaderBufferProducer(GetWeakPtr(), headers.Pass())));
}
-void SpdyStream::QueueStreamData(IOBuffer* data,
- int length,
- SpdyDataFlags flags) {
- // Until the headers have been completely sent, we can not be sure
- // that our stream_id is correct.
- DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
- CHECK_GT(stream_id_, 0u);
-
- scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer(
- stream_id_, data, length, flags));
- // We'll get called again by PossiblyResumeIfSendStalled().
- if (!data_buffer)
- return;
-
- if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
- DCHECK_GE(data_buffer->GetRemainingSize(),
- session_->GetDataFrameMinimumSize());
- size_t payload_size =
- data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
- DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
- DecreaseSendWindowSize(static_cast<int32>(payload_size));
- // This currently isn't strictly needed, since write frames are
- // discarded only if the stream is about to be closed. But have it
- // here anyway just in case this changes.
- data_buffer->AddConsumeCallback(
- base::Bind(&SpdyStream::OnWriteBufferConsumed,
- GetWeakPtr(), payload_size));
- }
-
- session_->EnqueueStreamWrite(
- GetWeakPtr(), DATA,
- scoped_ptr<SpdyBufferProducer>(
- new SimpleBufferProducer(data_buffer.Pass())));
+void SpdyStream::SendStreamData(IOBuffer* data,
+ int length,
+ SpdyDataFlags flags) {
+ CHECK(!pending_send_data_);
+ pending_send_data_ = new DrainableIOBuffer(data, length);
+ pending_send_flags_ = flags;
+ QueueNextDataFrame();
}
bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
@@ -692,8 +667,7 @@ void SpdyStream::PossiblyResumeIfSendStalled() {
NetLog::TYPE_SPDY_STREAM_FLOW_CONTROL_UNSTALLED,
NetLog::IntegerCallback("stream_id", stream_id_));
send_stalled_by_flow_control_ = false;
- io_state_ = STATE_SEND_BODY;
- DoLoop(OK);
+ QueueNextDataFrame();
}
}
@@ -903,11 +877,6 @@ int SpdyStream::DoSendHeadersComplete() {
// DoSendBody is called to send the optional body for the request. This call
// will also be called as each write of a chunk of the body completes.
int SpdyStream::DoSendBody() {
- // If we're already in the STATE_SEND_BODY state, then we've already
- // sent a portion of the body. In that case, we need to first consume
- // the bytes written in the body stream. Note that the bytes written is
- // the number of bytes in the frame that were written, only consume the
- // data portion, of course.
io_state_ = STATE_SEND_BODY_COMPLETE;
CHECK(delegate_);
delegate_->OnSendBody();
@@ -935,15 +904,25 @@ int SpdyStream::DoSendBodyComplete(int result) {
return ERR_UNEXPECTED;
}
+ send_bytes_ += frame_payload_size;
+
+ pending_send_data_->DidConsume(frame_payload_size);
+ if (pending_send_data_->BytesRemaining() > 0) {
+ io_state_ = STATE_SEND_BODY_COMPLETE;
+ QueueNextDataFrame();
+ return ERR_IO_PENDING;
+ }
+
+ pending_send_data_ = NULL;
+ pending_send_flags_ = DATA_FLAG_NONE;
+
if (!delegate_) {
NOTREACHED();
return ERR_UNEXPECTED;
}
- send_bytes_ += frame_payload_size;
-
io_state_ =
- (delegate_->OnSendBodyComplete(frame_payload_size) == MORE_DATA_TO_SEND) ?
+ (delegate_->OnSendBodyComplete() == MORE_DATA_TO_SEND) ?
STATE_SEND_BODY : STATE_WAITING_FOR_RESPONSE;
return OK;
@@ -967,8 +946,19 @@ int SpdyStream::DoOpen() {
}
send_bytes_ += frame_payload_size;
+
+ pending_send_data_->DidConsume(frame_payload_size);
+ if (pending_send_data_->BytesRemaining() > 0) {
+ QueueNextDataFrame();
+ return ERR_IO_PENDING;
+ }
+
+ pending_send_data_ = NULL;
+ pending_send_flags_ = DATA_FLAG_NONE;
+
if (delegate_)
- delegate_->OnDataSent(frame_payload_size);
+ delegate_->OnDataSent();
+
break;
}
@@ -1002,4 +992,41 @@ void SpdyStream::UpdateHistograms() {
UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
}
+void SpdyStream::QueueNextDataFrame() {
+ // Until the headers have been completely sent, we can not be sure
+ // that our stream_id is correct.
+ DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE);
+ CHECK_GT(stream_id_, 0u);
+ CHECK(pending_send_data_);
+ CHECK_GT(pending_send_data_->BytesRemaining(), 0);
+
+ scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer(
+ stream_id_,
+ pending_send_data_, pending_send_data_->BytesRemaining(),
+ pending_send_flags_));
+ // We'll get called again by PossiblyResumeIfSendStalled().
+ if (!data_buffer)
+ return;
+
+ if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
+ DCHECK_GE(data_buffer->GetRemainingSize(),
+ session_->GetDataFrameMinimumSize());
+ size_t payload_size =
+ data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
+ DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
+ DecreaseSendWindowSize(static_cast<int32>(payload_size));
+ // This currently isn't strictly needed, since write frames are
+ // discarded only if the stream is about to be closed. But have it
+ // here anyway just in case this changes.
+ data_buffer->AddConsumeCallback(
+ base::Bind(&SpdyStream::OnWriteBufferConsumed,
+ GetWeakPtr(), payload_size));
+ }
+
+ session_->EnqueueStreamWrite(
+ GetWeakPtr(), DATA,
+ scoped_ptr<SpdyBufferProducer>(
+ new SimpleBufferProducer(data_buffer.Pass())));
+}
+
} // namespace net
diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h
index b9ecfe9..5f14f87 100644
--- a/net/spdy/spdy_stream.h
+++ b/net/spdy/spdy_stream.h
@@ -56,12 +56,12 @@ class NET_EXPORT_PRIVATE SpdyStream {
public:
Delegate() {}
- // Called when SYN frame has been sent.
- // Returns true if no more data to be sent after SYN frame.
+ // Called when SYN frame has been sent. Must return whether
+ // there's body data to send.
virtual SpdySendStatus OnSendHeadersComplete() = 0;
// Called when the stream is ready to send body data. The
- // delegate must call QueueStreamData() on the stream, either
+ // delegate must call SendStreamData() on the stream, either
// immediately or asynchronously (e.g., if the data to be send has
// to be read asynchronously).
//
@@ -69,10 +69,9 @@ class NET_EXPORT_PRIVATE SpdyStream {
// OnSendBodyComplete() returns MORE_DATA_TO_SEND.
virtual void OnSendBody() = 0;
- // Called when body data has been sent. |bytes_sent| is the number
- // of bytes that has been sent (may be zero). Must return whether
+ // Called when body data has been sent. Must return whether
// there's more body data to send.
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) = 0;
+ virtual SpdySendStatus OnSendBodyComplete() = 0;
// Called when the SYN_STREAM, SYN_REPLY, or HEADERS frames are received.
// Normal streams will receive a SYN_REPLY and optional HEADERS frames.
@@ -93,7 +92,7 @@ class NET_EXPORT_PRIVATE SpdyStream {
virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) = 0;
// Called when data is sent.
- virtual void OnDataSent(size_t bytes_sent) = 0;
+ virtual void OnDataSent() = 0;
// Called when SpdyStream is closed. No other delegate functions
// will be called after this is called, and the delegate must not
@@ -288,19 +287,28 @@ class NET_EXPORT_PRIVATE SpdyStream {
// this once crbug.com/113107 is addressed.
bool body_sent() const { return io_state_ > STATE_SEND_BODY_COMPLETE; }
- // Interface for Spdy[Http|WebSocket]Stream to use.
+ // Interface for the delegate to use.
+ //
+ // TODO(akalin): Mandate that only one send can be in flight at one
+ // time.
// Sends the request.
// For non push stream, it will send SYN_STREAM frame.
int SendRequest(bool has_upload_data);
- // Queues a HEADERS frame to be sent.
- void QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers);
+ // Sends a HEADERS frame. The delegate will be notified via
+ // OnHeadersSent() when the send is complete.
+ void SendHeaders(scoped_ptr<SpdyHeaderBlock> headers);
- // Queues a DATA frame to be sent. May not queue all the data that
- // is given (or even any of it) depending on flow control.
- void QueueStreamData(IOBuffer* data, int length,
- SpdyDataFlags flags);
+ // Sends a DATA frame. The delegate will be notified via
+ // OnSendBodyComplete() (if the response hasn't been received yet)
+ // or OnDataSent() (if the response has been received) when the send
+ // is complete. Only one data send can be in flight at one time.
+ //
+ // |flags| must be DATA_FLAG_NONE except for the last piece of data
+ // for a request in a request/response stream, where it should be
+ // DATA_FLAG_FIN.
+ void SendStreamData(IOBuffer* data, int length, SpdyDataFlags flags);
// Fills SSL info in |ssl_info| and returns true when SSL is in use.
bool GetSSLInfo(SSLInfo* ssl_info,
@@ -390,6 +398,11 @@ class NET_EXPORT_PRIVATE SpdyStream {
scoped_ptr<SpdyFrame> ProduceHeaderFrame(
scoped_ptr<SpdyHeaderBlock> header_block);
+ // Queues the send for next frame of the remaining data in
+ // |pending_send_data_|. Must be called only when
+ // |pending_send_data_| and |pending_send_flags_| are set.
+ void QueueNextDataFrame();
+
base::WeakPtrFactory<SpdyStream> weak_ptr_factory_;
// Sentinel variable used to make sure we don't get destroyed by a
@@ -424,6 +437,10 @@ class NET_EXPORT_PRIVATE SpdyStream {
// The request to send.
scoped_ptr<SpdyHeaderBlock> request_;
+ // The data waiting to be sent.
+ scoped_refptr<DrainableIOBuffer> pending_send_data_;
+ SpdyDataFlags pending_send_flags_;
+
// The time at which the request was made that resulted in this response.
// For cached responses, this time could be "far" in the past.
base::Time request_time_;
diff --git a/net/spdy/spdy_stream_spdy2_unittest.cc b/net/spdy/spdy_stream_spdy2_unittest.cc
index d8d0208..43dc11b 100644
--- a/net/spdy/spdy_stream_spdy2_unittest.cc
+++ b/net/spdy/spdy_stream_spdy2_unittest.cc
@@ -141,7 +141,7 @@ TEST_F(SpdyStreamSpdy2Test, SendDataAfterOpen) {
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version"));
EXPECT_EQ(std::string(kPostBody, kPostBodyLength),
delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
TEST_F(SpdyStreamSpdy2Test, SendHeaderAndDataAfterOpen) {
@@ -216,9 +216,8 @@ TEST_F(SpdyStreamSpdy2Test, SendHeaderAndDataAfterOpen) {
EXPECT_TRUE(delegate.send_headers_completed());
EXPECT_EQ("101", delegate.GetResponseHeaderValue("status"));
- EXPECT_EQ(1, delegate.headers_sent());
EXPECT_EQ(std::string(), delegate.TakeReceivedData());
- EXPECT_EQ(6, delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
TEST_F(SpdyStreamSpdy2Test, PushedStream) {
@@ -334,7 +333,7 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) {
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version"));
EXPECT_EQ(std::string(kPostBody, kPostBodyLength),
delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
// Check that the NetLog was filled reasonably.
net::CapturingNetLog::CapturedEntryList entries;
@@ -352,6 +351,133 @@ TEST_F(SpdyStreamSpdy2Test, StreamError) {
EXPECT_EQ(static_cast<int>(stream_id), stream_id2);
}
+// Make sure that large blocks of data are properly split up into
+// frame-sized chunks for a request/response (i.e., an HTTP-like)
+// stream.
+TEST_F(SpdyStreamSpdy2Test, SendLargeDataAfterOpenRequestResponse) {
+ GURL url(kStreamUrl);
+
+ session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_);
+
+ scoped_ptr<SpdyFrame> req(
+ ConstructSpdyPost(kStreamUrl, kPostBodyLength, NULL, 0));
+ std::string chunk_data(kMaxSpdyFrameChunkSize, 'x');
+ scoped_ptr<SpdyFrame> chunk(
+ ConstructSpdyBodyFrame(
+ 1, chunk_data.data(), chunk_data.length(), false));
+ MockWrite writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*chunk, 1),
+ CreateMockWrite(*chunk, 2),
+ CreateMockWrite(*chunk, 3),
+ };
+
+ scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0));
+ MockRead reads[] = {
+ CreateMockRead(*resp, 4),
+ MockRead(ASYNC, 0, 0, 5), // EOF
+ };
+
+ OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes));
+ MockConnect connect_data(SYNCHRONOUS, OK);
+ data.set_connect_data(connect_data);
+
+ session_deps_.socket_factory->AddSocketDataProvider(&data);
+
+ scoped_refptr<SpdySession> session(CreateSpdySession());
+
+ InitializeSpdySession(session, host_port_pair_);
+
+ base::WeakPtr<SpdyStream> stream =
+ CreateStreamSynchronously(session, url, LOWEST, BoundNetLog());
+ ASSERT_TRUE(stream.get() != NULL);
+
+ std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x');
+ StreamDelegateWithBody delegate(stream, body_data);
+ stream->SetDelegate(&delegate);
+
+ EXPECT_FALSE(stream->HasUrl());
+
+ stream->set_spdy_headers(
+ spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength));
+ EXPECT_TRUE(stream->HasUrl());
+ EXPECT_EQ(kStreamUrl, stream->GetUrl().spec());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
+
+ EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose());
+
+ EXPECT_TRUE(delegate.send_headers_completed());
+ EXPECT_EQ("200", delegate.GetResponseHeaderValue("status"));
+ EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version"));
+ EXPECT_EQ(std::string(), delegate.TakeReceivedData());
+ EXPECT_TRUE(data.at_write_eof());
+}
+
+// Make sure that large blocks of data are properly split up into
+// frame-sized chunks for a bidirectional (i.e., non-HTTP-like)
+// stream.
+TEST_F(SpdyStreamSpdy2Test, SendLargeDataAfterOpenBidirectional) {
+ GURL url(kStreamUrl);
+
+ session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_);
+
+ scoped_ptr<SpdyFrame> req(
+ ConstructSpdyPost(kStreamUrl, kPostBodyLength, NULL, 0));
+ std::string chunk_data(kMaxSpdyFrameChunkSize, 'x');
+ scoped_ptr<SpdyFrame> chunk(
+ ConstructSpdyBodyFrame(
+ 1, chunk_data.data(), chunk_data.length(), false));
+ MockWrite writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*chunk, 2),
+ CreateMockWrite(*chunk, 3),
+ CreateMockWrite(*chunk, 4),
+ };
+
+ scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0));
+ MockRead reads[] = {
+ CreateMockRead(*resp, 1),
+ MockRead(ASYNC, 0, 0, 5), // EOF
+ };
+
+ OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes));
+ MockConnect connect_data(SYNCHRONOUS, OK);
+ data.set_connect_data(connect_data);
+
+ session_deps_.socket_factory->AddSocketDataProvider(&data);
+
+ scoped_refptr<SpdySession> session(CreateSpdySession());
+
+ InitializeSpdySession(session, host_port_pair_);
+
+ base::WeakPtr<SpdyStream> stream =
+ CreateStreamSynchronously(session, url, LOWEST, BoundNetLog());
+ ASSERT_TRUE(stream.get() != NULL);
+
+ std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x');
+ StreamDelegateSendImmediate delegate(
+ stream, scoped_ptr<SpdyHeaderBlock>(), body_data);
+ stream->SetDelegate(&delegate);
+
+ EXPECT_FALSE(stream->HasUrl());
+
+ stream->set_spdy_headers(
+ spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength));
+ EXPECT_TRUE(stream->HasUrl());
+ EXPECT_EQ(kStreamUrl, stream->GetUrl().spec());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
+
+ EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose());
+
+ EXPECT_TRUE(delegate.send_headers_completed());
+ EXPECT_EQ("200", delegate.GetResponseHeaderValue("status"));
+ EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue("version"));
+ EXPECT_EQ(std::string(), delegate.TakeReceivedData());
+ EXPECT_TRUE(data.at_write_eof());
+}
+
} // namespace
} // namespace test
diff --git a/net/spdy/spdy_stream_spdy3_unittest.cc b/net/spdy/spdy_stream_spdy3_unittest.cc
index b08dcd1..db8f612 100644
--- a/net/spdy/spdy_stream_spdy3_unittest.cc
+++ b/net/spdy/spdy_stream_spdy3_unittest.cc
@@ -150,7 +150,7 @@ TEST_F(SpdyStreamSpdy3Test, SendDataAfterOpen) {
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(kPostBody, kPostBodyLength),
delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) {
@@ -226,9 +226,8 @@ TEST_F(SpdyStreamSpdy3Test, SendHeaderAndDataAfterOpen) {
EXPECT_TRUE(delegate.send_headers_completed());
EXPECT_EQ("101", delegate.GetResponseHeaderValue(":status"));
- EXPECT_EQ(1, delegate.headers_sent());
EXPECT_EQ(std::string(), delegate.TakeReceivedData());
- EXPECT_EQ(6, delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
TEST_F(SpdyStreamSpdy3Test, PushedStream) {
@@ -347,7 +346,7 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) {
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(kPostBody, kPostBodyLength),
delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
// Check that the NetLog was filled reasonably.
net::CapturingNetLog::CapturedEntryList entries;
@@ -365,6 +364,133 @@ TEST_F(SpdyStreamSpdy3Test, StreamError) {
EXPECT_EQ(static_cast<int>(stream_id), stream_id2);
}
+// Make sure that large blocks of data are properly split up into
+// frame-sized chunks for a request/response (i.e., an HTTP-like)
+// stream.
+TEST_F(SpdyStreamSpdy3Test, SendLargeDataAfterOpenRequestResponse) {
+ GURL url(kStreamUrl);
+
+ session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_);
+
+ scoped_ptr<SpdyFrame> req(
+ ConstructSpdyPost(kStreamUrl, 1, kPostBodyLength, LOWEST, NULL, 0));
+ std::string chunk_data(kMaxSpdyFrameChunkSize, 'x');
+ scoped_ptr<SpdyFrame> chunk(
+ ConstructSpdyBodyFrame(
+ 1, chunk_data.data(), chunk_data.length(), false));
+ MockWrite writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*chunk, 1),
+ CreateMockWrite(*chunk, 2),
+ CreateMockWrite(*chunk, 3),
+ };
+
+ scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0));
+ MockRead reads[] = {
+ CreateMockRead(*resp, 4),
+ MockRead(ASYNC, 0, 0, 5), // EOF
+ };
+
+ OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes));
+ MockConnect connect_data(SYNCHRONOUS, OK);
+ data.set_connect_data(connect_data);
+
+ session_deps_.socket_factory->AddSocketDataProvider(&data);
+
+ scoped_refptr<SpdySession> session(CreateSpdySession());
+
+ InitializeSpdySession(session, host_port_pair_);
+
+ base::WeakPtr<SpdyStream> stream =
+ CreateStreamSynchronously(session, url, LOWEST, BoundNetLog());
+ ASSERT_TRUE(stream.get() != NULL);
+
+ std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x');
+ StreamDelegateWithBody delegate(stream, body_data);
+ stream->SetDelegate(&delegate);
+
+ EXPECT_FALSE(stream->HasUrl());
+
+ stream->set_spdy_headers(
+ spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength));
+ EXPECT_TRUE(stream->HasUrl());
+ EXPECT_EQ(kStreamUrl, stream->GetUrl().spec());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
+
+ EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose());
+
+ EXPECT_TRUE(delegate.send_headers_completed());
+ EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status"));
+ EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
+ EXPECT_EQ(std::string(), delegate.TakeReceivedData());
+ EXPECT_TRUE(data.at_write_eof());
+}
+
+// Make sure that large blocks of data are properly split up into
+// frame-sized chunks for a bidirectional (i.e., non-HTTP-like)
+// stream.
+TEST_F(SpdyStreamSpdy3Test, SendLargeDataAfterOpenBidirectional) {
+ GURL url(kStreamUrl);
+
+ session_ = SpdySessionDependencies::SpdyCreateSession(&session_deps_);
+
+ scoped_ptr<SpdyFrame> req(
+ ConstructSpdyPost(kStreamUrl, 1, kPostBodyLength, LOWEST, NULL, 0));
+ std::string chunk_data(kMaxSpdyFrameChunkSize, 'x');
+ scoped_ptr<SpdyFrame> chunk(
+ ConstructSpdyBodyFrame(
+ 1, chunk_data.data(), chunk_data.length(), false));
+ MockWrite writes[] = {
+ CreateMockWrite(*req, 0),
+ CreateMockWrite(*chunk, 2),
+ CreateMockWrite(*chunk, 3),
+ CreateMockWrite(*chunk, 4),
+ };
+
+ scoped_ptr<SpdyFrame> resp(ConstructSpdyPostSynReply(NULL, 0));
+ MockRead reads[] = {
+ CreateMockRead(*resp, 1),
+ MockRead(ASYNC, 0, 0, 5), // EOF
+ };
+
+ OrderedSocketData data(reads, arraysize(reads), writes, arraysize(writes));
+ MockConnect connect_data(SYNCHRONOUS, OK);
+ data.set_connect_data(connect_data);
+
+ session_deps_.socket_factory->AddSocketDataProvider(&data);
+
+ scoped_refptr<SpdySession> session(CreateSpdySession());
+
+ InitializeSpdySession(session, host_port_pair_);
+
+ base::WeakPtr<SpdyStream> stream =
+ CreateStreamSynchronously(session, url, LOWEST, BoundNetLog());
+ ASSERT_TRUE(stream.get() != NULL);
+
+ std::string body_data(3 * kMaxSpdyFrameChunkSize, 'x');
+ StreamDelegateSendImmediate delegate(
+ stream, scoped_ptr<SpdyHeaderBlock>(), body_data);
+ stream->SetDelegate(&delegate);
+
+ EXPECT_FALSE(stream->HasUrl());
+
+ stream->set_spdy_headers(
+ spdy_util_.ConstructPostHeaderBlock(kStreamUrl, kPostBodyLength));
+ EXPECT_TRUE(stream->HasUrl());
+ EXPECT_EQ(kStreamUrl, stream->GetUrl().spec());
+
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
+
+ EXPECT_EQ(ERR_CONNECTION_CLOSED, delegate.WaitForClose());
+
+ EXPECT_TRUE(delegate.send_headers_completed());
+ EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status"));
+ EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
+ EXPECT_EQ(std::string(), delegate.TakeReceivedData());
+ EXPECT_TRUE(data.at_write_eof());
+}
+
// Call IncreaseSendWindowSize on a stream with a large enough delta
// to overflow an int32. The SpdyStream should handle that case
// gracefully.
@@ -530,7 +656,7 @@ void SpdyStreamSpdy3Test::RunResumeAfterUnstallRequestResponseTest(
EXPECT_EQ("200", delegate.GetResponseHeaderValue(":status"));
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(), delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.body_data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncreaseRequestResponse) {
@@ -622,19 +748,15 @@ void SpdyStreamSpdy3Test::RunResumeAfterUnstallBidirectionalTest(
EXPECT_EQ("HTTP/1.1", delegate.GetResponseHeaderValue(":version"));
EXPECT_EQ(std::string(kPostBody, kPostBodyLength),
delegate.TakeReceivedData());
- EXPECT_EQ(static_cast<int>(kPostBodyLength), delegate.data_sent());
+ EXPECT_TRUE(data.at_write_eof());
}
-// TODO(akalin): Re-enable these when http://crbug.com/242288 is
-// fixed.
-TEST_F(SpdyStreamSpdy3Test,
- DISABLED_ResumeAfterSendWindowSizeIncreaseBidirectional) {
+TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeIncreaseBidirectional) {
RunResumeAfterUnstallBidirectionalTest(
base::Bind(&IncreaseStreamSendWindowSize));
}
-TEST_F(SpdyStreamSpdy3Test,
- DISABLED_ResumeAfterSendWindowSizeAdjustBidirectional) {
+TEST_F(SpdyStreamSpdy3Test, ResumeAfterSendWindowSizeAdjustBidirectional) {
RunResumeAfterUnstallBidirectionalTest(
base::Bind(&AdjustStreamSendWindowSize));
}
diff --git a/net/spdy/spdy_stream_test_util.cc b/net/spdy/spdy_stream_test_util.cc
index c9a06fd..3055267 100644
--- a/net/spdy/spdy_stream_test_util.cc
+++ b/net/spdy/spdy_stream_test_util.cc
@@ -28,7 +28,7 @@ void ClosingDelegate::OnSendBody() {
ADD_FAILURE() << "OnSendBody should not be called";
}
-SpdySendStatus ClosingDelegate::OnSendBodyComplete(size_t /*bytes_sent*/) {
+SpdySendStatus ClosingDelegate::OnSendBodyComplete() {
return NO_MORE_DATA_TO_SEND;
}
@@ -44,7 +44,7 @@ int ClosingDelegate::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
return OK;
}
-void ClosingDelegate::OnDataSent(size_t bytes_sent) {}
+void ClosingDelegate::OnDataSent() {}
void ClosingDelegate::OnClose(int status) {
DCHECK(stream_);
@@ -56,9 +56,7 @@ StreamDelegateBase::StreamDelegateBase(
const base::WeakPtr<SpdyStream>& stream)
: stream_(stream),
stream_id_(0),
- send_headers_completed_(false),
- headers_sent_(0),
- data_sent_(0) {
+ send_headers_completed_(false) {
}
StreamDelegateBase::~StreamDelegateBase() {
@@ -79,9 +77,7 @@ int StreamDelegateBase::OnResponseReceived(const SpdyHeaderBlock& response,
return status;
}
-void StreamDelegateBase::OnHeadersSent() {
- headers_sent_++;
-}
+void StreamDelegateBase::OnHeadersSent() {}
int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
if (buffer)
@@ -89,9 +85,7 @@ int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
return OK;
}
-void StreamDelegateBase::OnDataSent(size_t bytes_sent) {
- data_sent_ += bytes_sent;
-}
+void StreamDelegateBase::OnDataSent() {}
void StreamDelegateBase::OnClose(int status) {
if (!stream_)
@@ -135,8 +129,7 @@ void StreamDelegateDoNothing::OnSendBody() {
ADD_FAILURE() << "OnSendBody should not be called";
}
-SpdySendStatus StreamDelegateDoNothing::OnSendBodyComplete(
- size_t /*bytes_sent*/) {
+SpdySendStatus StreamDelegateDoNothing::OnSendBodyComplete() {
return NO_MORE_DATA_TO_SEND;
}
@@ -155,8 +148,7 @@ void StreamDelegateSendImmediate::OnSendBody() {
ADD_FAILURE() << "OnSendBody should not be called";
}
-SpdySendStatus StreamDelegateSendImmediate::OnSendBodyComplete(
- size_t /*bytes_sent*/) {
+SpdySendStatus StreamDelegateSendImmediate::OnSendBodyComplete() {
ADD_FAILURE() << "OnSendBodyComplete should not be called";
return NO_MORE_DATA_TO_SEND;
}
@@ -168,11 +160,11 @@ int StreamDelegateSendImmediate::OnResponseReceived(
status =
StreamDelegateBase::OnResponseReceived(response, response_time, status);
if (headers_.get()) {
- stream()->QueueHeaders(headers_.Pass());
+ stream()->SendHeaders(headers_.Pass());
}
if (data_.data()) {
scoped_refptr<StringIOBuffer> buf(new StringIOBuffer(data_.as_string()));
- stream()->QueueStreamData(buf, buf->size(), DATA_FLAG_NONE);
+ stream()->SendStreamData(buf, buf->size(), DATA_FLAG_NONE);
}
return status;
}
@@ -181,9 +173,7 @@ StreamDelegateWithBody::StreamDelegateWithBody(
const base::WeakPtr<SpdyStream>& stream,
base::StringPiece data)
: StreamDelegateBase(stream),
- buf_(new DrainableIOBuffer(new StringIOBuffer(data.as_string()),
- data.size())),
- body_data_sent_(0) {}
+ buf_(new StringIOBuffer(data.as_string())) {}
StreamDelegateWithBody::~StreamDelegateWithBody() {
}
@@ -194,20 +184,10 @@ SpdySendStatus StreamDelegateWithBody::OnSendHeadersComplete() {
}
void StreamDelegateWithBody::OnSendBody() {
- stream()->QueueStreamData(buf_.get(), buf_->BytesRemaining(),
- DATA_FLAG_NONE);
+ stream()->SendStreamData(buf_.get(), buf_->size(), DATA_FLAG_NONE);
}
-SpdySendStatus StreamDelegateWithBody::OnSendBodyComplete(size_t bytes_sent) {
- EXPECT_GT(bytes_sent, 0u);
-
- buf_->DidConsume(bytes_sent);
- body_data_sent_ += bytes_sent;
- if (buf_->BytesRemaining() > 0) {
- // Go back to OnSendBody() to send the remaining data.
- return MORE_DATA_TO_SEND;
- }
-
+SpdySendStatus StreamDelegateWithBody::OnSendBodyComplete() {
return NO_MORE_DATA_TO_SEND;
}
diff --git a/net/spdy/spdy_stream_test_util.h b/net/spdy/spdy_stream_test_util.h
index 334f1eb..d99c967 100644
--- a/net/spdy/spdy_stream_test_util.h
+++ b/net/spdy/spdy_stream_test_util.h
@@ -28,13 +28,13 @@ class ClosingDelegate : public SpdyStream::Delegate {
// SpdyStream::Delegate implementation.
virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE;
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
virtual int OnResponseReceived(const SpdyHeaderBlock& response,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
- virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
+ virtual void OnDataSent() OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
// Returns whether or not the stream is closed.
@@ -53,13 +53,13 @@ class StreamDelegateBase : public SpdyStream::Delegate {
virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE;
virtual void OnSendBody() = 0;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) = 0;
+ virtual SpdySendStatus OnSendBodyComplete() = 0;
virtual int OnResponseReceived(const SpdyHeaderBlock& response,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
- virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
+ virtual void OnDataSent() OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
// Waits for the stream to be closed and returns the status passed
@@ -79,8 +79,6 @@ class StreamDelegateBase : public SpdyStream::Delegate {
std::string GetResponseHeaderValue(const std::string& name) const;
bool send_headers_completed() const { return send_headers_completed_; }
- int headers_sent() const { return headers_sent_; }
- int data_sent() const { return data_sent_; }
protected:
const base::WeakPtr<SpdyStream>& stream() { return stream_; }
@@ -92,8 +90,6 @@ class StreamDelegateBase : public SpdyStream::Delegate {
bool send_headers_completed_;
SpdyHeaderBlock response_;
SpdyReadQueue received_data_queue_;
- int headers_sent_;
- int data_sent_;
};
// Test delegate that does nothing. Used to capture data about the
@@ -104,7 +100,7 @@ class StreamDelegateDoNothing : public StreamDelegateBase {
virtual ~StreamDelegateDoNothing();
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
};
// Test delegate that sends data immediately in OnResponseReceived().
@@ -117,7 +113,7 @@ class StreamDelegateSendImmediate : public StreamDelegateBase {
virtual ~StreamDelegateSendImmediate();
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
virtual int OnResponseReceived(const SpdyHeaderBlock& response,
base::Time response_time,
int status) OVERRIDE;
@@ -136,13 +132,10 @@ class StreamDelegateWithBody : public StreamDelegateBase {
virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE;
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
-
- int body_data_sent() const { return body_data_sent_; }
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
private:
- scoped_refptr<DrainableIOBuffer> buf_;
- int body_data_sent_;
+ scoped_refptr<StringIOBuffer> buf_;
};
} // namespace test
diff --git a/net/spdy/spdy_websocket_stream.cc b/net/spdy/spdy_websocket_stream.cc
index 8dc2f4a..aa75978 100644
--- a/net/spdy/spdy_websocket_stream.cc
+++ b/net/spdy/spdy_websocket_stream.cc
@@ -21,6 +21,7 @@ SpdyWebSocketStream::SpdyWebSocketStream(
SpdySession* spdy_session, Delegate* delegate)
: weak_ptr_factory_(this),
spdy_session_(spdy_session),
+ pending_send_data_length_(0),
delegate_(delegate) {
DCHECK(spdy_session_);
DCHECK(delegate_);
@@ -67,9 +68,11 @@ int SpdyWebSocketStream::SendData(const char* data, int length) {
NOTREACHED();
return ERR_UNEXPECTED;
}
+ DCHECK_GE(length, 0);
+ pending_send_data_length_ = static_cast<size_t>(length);
scoped_refptr<IOBuffer> buf(new IOBuffer(length));
memcpy(buf->data(), data, length);
- stream_->QueueStreamData(buf.get(), length, DATA_FLAG_NONE);
+ stream_->SendStreamData(buf.get(), length, DATA_FLAG_NONE);
return ERR_IO_PENDING;
}
@@ -90,7 +93,7 @@ void SpdyWebSocketStream::OnSendBody() {
CHECK(false);
}
-SpdySendStatus SpdyWebSocketStream::OnSendBodyComplete(size_t bytes_sent) {
+SpdySendStatus SpdyWebSocketStream::OnSendBodyComplete() {
CHECK(false);
return NO_MORE_DATA_TO_SEND;
}
@@ -113,9 +116,10 @@ int SpdyWebSocketStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
return OK;
}
-void SpdyWebSocketStream::OnDataSent(size_t bytes_sent) {
+void SpdyWebSocketStream::OnDataSent() {
DCHECK(delegate_);
- delegate_->OnSentSpdyData(bytes_sent);
+ delegate_->OnSentSpdyData(pending_send_data_length_);
+ pending_send_data_length_ = 0;
}
void SpdyWebSocketStream::OnClose(int status) {
diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h
index 77fe1ed..d3290404 100644
--- a/net/spdy/spdy_websocket_stream.h
+++ b/net/spdy/spdy_websocket_stream.h
@@ -76,13 +76,13 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream
// SpdyStream::Delegate
virtual SpdySendStatus OnSendHeadersComplete() OVERRIDE;
virtual void OnSendBody() OVERRIDE;
- virtual SpdySendStatus OnSendBodyComplete(size_t bytes_sent) OVERRIDE;
+ virtual SpdySendStatus OnSendBodyComplete() OVERRIDE;
virtual int OnResponseReceived(const SpdyHeaderBlock& response,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
- virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
+ virtual void OnDataSent() OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
private:
@@ -97,6 +97,7 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream
SpdyStreamRequest stream_request_;
base::WeakPtr<SpdyStream> stream_;
scoped_refptr<SpdySession> spdy_session_;
+ size_t pending_send_data_length_;
Delegate* delegate_;
DISALLOW_COPY_AND_ASSIGN(SpdyWebSocketStream);