diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/spdy/spdy_network_transaction_unittest.cc | 104 | ||||
-rw-r--r-- | net/spdy/spdy_stream.cc | 105 | ||||
-rw-r--r-- | net/spdy/spdy_stream.h | 10 |
3 files changed, 191 insertions, 28 deletions
diff --git a/net/spdy/spdy_network_transaction_unittest.cc b/net/spdy/spdy_network_transaction_unittest.cc index 31847ef..0d67e41 100644 --- a/net/spdy/spdy_network_transaction_unittest.cc +++ b/net/spdy/spdy_network_transaction_unittest.cc @@ -1087,4 +1087,108 @@ TEST_F(SpdyNetworkTransactionTest, LoadLog) { net::LoadLog::PHASE_END); } +// Since we buffer the IO from the stream to the renderer, this test verifies +// that when we read out the maximum amount of data (e.g. we received 50 bytes +// on the network, but issued a Read for only 5 of those bytes) that the data +// flow still works correctly. +TEST_F(SpdyNetworkTransactionTest, FullBuffer) { + MockWrite writes[] = { + MockWrite(true, reinterpret_cast<const char*>(kGetSyn), + arraysize(kGetSyn)), + }; + + static const unsigned char kCombinedDataFrames[] = { + 0x00, 0x00, 0x00, 0x01, // header + 0x00, 0x00, 0x00, 0x06, // length + 'g', 'o', 'o', 'd', 'b', 'y', + 0x00, 0x00, 0x00, 0x01, // header + 0x00, 0x00, 0x00, 0x06, // length + 'e', ' ', 'w', 'o', 'r', 'l', + }; + + static const unsigned char kLastFrame[] = { + 0x00, 0x00, 0x00, 0x01, // header + 0x01, 0x00, 0x00, 0x01, // FIN, length + 'd', + }; + + MockRead reads[] = { + MockRead(true, reinterpret_cast<const char*>(kGetSynReply), + arraysize(kGetSynReply)), + MockRead(true, ERR_IO_PENDING), // Force a pause + MockRead(true, reinterpret_cast<const char*>(kCombinedDataFrames), + arraysize(kCombinedDataFrames)), + MockRead(true, ERR_IO_PENDING), // Force a pause + MockRead(true, reinterpret_cast<const char*>(kLastFrame), + arraysize(kLastFrame)), + MockRead(true, 0, 0) // EOF + }; + + HttpRequestInfo request; + request.method = "GET"; + request.url = GURL("http://www.google.com/"); + request.load_flags = 0; + scoped_refptr<DelayedSocketData> data( + new DelayedSocketData(1, reads, arraysize(reads), + writes, arraysize(writes))); + + // For this test, we can't use the TransactionHelper, because we are + // going to tightly control how the IOs fly. + + TransactionHelperResult out; + + // We disable SSL for this test. + SpdySession::SetSSLMode(false); + + SessionDependencies session_deps; + scoped_ptr<SpdyNetworkTransaction> trans( + new SpdyNetworkTransaction(CreateSession(&session_deps))); + + session_deps.socket_factory.AddSocketDataProvider(data); + + TestCompletionCallback callback; + + int rv = trans->Start(&request, &callback, NULL); + EXPECT_EQ(ERR_IO_PENDING, rv); + + out.rv = callback.WaitForResult(); + EXPECT_EQ(out.rv, OK); + + const HttpResponseInfo* response = trans->GetResponseInfo(); + EXPECT_TRUE(response->headers != NULL); + EXPECT_TRUE(response->was_fetched_via_spdy); + out.status_line = response->headers->GetStatusLine(); + out.response_info = *response; // Make a copy so we can verify. + + // Read Data + TestCompletionCallback read_callback; + + std::string content; + do { + // Read small chunks at a time. + const int kSmallReadSize = 3; + scoped_refptr<net::IOBuffer> buf = new net::IOBuffer(kSmallReadSize); + rv = trans->Read(buf, kSmallReadSize, &read_callback); + if (rv == net::ERR_IO_PENDING) { + data->CompleteRead(); + rv = read_callback.WaitForResult(); + } + if (rv > 0) { + content.append(buf->data(), rv); + } else if (rv < 0) { + NOTREACHED(); + } + } while (rv > 0); + + out.response_data.swap(content); + + // Verify that we consumed all test data. + EXPECT_TRUE(data->at_read_eof()); + EXPECT_TRUE(data->at_write_eof()); + + EXPECT_EQ(OK, out.rv); + EXPECT_EQ("HTTP/1.1 200 OK", out.status_line); + EXPECT_EQ("goodbye world", out.response_data); +} + } // namespace net diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 9625040..23f3b47 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -5,6 +5,7 @@ #include "net/spdy/spdy_stream.h" #include "base/logging.h" +#include "base/message_loop.h" #include "net/http/http_request_info.h" #include "net/http/http_response_info.h" #include "net/spdy/spdy_session.h" @@ -31,7 +32,9 @@ SpdyStream::SpdyStream(SpdySession* session, spdy::SpdyStreamId stream_id, load_log_(log), send_bytes_(0), recv_bytes_(0), - histograms_recorded_(false) {} + histograms_recorded_(false), + buffered_read_callback_pending_(false), + more_read_data_pending_(false) {} SpdyStream::~SpdyStream() { DLOG(INFO) << "Deleting SpdyStream for stream " << stream_id_; @@ -201,6 +204,8 @@ bool SpdyStream::OnDataReceived(const char* data, int length) { LOG(INFO) << "SpdyStream: Data (" << length << " bytes) received for " << stream_id_; + CHECK(!response_complete_); + // If we don't have a response, then the SYN_REPLY did not come through. // We cannot pass data up to the caller unless the reply headers have been // received. @@ -209,47 +214,36 @@ bool SpdyStream::OnDataReceived(const char* data, int length) { return false; } - if (length > 0) - recv_bytes_ += length; - recv_last_byte_time_ = base::TimeTicks::Now(); - // A zero-length read means that the stream is being closed. if (!length) { metrics_.StopStream(); download_finished_ = true; + response_complete_ = true; + + // We need to complete any pending buffered read now. + DoBufferedReadCallback(); + OnClose(net::OK); return true; } // Track our bandwidth. metrics_.RecordBytes(length); + recv_bytes_ += length; + recv_last_byte_time_ = base::TimeTicks::Now(); - if (length > 0) { - // TODO(mbelshe): If read is pending, we should copy the data straight into - // the read buffer here. For now, we'll queue it always. - // TODO(mbelshe): We need to have some throttling on this. We shouldn't - // buffer an infinite amount of data. - - IOBufferWithSize* io_buffer = new IOBufferWithSize(length); - memcpy(io_buffer->data(), data, length); - - response_body_.push_back(io_buffer); - } + // Save the received data. + IOBufferWithSize* io_buffer = new IOBufferWithSize(length); + memcpy(io_buffer->data(), data, length); + response_body_.push_back(io_buffer); // Note that data may be received for a SpdyStream prior to the user calling - // ReadResponseBody(), therefore user_callback_ may be NULL. This may often + // ReadResponseBody(), therefore user_buffer_ may be NULL. This may often // happen for server initiated streams. - if (user_callback_) { - int rv; - if (user_buffer_) { - rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); - CHECK(rv != ERR_IO_PENDING); - user_buffer_ = NULL; - user_buffer_len_ = 0; - } else { - rv = OK; - } - DoCallback(rv); + if (user_buffer_) { + // Handing small chunks of data to the caller creates measurable overhead. + // We buffer data in short time-spans and send a single read notification. + ScheduleBufferedReadCallback(); } return true; @@ -342,6 +336,61 @@ int SpdyStream::DoLoop(int result) { return result; } +void SpdyStream::ScheduleBufferedReadCallback() { + // If there is already a scheduled DoBufferedReadCallback, don't issue + // another one. Mark that we have received more data and return. + if (buffered_read_callback_pending_) { + more_read_data_pending_ = true; + return; + } + + more_read_data_pending_ = false; + buffered_read_callback_pending_ = true; + const int kBufferTimeMs = 1; + MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod( + this, &SpdyStream::DoBufferedReadCallback), kBufferTimeMs); +} + +// Checks to see if we should wait for more buffered data before notifying +// the caller. Returns true if we should wait, false otherwise. +bool SpdyStream::ShouldWaitForMoreBufferedData() const { + // If the response is complete, there is no point in waiting. + if (response_complete_) + return false; + + int bytes_buffered = 0; + std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it; + for (it = response_body_.begin(); + it != response_body_.end() && bytes_buffered < user_buffer_len_; + ++it) + bytes_buffered += (*it)->size(); + + return bytes_buffered < user_buffer_len_; +} + +void SpdyStream::DoBufferedReadCallback() { + buffered_read_callback_pending_ = false; + + // When more_read_data_pending_ is true, it means that more data has + // arrived since we started waiting. Wait a little longer and continue + // to buffer. + if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { + ScheduleBufferedReadCallback(); + return; + } + + int rv = 0; + if (user_buffer_) { + rv = ReadResponseBody(user_buffer_, user_buffer_len_, user_callback_); + CHECK(rv != ERR_IO_PENDING); + user_buffer_ = NULL; + user_buffer_len_ = 0; + } + + if (user_callback_) + DoCallback(rv); +} + void SpdyStream::DoCallback(int rv) { CHECK(rv != ERR_IO_PENDING); CHECK(user_callback_); diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 1d750c3..5445a5c 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -150,6 +150,10 @@ class SpdyStream : public base::RefCounted<SpdyStream> { // Call the user callback. void DoCallback(int rv); + void ScheduleBufferedReadCallback(); + void DoBufferedReadCallback(); + bool ShouldWaitForMoreBufferedData() const; + // The implementations of each state of the state machine. int DoSendHeaders(); int DoSendHeadersComplete(int result); @@ -203,6 +207,12 @@ class SpdyStream : public base::RefCounted<SpdyStream> { int recv_bytes_; bool histograms_recorded_; + // Is there a scheduled read callback pending. + bool buffered_read_callback_pending_; + // Has more data been received from the network during the wait for the + // scheduled read callback. + bool more_read_data_pending_; + DISALLOW_COPY_AND_ASSIGN(SpdyStream); }; |