summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/spdy/spdy_network_transaction_unittest.cc104
-rw-r--r--net/spdy/spdy_stream.cc105
-rw-r--r--net/spdy/spdy_stream.h10
3 files changed, 191 insertions, 28 deletions
diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc
index 31847ef..0d67e41 100644
--- a/net/spdy/spdy_network_transaction_unittest.cc
+++ b/net/spdy/spdy_network_transaction_unittest.cc
@@ -1087,4 +1087,108 @@ TEST_F(SpdyNetworkTransactionTest, LoadLog) {
net::LoadLog::PHASE_END);
}
+// Since we buffer the IO from the stream to the renderer, this test verifies
+// that when we read out the maximum amount of data (e.g. we received 50 bytes
+// on the network, but issued a Read for only 5 of those bytes) that the data
+// flow still works correctly.
+TEST_F(SpdyNetworkTransactionTest, FullBuffer) {
+ MockWrite writes[] = {
+ MockWrite(true, reinterpret_cast<const char*>(kGetSyn),
+ arraysize(kGetSyn)),
+ };
+
+ static const unsigned char kCombinedDataFrames[] = {
+ 0x00, 0x00, 0x00, 0x01, // header
+ 0x00, 0x00, 0x00, 0x06, // length
+ 'g', 'o', 'o', 'd', 'b', 'y',
+ 0x00, 0x00, 0x00, 0x01, // header
+ 0x00, 0x00, 0x00, 0x06, // length
+ 'e', ' ', 'w', 'o', 'r', 'l',
+ };
+
+ static const unsigned char kLastFrame[] = {
+ 0x00, 0x00, 0x00, 0x01, // header
+ 0x01, 0x00, 0x00, 0x01, // FIN, length
+ 'd',
+ };
+
+ MockRead reads[] = {
+ MockRead(true, reinterpret_cast<const char*>(kGetSynReply),
+ arraysize(kGetSynReply)),
+ MockRead(true, ERR_IO_PENDING), // Force a pause
+ MockRead(true, reinterpret_cast<const char*>(kCombinedDataFrames),
+ arraysize(kCombinedDataFrames)),
+ MockRead(true, ERR_IO_PENDING), // Force a pause
+ MockRead(true, reinterpret_cast<const char*>(kLastFrame),
+ arraysize(kLastFrame)),
+ MockRead(true, 0, 0) // EOF
+ };
+
+ HttpRequestInfo request;
+ request.method = "GET";
+ request.url = GURL("http://www.google.com/");
+ request.load_flags = 0;
+ scoped_refptr<DelayedSocketData> data(
+ new DelayedSocketData(1, reads, arraysize(reads),
+ writes, arraysize(writes)));
+
+ // For this test, we can't use the TransactionHelper, because we are
+ // going to tightly control how the IOs fly.
+
+ TransactionHelperResult out;
+
+ // We disable SSL for this test.
+ SpdySession::SetSSLMode(false);
+
+ SessionDependencies session_deps;
+ scoped_ptr<SpdyNetworkTransaction> trans(
+ new SpdyNetworkTransaction(CreateSession(&session_deps)));
+
+ session_deps.socket_factory.AddSocketDataProvider(data);
+
+ TestCompletionCallback callback;
+
+ int rv = trans->Start(&request, &callback, NULL);
+ EXPECT_EQ(ERR_IO_PENDING, rv);
+
+ out.rv = callback.WaitForResult();
+ EXPECT_EQ(out.rv, OK);
+
+ const HttpResponseInfo* response = trans->GetResponseInfo();
+ EXPECT_TRUE(response->headers != NULL);
+ EXPECT_TRUE(response->was_fetched_via_spdy);
+ out.status_line = response->headers->GetStatusLine();
+ out.response_info = *response; // Make a copy so we can verify.
+
+ // Read Data
+ TestCompletionCallback read_callback;
+
+ std::string content;
+ do {
+ // Read small chunks at a time.
+ const int kSmallReadSize = 3;
+ scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(kSmallReadSize);
+ rv = trans->Read(buf, kSmallReadSize, &read_callback);
+ if (rv == net::ERR_IO_PENDING) {
+ data->CompleteRead();
+ rv = read_callback.WaitForResult();
+ }
+ if (rv > 0) {
+ content.append(buf->data(), rv);
+ } else if (rv < 0) {
+ NOTREACHED();
+ }
+ } while (rv > 0);
+
+ out.response_data.swap(content);
+
+ // Verify that we consumed all test data.
+ EXPECT_TRUE(data->at_read_eof());
+ EXPECT_TRUE(data->at_write_eof());
+
+ EXPECT_EQ(OK, out.rv);
+ EXPECT_EQ("HTTP/1.1 200 OK", out.status_line);
+ EXPECT_EQ("goodbye world", out.response_data);
+}
+
} // namespace net
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index 9625040..23f3b47 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -5,6 +5,7 @@
#include "net/spdy/spdy_stream.h"
#include "base/logging.h"
+#include "base/message_loop.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_info.h"
#include "net/spdy/spdy_session.h"
@@ -31,7 +32,9 @@ SpdyStream::SpdyStream(SpdySession* session, spdy::SpdyStreamId stream_id,
load_log_(log),
send_bytes_(0),
recv_bytes_(0),
- histograms_recorded_(false) {}
+ histograms_recorded_(false),
+ buffered_read_callback_pending_(false),
+ more_read_data_pending_(false) {}
SpdyStream::~SpdyStream() {
DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_;
@@ -201,6 +204,8 @@ bool SpdyStream::OnDataReceived(const char* data, int length) {
LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for "
<< stream_id_;
+ CHECK(!response_complete_);
+
// If we don't have a response, then the SYN_REPLY did not come through.
// We cannot pass data up to the caller unless the reply headers have been
// received.
@@ -209,47 +214,36 @@ bool SpdyStream::OnDataReceived(const char* data, int length) {
return false;
}
- if (length > 0)
- recv_bytes_ += length;
- recv_last_byte_time_ = base::TimeTicks::Now();
-
// A zero-length read means that the stream is being closed.
if (!length) {
metrics_.StopStream();
download_finished_ = true;
+ response_complete_ = true;
+
+ // We need to complete any pending buffered read now.
+ DoBufferedReadCallback();
+
OnClose(net::OK);
return true;
}
// Track our bandwidth.
metrics_.RecordBytes(length);
+ recv_bytes_ += length;
+ recv_last_byte_time_ = base::TimeTicks::Now();
- if (length > 0) {
- // TODO(mbelshe): If read is pending, we should copy the data straight into
- // the read buffer here. For now, we'll queue it always.
- // TODO(mbelshe): We need to have some throttling on this. We shouldn't
- // buffer an infinite amount of data.
-
- IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
- memcpy(io_buffer->data(), data, length);
-
- response_body_.push_back(io_buffer);
- }
+ // Save the received data.
+ IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
+ memcpy(io_buffer->data(), data, length);
+ response_body_.push_back(io_buffer);
// Note that data may be received for a SpdyStream prior to the user calling
- // ReadResponseBody(), therefore user_callback_ may be NULL. This may often
+ // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often
// happen for server initiated streams.
- if (user_callback_) {
- int rv;
- if (user_buffer_) {
- rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
- CHECK(rv != ERR_IO_PENDING);
- user_buffer_ = NULL;
- user_buffer_len_ = 0;
- } else {
- rv = OK;
- }
- DoCallback(rv);
+ if (user_buffer_) {
+ // Handing small chunks of data to the caller creates measurable overhead.
+ // We buffer data in short time-spans and send a single read notification.
+ ScheduleBufferedReadCallback();
}
return true;
@@ -342,6 +336,61 @@ int SpdyStream::DoLoop(int result) {
return result;
}
+void SpdyStream::ScheduleBufferedReadCallback() {
+ // If there is already a scheduled DoBufferedReadCallback, don't issue
+ // another one. Mark that we have received more data and return.
+ if (buffered_read_callback_pending_) {
+ more_read_data_pending_ = true;
+ return;
+ }
+
+ more_read_data_pending_ = false;
+ buffered_read_callback_pending_ = true;
+ const int kBufferTimeMs = 1;
+ MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(
+ this, &SpdyStream::DoBufferedReadCallback), kBufferTimeMs);
+}
+
+// Checks to see if we should wait for more buffered data before notifying
+// the caller. Returns true if we should wait, false otherwise.
+bool SpdyStream::ShouldWaitForMoreBufferedData() const {
+ // If the response is complete, there is no point in waiting.
+ if (response_complete_)
+ return false;
+
+ int bytes_buffered = 0;
+ std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it;
+ for (it = response_body_.begin();
+ it != response_body_.end() && bytes_buffered < user_buffer_len_;
+ ++it)
+ bytes_buffered += (*it)->size();
+
+ return bytes_buffered < user_buffer_len_;
+}
+
+void SpdyStream::DoBufferedReadCallback() {
+ buffered_read_callback_pending_ = false;
+
+ // When more_read_data_pending_ is true, it means that more data has
+ // arrived since we started waiting. Wait a little longer and continue
+ // to buffer.
+ if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
+ ScheduleBufferedReadCallback();
+ return;
+ }
+
+ int rv = 0;
+ if (user_buffer_) {
+ rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_);
+ CHECK(rv != ERR_IO_PENDING);
+ user_buffer_ = NULL;
+ user_buffer_len_ = 0;
+ }
+
+ if (user_callback_)
+ DoCallback(rv);
+}
+
void SpdyStream::DoCallback(int rv) {
CHECK(rv != ERR_IO_PENDING);
CHECK(user_callback_);
diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h
index 1d750c3..5445a5c 100644
--- a/net/spdy/spdy_stream.h
+++ b/net/spdy/spdy_stream.h
@@ -150,6 +150,10 @@ class SpdyStream : public base::RefCounted<SpdyStream> {
// Call the user callback.
void DoCallback(int rv);
+ void ScheduleBufferedReadCallback();
+ void DoBufferedReadCallback();
+ bool ShouldWaitForMoreBufferedData() const;
+
// The implementations of each state of the state machine.
int DoSendHeaders();
int DoSendHeadersComplete(int result);
@@ -203,6 +207,12 @@ class SpdyStream : public base::RefCounted<SpdyStream> {
int recv_bytes_;
bool histograms_recorded_;
+ // Is there a scheduled read callback pending.
+ bool buffered_read_callback_pending_;
+ // Has more data been received from the network during the wait for the
+ // scheduled read callback.
+ bool more_read_data_pending_;
+
DISALLOW_COPY_AND_ASSIGN(SpdyStream);
};