summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-04-18 08:31:58 +0000
committerakalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-04-18 08:31:58 +0000
commit8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a (patch)
tree6a11658779d98ea44c2a195d9ba1009e27e46e43 /net
parentf9961d8588581ef6aac7b112c59fe8bd762cda44 (diff)
downloadchromium_src-8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a.zip
chromium_src-8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a.tar.gz
chromium_src-8a938fed0b68373c2f4b3222b9dc8a6c534e7e7a.tar.bz2
[SPDY] Avoid leaking bytes from the session flow control send window
Add a ConsumeSource parameter to SpdyBuffer::ConsumeCallback. Use it to detect when a DATA frame to be written is dropped before it is written. Put all the flow control functions together and clean them up a bit. BUG=176592 Review URL: https://chromiumcodereview.appspot.com/14188025 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194851 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'net')
-rw-r--r--net/spdy/spdy_buffer.cc17
-rw-r--r--net/spdy/spdy_buffer.h31
-rw-r--r--net/spdy/spdy_buffer_unittest.cc15
-rw-r--r--net/spdy/spdy_session.cc176
-rw-r--r--net/spdy/spdy_session.h83
-rw-r--r--net/spdy/spdy_session_spdy3_unittest.cc115
-rw-r--r--net/spdy/spdy_stream.cc158
-rw-r--r--net/spdy/spdy_stream.h82
8 files changed, 436 insertions, 241 deletions
diff --git a/net/spdy/spdy_buffer.cc b/net/spdy/spdy_buffer.cc
index a48b35d..ef3e640 100644
--- a/net/spdy/spdy_buffer.cc
+++ b/net/spdy/spdy_buffer.cc
@@ -41,7 +41,7 @@ SpdyBuffer::SpdyBuffer(const char* data, size_t size) :
SpdyBuffer::~SpdyBuffer() {
if (GetRemainingSize() > 0)
- Consume(GetRemainingSize());
+ ConsumeHelper(GetRemainingSize(), DISCARD);
}
const char* SpdyBuffer::GetRemainingData() const {
@@ -57,17 +57,22 @@ void SpdyBuffer::AddConsumeCallback(const ConsumeCallback& consume_callback) {
}
void SpdyBuffer::Consume(size_t consume_size) {
+ ConsumeHelper(consume_size, CONSUME);
+};
+
+IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() {
+ return new WrappedIOBuffer(GetRemainingData());
+}
+
+void SpdyBuffer::ConsumeHelper(size_t consume_size,
+ ConsumeSource consume_source) {
DCHECK_GE(consume_size, 1u);
DCHECK_LE(consume_size, GetRemainingSize());
offset_ += consume_size;
for (std::vector<ConsumeCallback>::const_iterator it =
consume_callbacks_.begin(); it != consume_callbacks_.end(); ++it) {
- it->Run(consume_size);
+ it->Run(consume_size, consume_source);
}
};
-IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() {
- return new WrappedIOBuffer(GetRemainingData());
-}
-
} // namespace net
diff --git a/net/spdy/spdy_buffer.h b/net/spdy/spdy_buffer.h
index 4d04184..69d39ea 100644
--- a/net/spdy/spdy_buffer.h
+++ b/net/spdy/spdy_buffer.h
@@ -28,9 +28,21 @@ class SpdyFrame;
// fact that IOBuffer member functions are not virtual.
class NET_EXPORT_PRIVATE SpdyBuffer {
public:
- // A Callback that gets called whenever Consume() is called with the
- // number of bytes consumed.
- typedef base::Callback<void(size_t)> ConsumeCallback;
+ // The source of a call to a ConsumeCallback.
+ enum ConsumeSource {
+ // Called via a call to Consume().
+ CONSUME,
+ // Called via the SpdyBuffer being destroyed.
+ DISCARD
+ };
+
+ // A Callback that gets called when bytes are consumed with the
+ // (non-zero) number of bytes consumed and the source of the
+ // consume. May be called any number of times with CONSUME as the
+ // source followed by at most one call with DISCARD as the
+ // source. The sum of the number of bytes consumed equals the total
+ // size of the buffer.
+ typedef base::Callback<void(size_t, ConsumeSource)> ConsumeCallback;
// Construct with the data in the given frame. Assumes that data is
// owned by |frame| or outlives it.
@@ -40,6 +52,8 @@ class NET_EXPORT_PRIVATE SpdyBuffer {
// non-NULL and |size| must be non-zero.
SpdyBuffer(const char* data, size_t size);
+ // If there are bytes remaining in the buffer, triggers a call to
+ // any consume callbacks with a DISCARD source.
~SpdyBuffer();
// Returns the remaining (unconsumed) data.
@@ -48,11 +62,10 @@ class NET_EXPORT_PRIVATE SpdyBuffer {
// Returns the number of remaining (unconsumed) bytes.
size_t GetRemainingSize() const;
- // Add a callback which is called whenever Consume() is called. Used
- // mainly to update flow control windows. The ConsumeCallback should
- // not do anything complicated; ideally it should only update a
- // counter. In particular, it must *not* cause the SpdyBuffer itself
- // to be destroyed.
+ // Add a callback to be called when bytes are consumed. The
+ // ConsumeCallback should not do anything complicated; ideally it
+ // should only update a counter. In particular, it must *not* cause
+ // the SpdyBuffer itself to be destroyed.
void AddConsumeCallback(const ConsumeCallback& consume_callback);
// Consume the given number of bytes, which must be positive but not
@@ -66,6 +79,8 @@ class NET_EXPORT_PRIVATE SpdyBuffer {
IOBuffer* GetIOBufferForRemainingData();
private:
+ void ConsumeHelper(size_t consume_size, ConsumeSource consume_source);
+
const scoped_ptr<SpdyFrame> frame_;
std::vector<ConsumeCallback> consume_callbacks_;
size_t offset_;
diff --git a/net/spdy/spdy_buffer_unittest.cc b/net/spdy/spdy_buffer_unittest.cc
index bcf25d2..5a8bfc8 100644
--- a/net/spdy/spdy_buffer_unittest.cc
+++ b/net/spdy/spdy_buffer_unittest.cc
@@ -54,7 +54,11 @@ TEST_F(SpdyBufferTest, DataConstructor) {
EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer));
}
-void IncrementBy(size_t* x, size_t delta) {
+void IncrementBy(size_t* x,
+ SpdyBuffer::ConsumeSource expected_consume_source,
+ size_t delta,
+ SpdyBuffer::ConsumeSource consume_source) {
+ EXPECT_EQ(expected_consume_source, consume_source);
*x += delta;
}
@@ -66,8 +70,10 @@ TEST_F(SpdyBufferTest, Consume) {
size_t x1 = 0;
size_t x2 = 0;
- buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x1));
- buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x2));
+ buffer.AddConsumeCallback(
+ base::Bind(&IncrementBy, &x1, SpdyBuffer::CONSUME));
+ buffer.AddConsumeCallback(
+ base::Bind(&IncrementBy, &x2, SpdyBuffer::CONSUME));
EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer));
@@ -89,7 +95,8 @@ TEST_F(SpdyBufferTest, ConsumeOnDestruction) {
{
SpdyBuffer buffer(kData, kDataSize);
- buffer.AddConsumeCallback(base::Bind(&IncrementBy, &x));
+ buffer.AddConsumeCallback(
+ base::Bind(&IncrementBy, &x, SpdyBuffer::DISCARD));
}
EXPECT_EQ(kDataSize, x);
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 8fa52db..760e4e3 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -794,9 +794,6 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
len = new_len;
flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
}
- if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION)
- DecreaseSendWindowSize(static_cast<int32>(len));
- stream->DecreaseSendWindowSize(static_cast<int32>(len));
}
if (net_log().IsLoggingAllEvents()) {
@@ -816,7 +813,17 @@ scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), static_cast<uint32>(len), flags));
- return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
+ scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
+
+ if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
+ DecreaseSendWindowSize(static_cast<int32>(len));
+ data_buffer->AddConsumeCallback(
+ base::Bind(&SpdySession::OnWriteBufferConsumed,
+ weak_factory_.GetWeakPtr(),
+ static_cast<size_t>(len)));
+ }
+
+ return data_buffer.Pass();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
@@ -1447,7 +1454,7 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
DecreaseRecvWindowSize(static_cast<int32>(len));
buffer->AddConsumeCallback(
- base::Bind(&SpdySession::IncreaseRecvWindowSize,
+ base::Bind(&SpdySession::OnReadBufferConsumed,
weak_factory_.GetWeakPtr()));
}
} else {
@@ -2186,9 +2193,26 @@ SSLClientSocket* SpdySession::GetSSLClientSocket() const {
return ssl_socket;
}
-void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
+void SpdySession::OnWriteBufferConsumed(
+ size_t frame_payload_size,
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source) {
DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
+ if (consume_source == SpdyBuffer::DISCARD) {
+ // If we're discarding a frame or part of it, increase the send
+ // window by the number of discarded bytes. (Although if we're
+ // discarding part of a frame, it's probably because of a write
+ // error and we'll be tearing down the session soon.)
+ size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
+ DCHECK_GT(remaining_payload_bytes, 0u);
+ IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
+ }
+ // For consumed bytes, the send window is increased when we receive
+ // a WINDOW_UPDATE frame.
+}
+void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
DCHECK_GE(delta_window_size, 1);
// Check for overflow.
@@ -2214,67 +2238,6 @@ void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
ResumeSendStalledStreams();
}
-void SpdySession::QueueSendStalledStream(
- const scoped_refptr<SpdyStream>& stream) {
- DCHECK(stream->send_stalled_by_flow_control());
- stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id());
-}
-
-namespace {
-
-// Helper function to return the total size of an array of objects
-// with .size() member functions.
-template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
- size_t total_size = 0;
- for (size_t i = 0; i < N; ++i) {
- total_size += arr[i].size();
- }
- return total_size;
-}
-
-} // namespace
-
-void SpdySession::ResumeSendStalledStreams() {
- DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
-
- // We don't have to worry about new streams being queued, since
- // doing so would cause IsSendStalled() to return true. But we do
- // have to worry about streams being closed, as well as ourselves
- // being closed.
-
- while (!IsClosed() && !IsSendStalled()) {
- size_t old_size = 0;
- if (DCHECK_IS_ON())
- old_size = GetTotalSize(stream_send_unstall_queue_);
-
- SpdyStreamId stream_id = PopStreamToPossiblyResume();
- if (stream_id == 0)
- break;
- ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
- // The stream may actually still be send-stalled after this (due
- // to its own send window) but that's okay -- it'll then be
- // resumed once its send window increases.
- if (it != active_streams_.end())
- it->second->PossiblyResumeIfSendStalled();
-
- // The size should decrease unless we got send-stalled again.
- if (!IsSendStalled())
- DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
- }
-}
-
-SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
- for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
- std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
- if (!queue->empty()) {
- SpdyStreamId stream_id = queue->front();
- queue->pop_front();
- return stream_id;
- }
- }
- return 0;
-}
-
void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
@@ -2295,16 +2258,22 @@ void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
-delta_window_size, session_send_window_size_));
}
-void SpdySession::IncreaseRecvWindowSize(size_t delta_window_size) {
- if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION)
- return;
+void SpdySession::OnReadBufferConsumed(
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source) {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
+ DCHECK_GE(consume_size, 1u);
+ DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
+ IncreaseRecvWindowSize(static_cast<int32>(consume_size));
+}
+void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
DCHECK_GE(session_unacked_recv_window_bytes_, 0);
DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
- DCHECK_GE(delta_window_size, 1u);
+ DCHECK_GE(delta_window_size, 1);
// Check for overflow.
- DCHECK_LE(delta_window_size,
- static_cast<size_t>(kint32max - session_recv_window_size_));
+ DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
session_recv_window_size_ += delta_window_size;
net_log_.AddEvent(
@@ -2343,4 +2312,65 @@ void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
-delta_window_size, session_recv_window_size_));
}
+void SpdySession::QueueSendStalledStream(
+ const scoped_refptr<SpdyStream>& stream) {
+ DCHECK(stream->send_stalled_by_flow_control());
+ stream_send_unstall_queue_[stream->priority()].push_back(stream->stream_id());
+}
+
+namespace {
+
+// Helper function to return the total size of an array of objects
+// with .size() member functions.
+template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
+ size_t total_size = 0;
+ for (size_t i = 0; i < N; ++i) {
+ total_size += arr[i].size();
+ }
+ return total_size;
+}
+
+} // namespace
+
+void SpdySession::ResumeSendStalledStreams() {
+ DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
+
+ // We don't have to worry about new streams being queued, since
+ // doing so would cause IsSendStalled() to return true. But we do
+ // have to worry about streams being closed, as well as ourselves
+ // being closed.
+
+ while (!IsClosed() && !IsSendStalled()) {
+ size_t old_size = 0;
+ if (DCHECK_IS_ON())
+ old_size = GetTotalSize(stream_send_unstall_queue_);
+
+ SpdyStreamId stream_id = PopStreamToPossiblyResume();
+ if (stream_id == 0)
+ break;
+ ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
+ // The stream may actually still be send-stalled after this (due
+ // to its own send window) but that's okay -- it'll then be
+ // resumed once its send window increases.
+ if (it != active_streams_.end())
+ it->second->PossiblyResumeIfSendStalled();
+
+ // The size should decrease unless we got send-stalled again.
+ if (!IsSendStalled())
+ DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
+ }
+}
+
+SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
+ for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
+ std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
+ if (!queue->empty()) {
+ SpdyStreamId stream_id = queue->front();
+ queue->pop_front();
+ return stream_id;
+ }
+ }
+ return 0;
+}
+
} // namespace net
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index 0746398..63d104d 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -458,13 +458,14 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation31);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, ProtocolNegotiation4);
- FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, IncreaseRecvWindowSize);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustRecvWindowSize31);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, AdjustSendWindowSize31);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test,
SessionFlowControlInactiveStream31);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test,
SessionFlowControlNoReceiveLeaks31);
+ FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test,
+ SessionFlowControlNoSendLeaks31);
FRIEND_TEST_ALL_PREFIXES(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31);
typedef std::deque<SpdyStreamRequest*> PendingStreamRequestQueue;
@@ -652,14 +653,59 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
bool fin,
const SpdyHeaderBlock& headers) OVERRIDE;
- // If session flow control is turned on, called by OnWindowUpdate()
- // (which is in turn called by the framer) to increase this
- // session's send window size by |delta_window_size| from a
- // WINDOW_UPDATE frome, which must be at least 1. If
- // |delta_window_size| would cause this session's send window size
- // to overflow, does nothing.
+ // Called when bytes are consumed from a SpdyBuffer for a DATA frame
+ // that is to be written or is being written. Increases the send
+ // window size accordingly if some or all of the SpdyBuffer is being
+ // discarded.
+ //
+ // If session flow control is turned off, this must not be called.
+ void OnWriteBufferConsumed(size_t frame_payload_size,
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source);
+
+ // Called by OnWindowUpdate() (which is in turn called by the
+ // framer) to increase this session's send window size by
+ // |delta_window_size| from a WINDOW_UPDATE frome, which must be at
+ // least 1. If |delta_window_size| would cause this session's send
+ // window size to overflow, does nothing.
+ //
+ // If session flow control is turned off, this must not be called.
void IncreaseSendWindowSize(int32 delta_window_size);
+ // If session flow control is turned on, called by CreateDataFrame()
+ // (which is in turn called by a stream) to decrease this session's
+ // send window size by |delta_window_size|, which must be at least 1
+ // and at most kMaxSpdyFrameChunkSize. |delta_window_size| must not
+ // cause this session's send window size to go negative.
+ //
+ // If session flow control is turned off, this must not be called.
+ void DecreaseSendWindowSize(int32 delta_window_size);
+
+ // Called when bytes are consumed by the delegate from a SpdyBuffer
+ // containing received data. Increases the receive window size
+ // accordingly.
+ //
+ // If session flow control is turned off, this must not be called.
+ void OnReadBufferConsumed(size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source);
+
+ // Called by OnReadBufferConsume to increase this session's receive
+ // window size by |delta_window_size|, which must be at least 1 and
+ // must not cause this session's receive window size to overflow,
+ // possibly also sending a WINDOW_UPDATE frame. Also called during
+ // initialization to set the initial receive window size.
+ //
+ // If session flow control is turned off, this must not be called.
+ void IncreaseRecvWindowSize(int32 delta_window_size);
+
+ // Called by OnStreamFrameData (which is in turn called by the
+ // framer) to decrease this session's receive window size by
+ // |delta_window_size|, which must be at least 1 and must not cause
+ // this session's receive window size to go negative.
+ //
+ // If session flow control is turned off, this must not be called.
+ void DecreaseRecvWindowSize(int32 delta_window_size);
+
// Queue a send-stalled stream for possibly resuming once we're not
// send-stalled anymore.
void QueueSendStalledStream(const scoped_refptr<SpdyStream>& stream);
@@ -672,29 +718,6 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// empty.
SpdyStreamId PopStreamToPossiblyResume();
- // If session flow control is turned on, called by CreateDataFrame()
- // (which is in turn called by a stream) to decrease this session's
- // send window size by |delta_window_size|, which must be at least 1
- // and at most kMaxSpdyFrameChunkSize. |delta_window_size| must not
- // cause this session's send window size to go negative.
- void DecreaseSendWindowSize(int32 delta_window_size);
-
- // Called by SpdyBuffers (via ConsumeCallbacks) to increase this
- // session's receive window size by |delta_window_size|, which must
- // be at least 1 and must not cause this session's receive window
- // size to overflow, possibly also sending a WINDOW_UPDATE
- // frame. Also called during initialization to set the initial
- // receive window size. Does nothing if session flow control is
- // turned off.
- void IncreaseRecvWindowSize(size_t delta_window_size);
-
- // If session flow control is turned on, called by OnStreamFrameData
- // (which is in turn called by the framer) to decrease this
- // session's receive window size by |delta_window_size|, which must
- // be at least 1 and must not cause this session's receive window
- // size to go negative.
- void DecreaseRecvWindowSize(int32 delta_window_size);
-
// --------------------------
// Helper methods for testing
// --------------------------
diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc
index c15d3fd..f927bb4 100644
--- a/net/spdy/spdy_session_spdy3_unittest.cc
+++ b/net/spdy/spdy_session_spdy3_unittest.cc
@@ -2103,34 +2103,6 @@ TEST_F(SpdySessionSpdy3Test, ProtocolNegotiation4) {
EXPECT_EQ(0, session->session_unacked_recv_window_bytes_);
}
-// SpdySession::IncreaseRecvWindowSize should be callable even if
-// session flow control isn't turned on, but it should have no effect.
-TEST_F(SpdySessionSpdy3Test, IncreaseRecvWindowSize) {
- session_deps_.host_resolver->set_synchronous_mode(true);
-
- MockConnect connect_data(SYNCHRONOUS, OK);
- MockRead reads[] = {
- MockRead(SYNCHRONOUS, 0, 0) // EOF
- };
- StaticSocketDataProvider data(reads, arraysize(reads), NULL, 0);
- data.set_connect_data(connect_data);
- session_deps_.socket_factory->AddSocketDataProvider(&data);
-
- CreateNetworkSession();
- scoped_refptr<SpdySession> session = GetSession(pair_);
- InitializeSession(
- http_session_.get(), session.get(), test_host_port_pair_);
- EXPECT_EQ(SpdySession::FLOW_CONTROL_STREAM,
- session->flow_control_state());
-
- EXPECT_EQ(0, session->session_recv_window_size_);
- EXPECT_EQ(0, session->session_unacked_recv_window_bytes_);
-
- session->IncreaseRecvWindowSize(100);
- EXPECT_EQ(0, session->session_recv_window_size_);
- EXPECT_EQ(0, session->session_unacked_recv_window_bytes_);
-}
-
// SpdySession::{Increase,Decrease}RecvWindowSize should properly
// adjust the session receive window size when the "enable_spdy_31"
// flag is set. In addition, SpdySession::IncreaseRecvWindowSize
@@ -2376,6 +2348,87 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlNoReceiveLeaks31) {
EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_);
}
+// Send data back and forth but close the stream before its data frame
+// can be written to the socket. The send window should then increase
+// to its original value, i.e. we shouldn't "leak" send window bytes.
+TEST_F(SpdySessionSpdy3Test, SessionFlowControlNoSendLeaks31) {
+ const char kStreamUrl[] = "http://www.google.com/";
+
+ session_deps_.enable_spdy_31 = true;
+
+ const int32 msg_data_size = 100;
+ const std::string msg_data(msg_data_size, 'a');
+
+ MockConnect connect_data(SYNCHRONOUS, OK);
+
+ scoped_ptr<SpdyFrame> initial_window_update(
+ ConstructSpdyWindowUpdate(
+ kSessionFlowControlStreamId,
+ kDefaultInitialRecvWindowSize - kSpdySessionInitialWindowSize));
+ scoped_ptr<SpdyFrame> req(
+ ConstructSpdyPost(kStreamUrl, 1, msg_data_size, MEDIUM, NULL, 0));
+ MockWrite writes[] = {
+ CreateMockWrite(*initial_window_update, 0),
+ CreateMockWrite(*req, 1),
+ };
+
+ scoped_ptr<SpdyFrame> resp(ConstructSpdyGetSynReply(NULL, 0, 1));
+ MockRead reads[] = {
+ CreateMockRead(*resp, 2),
+ MockRead(ASYNC, 0, 3) // EOF
+ };
+
+ // Create SpdySession and SpdyStream and send the request.
+ DeterministicSocketData data(reads, arraysize(reads),
+ writes, arraysize(writes));
+ data.set_connect_data(connect_data);
+ session_deps_.host_resolver->set_synchronous_mode(true);
+ session_deps_.deterministic_socket_factory->AddSocketDataProvider(&data);
+
+ SSLSocketDataProvider ssl(SYNCHRONOUS, OK);
+ session_deps_.deterministic_socket_factory->AddSSLSocketDataProvider(&ssl);
+
+ CreateDeterministicNetworkSession();
+
+ scoped_refptr<SpdySession> session = CreateInitializedSession();
+
+ GURL url(kStreamUrl);
+ scoped_refptr<SpdyStream> stream =
+ CreateStreamSynchronously(session, url, MEDIUM, BoundNetLog());
+ ASSERT_TRUE(stream.get() != NULL);
+ EXPECT_EQ(0u, stream->stream_id());
+
+ test::StreamDelegateSendImmediate delegate(
+ stream.get(), scoped_ptr<SpdyHeaderBlock>(), msg_data);
+ stream->SetDelegate(&delegate);
+
+ stream->set_spdy_headers(
+ ConstructPostHeaderBlock(url.spec(), msg_data_size));
+ EXPECT_TRUE(stream->HasUrl());
+ EXPECT_EQ(ERR_IO_PENDING, stream->SendRequest(true));
+
+ EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_);
+
+ data.RunFor(2);
+
+ EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_);
+
+ data.RunFor(1);
+
+ EXPECT_TRUE(data.at_write_eof());
+ EXPECT_TRUE(data.at_read_eof());
+
+ EXPECT_EQ(kSpdySessionInitialWindowSize - msg_data_size,
+ session->session_send_window_size_);
+
+ // Closing the stream should increase the session's send window.
+ stream->Close();
+
+ EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_);
+
+ EXPECT_EQ(OK, delegate.WaitForClose());
+}
+
// Send data back and forth; the send and receive windows should
// change appropriately.
TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) {
@@ -2488,8 +2541,9 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) {
EXPECT_EQ(msg_data, delegate.TakeReceivedData());
- // Draining the delegate's read queue should increase our receive
- // window.
+ // Draining the delegate's read queue should increase the session's
+ // receive window.
+ EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_);
EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_);
EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_);
@@ -2497,6 +2551,7 @@ TEST_F(SpdySessionSpdy3Test, SessionFlowControlEndToEnd31) {
EXPECT_EQ(OK, delegate.WaitForClose());
+ EXPECT_EQ(kSpdySessionInitialWindowSize, session->session_send_window_size_);
EXPECT_EQ(kDefaultInitialRecvWindowSize, session->session_recv_window_size_);
EXPECT_EQ(msg_data_size, session->session_unacked_recv_window_bytes_);
}
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index 2d7f5e6..482b12c 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -256,15 +256,32 @@ void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) {
PossiblyResumeIfSendStalled();
}
+void SpdyStream::OnWriteBufferConsumed(
+ size_t frame_payload_size,
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source) {
+ DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
+ if (consume_source == SpdyBuffer::DISCARD) {
+ // If we're discarding a frame or part of it, increase the send
+ // window by the number of discarded bytes. (Although if we're
+ // discarding part of a frame, it's probably because of a write
+ // error and we'll be tearing down the stream soon.)
+ size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
+ DCHECK_GT(remaining_payload_bytes, 0u);
+ IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
+ }
+ // For consumed bytes, the send window is increased when we receive
+ // a WINDOW_UPDATE frame.
+}
+
void SpdyStream::IncreaseSendWindowSize(int32 delta_window_size) {
DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
+ DCHECK_GE(delta_window_size, 1);
// Ignore late WINDOW_UPDATEs.
if (closed())
return;
- DCHECK_GE(delta_window_size, 1);
-
if (send_window_size_ > 0) {
// Check for overflow.
int32 max_delta_window_size = kint32max - send_window_size_;
@@ -296,7 +313,7 @@ void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
// We only call this method when sending a frame. Therefore,
// |delta_window_size| should be within the valid frame size range.
- DCHECK_GE(delta_window_size, 0);
+ DCHECK_GE(delta_window_size, 1);
DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
// |send_window_size_| should have been at least |delta_window_size| for
@@ -311,6 +328,68 @@ void SpdyStream::DecreaseSendWindowSize(int32 delta_window_size) {
stream_id_, -delta_window_size, send_window_size_));
}
+void SpdyStream::OnReadBufferConsumed(
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source) {
+ DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
+ DCHECK_GE(consume_size, 1u);
+ DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
+ IncreaseRecvWindowSize(static_cast<int32>(consume_size));
+}
+
+void SpdyStream::IncreaseRecvWindowSize(int32 delta_window_size) {
+ DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
+
+ // By the time a read is processed by the delegate, this stream may
+ // already be inactive.
+ if (!session_->IsStreamActive(stream_id_))
+ return;
+
+ DCHECK_GE(unacked_recv_window_bytes_, 0);
+ DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
+ DCHECK_GE(delta_window_size, 1);
+ // Check for overflow.
+ DCHECK_LE(delta_window_size, kint32max - recv_window_size_);
+
+ recv_window_size_ += delta_window_size;
+ net_log_.AddEvent(
+ NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
+ base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
+ stream_id_, delta_window_size, recv_window_size_));
+
+ unacked_recv_window_bytes_ += delta_window_size;
+ if (unacked_recv_window_bytes_ >
+ session_->stream_initial_recv_window_size() / 2) {
+ session_->SendStreamWindowUpdate(
+ stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
+ unacked_recv_window_bytes_ = 0;
+ }
+}
+
+void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
+ DCHECK(session_->IsStreamActive(stream_id_));
+ DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
+ DCHECK_GE(delta_window_size, 1);
+
+ // Since we never decrease the initial window size,
+ // |delta_window_size| should never cause |recv_window_size_| to go
+ // negative. If we do, it's a client-side bug, so we use
+ // PROTOCOL_ERROR for lack of a better error code.
+ if (delta_window_size > recv_window_size_) {
+ session_->ResetStream(
+ stream_id_, RST_STREAM_PROTOCOL_ERROR,
+ "Invalid delta_window_size for DecreaseRecvWindowSize");
+ NOTREACHED();
+ return;
+ }
+
+ recv_window_size_ -= delta_window_size;
+ net_log_.AddEvent(
+ NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
+ base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
+ stream_id_, -delta_window_size, recv_window_size_));
+}
+
int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
return session_->GetPeerAddress(address);
}
@@ -456,7 +535,7 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
DecreaseRecvWindowSize(static_cast<int32>(length));
buffer->AddConsumeCallback(
- base::Bind(&SpdyStream::IncreaseRecvWindowSize,
+ base::Bind(&SpdyStream::OnReadBufferConsumed,
weak_ptr_factory_.GetWeakPtr()));
}
@@ -569,6 +648,22 @@ void SpdyStream::QueueStreamData(IOBuffer* data,
if (!data_buffer)
return;
+ if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) {
+ DCHECK_GE(data_buffer->GetRemainingSize(),
+ session_->GetDataFrameMinimumSize());
+ size_t payload_size =
+ data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
+ DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
+ DecreaseSendWindowSize(static_cast<int32>(payload_size));
+ // This currently isn't strictly needed, since write frames are
+ // discarded only if the stream is about to be closed. But have it
+ // here anyway just in case this changes.
+ data_buffer->AddConsumeCallback(
+ base::Bind(&SpdyStream::OnWriteBufferConsumed,
+ weak_ptr_factory_.GetWeakPtr(),
+ payload_size));
+ }
+
session_->EnqueueStreamWrite(
this, DATA,
scoped_ptr<SpdyBufferProducer>(
@@ -889,59 +984,4 @@ void SpdyStream::UpdateHistograms() {
UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
}
-void SpdyStream::IncreaseRecvWindowSize(size_t delta_window_size) {
- if (session_->flow_control_state() < SpdySession::FLOW_CONTROL_STREAM)
- return;
-
- // By the time a read is processed by the delegate, this stream may
- // already be inactive.
- if (!session_->IsStreamActive(stream_id_))
- return;
-
- DCHECK_GE(unacked_recv_window_bytes_, 0);
- DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
- DCHECK_GE(delta_window_size, 1u);
- // Check for overflow.
- DCHECK_LE(delta_window_size,
- static_cast<size_t>(kint32max - recv_window_size_));
-
- recv_window_size_ += delta_window_size;
- net_log_.AddEvent(
- NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
- base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
- stream_id_, delta_window_size, recv_window_size_));
-
- unacked_recv_window_bytes_ += delta_window_size;
- if (unacked_recv_window_bytes_ >
- session_->stream_initial_recv_window_size() / 2) {
- session_->SendStreamWindowUpdate(
- stream_id_, static_cast<uint32>(unacked_recv_window_bytes_));
- unacked_recv_window_bytes_ = 0;
- }
-}
-
-void SpdyStream::DecreaseRecvWindowSize(int32 delta_window_size) {
- DCHECK(session_->IsStreamActive(stream_id_));
- DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM);
- DCHECK_GE(delta_window_size, 1);
-
- // Since we never decrease the initial window size,
- // |delta_window_size| should never cause |recv_window_size_| to go
- // negative. If we do, it's a client-side bug, so we use
- // PROTOCOL_ERROR for lack of a better error code.
- if (delta_window_size > recv_window_size_) {
- session_->ResetStream(
- stream_id_, RST_STREAM_PROTOCOL_ERROR,
- "Invalid delta_window_size for DecreaseRecvWindowSize");
- NOTREACHED();
- return;
- }
-
- recv_window_size_ -= delta_window_size;
- net_log_.AddEvent(
- NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
- base::Bind(&NetLogSpdyStreamWindowUpdateCallback,
- stream_id_, -delta_window_size, recv_window_size_));
-}
-
} // namespace net
diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h
index 1ae204e..d3ef69f 100644
--- a/net/spdy/spdy_stream.h
+++ b/net/spdy/spdy_stream.h
@@ -147,24 +147,35 @@ class NET_EXPORT_PRIVATE SpdyStream
send_stalled_by_flow_control_ = stalled;
}
- // If stream flow control is turned on, called by the session to
- // adjust this stream's send window size by |delta_window_size|,
- // which is the difference between the SETTINGS_INITIAL_WINDOW_SIZE
- // in the most recent SETTINGS frame and the previous initial send
- // window size, possibly unstalling this stream. Although
- // |delta_window_size| may cause this stream's send window size to
- // go negative, it must not cause it to wrap around in either
- // direction. Does nothing if the stream is already closed.
+ // Called by the session to adjust this stream's send window size by
+ // |delta_window_size|, which is the difference between the
+ // SETTINGS_INITIAL_WINDOW_SIZE in the most recent SETTINGS frame
+ // and the previous initial send window size, possibly unstalling
+ // this stream. Although |delta_window_size| may cause this stream's
+ // send window size to go negative, it must not cause it to wrap
+ // around in either direction. Does nothing if the stream is already
+ // closed.
//
// If stream flow control is turned off, this must not be called.
void AdjustSendWindowSize(int32 delta_window_size);
- // If stream flow control is turned on, called by the session to
- // increase this stream's send window size by |delta_window_size|
- // from a WINDOW_UPDATE frome, which must be at least 1, possibly
- // unstalling this stream. If |delta_window_size| would cause this
- // stream's send window size to overflow, calls into the session to
- // reset this stream. Does nothing if the stream is already closed.
+ // Called when bytes are consumed from a SpdyBuffer for a DATA frame
+ // that is to be written or is being written. Increases the send
+ // window size accordingly if some or all of the SpdyBuffer is being
+ // discarded.
+ //
+ // If stream flow control is turned off, this must not be called.
+ void OnWriteBufferConsumed(size_t frame_payload_size,
+ size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source);
+
+ // Called by the session to increase this stream's send window size
+ // by |delta_window_size| (which must be at least 1) from a received
+ // WINDOW_UPDATE frame or from a dropped DATA frame that was
+ // intended to be sent, possibly unstalling this stream. If
+ // |delta_window_size| would cause this stream's send window size to
+ // overflow, calls into the session to reset this stream. Does
+ // nothing if the stream is already closed.
//
// If stream flow control is turned off, this must not be called.
void IncreaseSendWindowSize(int32 delta_window_size);
@@ -178,6 +189,32 @@ class NET_EXPORT_PRIVATE SpdyStream
// If stream flow control is turned off, this must not be called.
void DecreaseSendWindowSize(int32 delta_window_size);
+ // Called when bytes are consumed by the delegate from a SpdyBuffer
+ // containing received data. Increases the receive window size
+ // accordingly.
+ //
+ // If stream flow control is turned off, this must not be called.
+ void OnReadBufferConsumed(size_t consume_size,
+ SpdyBuffer::ConsumeSource consume_source);
+
+ // Called by OnReadBufferConsume to increase this stream's receive
+ // window size by |delta_window_size|, which must be at least 1 and
+ // must not cause this stream's receive window size to overflow,
+ // possibly also sending a WINDOW_UPDATE frame. Does nothing if the
+ // stream is not active.
+ //
+ // If stream flow control is turned off, this must not be called.
+ void IncreaseRecvWindowSize(int32 delta_window_size);
+
+ // Called by OnDataReceived (which is in turn called by the session)
+ // to decrease this stream's receive window size by
+ // |delta_window_size|, which must be at least 1 and must not cause
+ // this stream's receive window size to go negative.
+ //
+ // If stream flow control is turned off or the stream is not active,
+ // this must not be called.
+ void DecreaseRecvWindowSize(int32 delta_window_size);
+
int GetPeerAddress(IPEndPoint* address) const;
int GetLocalAddress(IPEndPoint* address) const;
@@ -338,23 +375,6 @@ class NET_EXPORT_PRIVATE SpdyStream
scoped_ptr<SpdyFrame> ProduceHeaderFrame(
scoped_ptr<SpdyHeaderBlock> header_block);
- // Called by SpdyBuffers (via ConsumeCallbacks) to increase this
- // stream's receive window size by |delta_window_size|, which must
- // be at least 1 and must not cause this stream's receive window
- // size to overflow, possibly also sending a WINDOW_UPDATE frame.
- //
- // Unlike the functions above, this may be called even when stream
- // flow control is turned off, although this does nothing in that
- // case (and also if the stream is inactive).
- void IncreaseRecvWindowSize(size_t delta_window_size);
-
- // If the stream is active and stream flow control is turned on,
- // called by OnDataReceived (which is in turn called by the session)
- // to decrease this stream's receive window size by
- // |delta_window_size|, which must be at least 1 and must not cause
- // this stream's receive window size to go negative.
- void DecreaseRecvWindowSize(int32 delta_window_size);
-
base::WeakPtrFactory<SpdyStream> weak_ptr_factory_;
// There is a small period of time between when a server pushed stream is