diff options
author | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-17 09:59:38 +0000 |
---|---|---|
committer | akalin@chromium.org <akalin@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-04-17 09:59:38 +0000 |
commit | 47fb09645a42f399af3e1380db0dc911a9cd1032 (patch) | |
tree | c54eb3a438f563b4d097c040d8b289dae8db3601 | |
parent | 3203953f3166c4a331e60a80bcc65b4201105457 (diff) | |
download | chromium_src-47fb09645a42f399af3e1380db0dc911a9cd1032.zip chromium_src-47fb09645a42f399af3e1380db0dc911a9cd1032.tar.gz chromium_src-47fb09645a42f399af3e1380db0dc911a9cd1032.tar.bz2 |
[SPDY] Replace SpdyIOBuffer with new SpdyBuffer class
Use SpdyBuffer for both SPDY reads and writes. A future
CL will add hooks to SpdyBuffer so that we keep track of
flow control windows properly.
Replace SpdyFrameProducer with SpdyBufferProducer.
Also introduce new SpdyReadQueue class for delegates
of SpdyStream to use.
BUG=176592
Review URL: https://codereview.chromium.org/13990005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@194560 0039d316-1c4b-4281-b951-d872f2087c98
34 files changed, 775 insertions, 422 deletions
diff --git a/net/net.gyp b/net/net.gyp index bb7e584..3ba59da 100644 --- a/net/net.gyp +++ b/net/net.gyp @@ -859,14 +859,16 @@ 'spdy/buffered_spdy_framer.cc', 'spdy/buffered_spdy_framer.h', 'spdy/spdy_bitmasks.h', + 'spdy/spdy_buffer.cc', + 'spdy/spdy_buffer.h', + 'spdy/spdy_buffer_producer.cc', + 'spdy/spdy_buffer_producer.h', 'spdy/spdy_credential_builder.cc', 'spdy/spdy_credential_builder.h', 'spdy/spdy_credential_state.cc', 'spdy/spdy_credential_state.h', 'spdy/spdy_frame_builder.cc', 'spdy/spdy_frame_builder.h', - 'spdy/spdy_frame_producer.cc', - 'spdy/spdy_frame_producer.h', 'spdy/spdy_frame_reader.cc', 'spdy/spdy_frame_reader.h', 'spdy/spdy_framer.cc', @@ -877,13 +879,13 @@ 'spdy/spdy_http_stream.h', 'spdy/spdy_http_utils.cc', 'spdy/spdy_http_utils.h', - 'spdy/spdy_io_buffer.cc', - 'spdy/spdy_io_buffer.h', 'spdy/spdy_priority_forest.h', 'spdy/spdy_protocol.cc', 'spdy/spdy_protocol.h', 'spdy/spdy_proxy_client_socket.cc', 'spdy/spdy_proxy_client_socket.h', + 'spdy/spdy_read_queue.cc', + 'spdy/spdy_read_queue.h', 'spdy/spdy_session.cc', 'spdy/spdy_session.h', 'spdy/spdy_session_pool.cc', @@ -1679,6 +1681,7 @@ 'spdy/buffered_spdy_framer_spdy3_unittest.cc', 'spdy/buffered_spdy_framer_spdy2_unittest.cc', 'spdy/spdy_credential_builder_unittest.cc', + 'spdy/spdy_buffer_unittest.cc', 'spdy/spdy_credential_state_unittest.cc', 'spdy/spdy_frame_builder_test.cc', 'spdy/spdy_frame_reader_test.cc', @@ -1693,6 +1696,7 @@ 'spdy/spdy_protocol_test.cc', 'spdy/spdy_proxy_client_socket_spdy3_unittest.cc', 'spdy/spdy_proxy_client_socket_spdy2_unittest.cc', + 'spdy/spdy_read_queue_unittest.cc', 'spdy/spdy_session_spdy3_unittest.cc', 'spdy/spdy_session_spdy2_unittest.cc', 'spdy/spdy_session_test_util.cc', diff --git a/net/spdy/spdy_buffer.cc b/net/spdy/spdy_buffer.cc new file mode 100644 index 0000000..3257bef --- /dev/null +++ b/net/spdy/spdy_buffer.cc @@ -0,0 +1,61 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_buffer.h" + +#include <cstring> + +#include "base/logging.h" +#include "net/base/io_buffer.h" +#include "net/spdy/spdy_protocol.h" + +namespace net { + +namespace { + +// Makes a SpdyFrame with |size| bytes of data copied from +// |data|. |data| must be non-NULL and |size| must be positive. +scoped_ptr<SpdyFrame> MakeSpdyFrame(const char* data, size_t size) { + DCHECK(data); + DCHECK_GT(size, 0u); + scoped_array<char> frame_data(new char[size]); + std::memcpy(frame_data.get(), data, size); + scoped_ptr<SpdyFrame> frame( + new SpdyFrame(frame_data.release(), size, true /* owns_buffer */)); + return frame.Pass(); +} + +} // namespace + +SpdyBuffer::SpdyBuffer(scoped_ptr<SpdyFrame> frame) + : frame_(frame.Pass()), + offset_(0) {} + +// The given data may not be strictly a SPDY frame; we (ab)use +// |frame_| just as a container. +SpdyBuffer::SpdyBuffer(const char* data, size_t size) : + frame_(MakeSpdyFrame(data, size)), + offset_(0) {} + +SpdyBuffer::~SpdyBuffer() {} + +const char* SpdyBuffer::GetRemainingData() const { + return frame_->data() + offset_; +} + +size_t SpdyBuffer::GetRemainingSize() const { + return frame_->size() - offset_; +} + +void SpdyBuffer::Consume(size_t consume_size) { + DCHECK_GE(consume_size, 1u); + DCHECK_LE(consume_size, GetRemainingSize()); + offset_ += consume_size; +}; + +IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() { + return new WrappedIOBuffer(GetRemainingData()); +} + +} // namespace net diff --git a/net/spdy/spdy_buffer.h b/net/spdy/spdy_buffer.h new file mode 100644 index 0000000..c4c5b8f --- /dev/null +++ b/net/spdy/spdy_buffer.h @@ -0,0 +1,66 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_BUFFER_H_ +#define NET_SPDY_SPDY_BUFFER_H_ + +#include <cstddef> + +#include "base/basictypes.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/net_export.h" + +namespace net { + +class IOBuffer; +class SpdyFrame; + +// SpdyBuffer is a class to hold data read from or to be written to a +// SPDY connection. It is similar to a DrainableIOBuffer but is not +// ref-counted and will include a way to get notified when Consume() +// is called. +// +// NOTE(akalin): This explicitly does not inherit from IOBuffer to +// avoid the needless ref-counting and to avoid working around the +// fact that IOBuffer member functions are not virtual. +class NET_EXPORT_PRIVATE SpdyBuffer { + public: + // Construct with the data in the given frame. Assumes that data is + // owned by |frame| or outlives it. + explicit SpdyBuffer(scoped_ptr<SpdyFrame> frame); + + // Construct with a copy of the given raw data. |data| must be + // non-NULL and |size| must be non-zero. + SpdyBuffer(const char* data, size_t size); + + ~SpdyBuffer(); + + // Returns the remaining (unconsumed) data. + const char* GetRemainingData() const; + + // Returns the number of remaining (unconsumed) bytes. + size_t GetRemainingSize() const; + + // Consume the given number of bytes, which must be positive but not + // greater than GetRemainingSize(). + // + // TODO(akalin): Add a way to get notified when Consume() is called. + void Consume(size_t consume_size); + + // Returns an IOBuffer pointing to the data starting at + // GetRemainingData(). Use with care; the returned IOBuffer must not + // be used past the lifetime of this object, and it is not updated + // when Consume() is called. + IOBuffer* GetIOBufferForRemainingData(); + + private: + const scoped_ptr<SpdyFrame> frame_; + size_t offset_; + + DISALLOW_COPY_AND_ASSIGN(SpdyBuffer); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_BUFFER_H_ diff --git a/net/spdy/spdy_buffer_producer.cc b/net/spdy/spdy_buffer_producer.cc new file mode 100644 index 0000000..3a9598c --- /dev/null +++ b/net/spdy/spdy_buffer_producer.cc @@ -0,0 +1,27 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_buffer_producer.h" + +#include "base/logging.h" +#include "net/spdy/spdy_buffer.h" +#include "net/spdy/spdy_protocol.h" + +namespace net { + +SpdyBufferProducer::SpdyBufferProducer() {} + +SpdyBufferProducer::~SpdyBufferProducer() {} + +SimpleBufferProducer::SimpleBufferProducer(scoped_ptr<SpdyBuffer> buffer) + : buffer_(buffer.Pass()) {} + +SimpleBufferProducer::~SimpleBufferProducer() {} + +scoped_ptr<SpdyBuffer> SimpleBufferProducer::ProduceBuffer() { + DCHECK(buffer_); + return buffer_.Pass(); +} + +} // namespace net diff --git a/net/spdy/spdy_buffer_producer.h b/net/spdy/spdy_buffer_producer.h new file mode 100644 index 0000000..fe82b1a --- /dev/null +++ b/net/spdy/spdy_buffer_producer.h @@ -0,0 +1,50 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_BUFFER_PRODUCER_H_ +#define NET_SPDY_SPDY_BUFFER_PRODUCER_H_ + +#include "base/basictypes.h" +#include "base/compiler_specific.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/net_export.h" + +namespace net { + +class SpdyBuffer; + +// An object which provides a SpdyBuffer for writing. We pass these +// around instead of SpdyBuffers since some buffers have to be +// generated "just in time". +class NET_EXPORT_PRIVATE SpdyBufferProducer { + public: + SpdyBufferProducer(); + + // Produces the buffer to be written. Will be called at most once. + virtual scoped_ptr<SpdyBuffer> ProduceBuffer() = 0; + + virtual ~SpdyBufferProducer(); + + private: + DISALLOW_COPY_AND_ASSIGN(SpdyBufferProducer); +}; + +// A simple wrapper around a single SpdyBuffer. +class NET_EXPORT_PRIVATE SimpleBufferProducer : public SpdyBufferProducer { + public: + explicit SimpleBufferProducer(scoped_ptr<SpdyBuffer> buffer); + + virtual ~SimpleBufferProducer(); + + virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE; + + private: + scoped_ptr<SpdyBuffer> buffer_; + + DISALLOW_COPY_AND_ASSIGN(SimpleBufferProducer); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_BUFFER_PRODUCER_H_ diff --git a/net/spdy/spdy_buffer_unittest.cc b/net/spdy/spdy_buffer_unittest.cc new file mode 100644 index 0000000..fbd7705 --- /dev/null +++ b/net/spdy/spdy_buffer_unittest.cc @@ -0,0 +1,88 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_buffer.h" + +#include <cstddef> +#include <string> + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/io_buffer.h" +#include "net/spdy/spdy_protocol.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace net { + +namespace { + +const char kData[] = "hello!\0hi."; +const size_t kDataSize = arraysize(kData); + +class SpdyBufferTest : public ::testing::Test {}; + +// Make a string from the data remaining in |buffer|. +std::string BufferToString(const SpdyBuffer& buffer) { + return std::string(buffer.GetRemainingData(), buffer.GetRemainingSize()); +} + +// Construct a SpdyBuffer from a SpdyFrame and make sure its data +// points to the frame's underlying data. +TEST_F(SpdyBufferTest, FrameConstructor) { + SpdyBuffer buffer( + scoped_ptr<SpdyFrame>( + new SpdyFrame(const_cast<char*>(kData), kDataSize, + false /* owns_buffer */))); + + EXPECT_EQ(kData, buffer.GetRemainingData()); + EXPECT_EQ(kDataSize, buffer.GetRemainingSize()); +} + +// Construct a SpdyBuffer from a const char*/size_t pair and make sure +// it makes a copy of the data. +TEST_F(SpdyBufferTest, DataConstructor) { + std::string data(kData, kDataSize); + SpdyBuffer buffer(data.data(), data.size()); + // This mutation shouldn't affect |buffer|'s data. + data[0] = 'H'; + + EXPECT_NE(kData, buffer.GetRemainingData()); + EXPECT_EQ(kDataSize, buffer.GetRemainingSize()); + EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer)); +} + +// Construct a SpdyBuffer and call Consume() on it, which should +// update the remaining data pointer and size appropriately. +TEST_F(SpdyBufferTest, Consume) { + SpdyBuffer buffer(kData, kDataSize); + + EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer)); + + buffer.Consume(5); + EXPECT_EQ(std::string(kData + 5, kDataSize - 5), BufferToString(buffer)); + + buffer.Consume(kDataSize - 5); + EXPECT_EQ(0u, buffer.GetRemainingSize()); +} + +// Make sure the IOBuffer returned by GetIOBufferForRemainingData() +// points to the buffer's remaining data and isn't updated by +// Consume(). +TEST_F(SpdyBufferTest, GetIOBufferForRemainingData) { + SpdyBuffer buffer(kData, kDataSize); + + buffer.Consume(5); + scoped_refptr<IOBuffer> io_buffer = buffer.GetIOBufferForRemainingData(); + size_t io_buffer_size = buffer.GetRemainingSize(); + const std::string expectedData(kData + 5, kDataSize - 5); + EXPECT_EQ(expectedData, std::string(io_buffer->data(), io_buffer_size)); + + buffer.Consume(kDataSize - 5); + EXPECT_EQ(expectedData, std::string(io_buffer->data(), io_buffer_size)); +} + +} // namespace + +} // namespace net diff --git a/net/spdy/spdy_frame_producer.cc b/net/spdy/spdy_frame_producer.cc deleted file mode 100644 index dc16744..0000000 --- a/net/spdy/spdy_frame_producer.cc +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (c) 2013 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "net/spdy/spdy_frame_producer.h" - -#include "base/logging.h" -#include "net/spdy/spdy_protocol.h" - -namespace net { - -SpdyFrameProducer::SpdyFrameProducer() {} - -SpdyFrameProducer::~SpdyFrameProducer() {} - -SimpleFrameProducer::SimpleFrameProducer(scoped_ptr<SpdyFrame> frame) - : frame_(frame.Pass()) {} - -SimpleFrameProducer::~SimpleFrameProducer() {} - -scoped_ptr<SpdyFrame> SimpleFrameProducer::ProduceFrame() { - DCHECK(frame_); - return frame_.Pass(); -} - -} // namespace net diff --git a/net/spdy/spdy_frame_producer.h b/net/spdy/spdy_frame_producer.h deleted file mode 100644 index 9db20ce..0000000 --- a/net/spdy/spdy_frame_producer.h +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2013 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef NET_SPDY_SPDY_FRAME_PRODUCER_H_ -#define NET_SPDY_SPDY_FRAME_PRODUCER_H_ - -#include "base/basictypes.h" -#include "base/compiler_specific.h" -#include "base/memory/scoped_ptr.h" -#include "net/base/net_export.h" - -namespace net { - -class SpdyFrame; - -// An object which provides a SpdyFrame for writing. We pass these -// around instead of SpdyFrames since some frames have to be generated -// "just in time". -class NET_EXPORT_PRIVATE SpdyFrameProducer { - public: - SpdyFrameProducer(); - - // Produces the frame to be written. Will be called at most once. - virtual scoped_ptr<SpdyFrame> ProduceFrame() = 0; - - virtual ~SpdyFrameProducer(); - - private: - DISALLOW_COPY_AND_ASSIGN(SpdyFrameProducer); -}; - -// A simple wrapper around a single SpdyFrame. -class NET_EXPORT_PRIVATE SimpleFrameProducer : public SpdyFrameProducer { - public: - explicit SimpleFrameProducer(scoped_ptr<SpdyFrame> frame); - - virtual ~SimpleFrameProducer(); - - virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE; - - private: - scoped_ptr<SpdyFrame> frame_; - - DISALLOW_COPY_AND_ASSIGN(SimpleFrameProducer); -}; - -} // namespace net - -#endif // NET_SPDY_SPDY_FRAME_PRODUCER_H_ diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc index 4eb82bf..9053bc3 100644 --- a/net/spdy/spdy_http_stream.cc +++ b/net/spdy/spdy_http_stream.cc @@ -135,28 +135,11 @@ int SpdyHttpStream::ReadResponseBody( CHECK(!callback.is_null()); // If we have data buffered, complete the IO immediately. - if (!response_body_.empty()) { - int bytes_read = 0; - while (!response_body_.empty() && buf_len > 0) { - scoped_refptr<IOBufferWithSize> data = response_body_.front(); - const int bytes_to_copy = std::min(buf_len, data->size()); - memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy); - buf_len -= bytes_to_copy; - if (bytes_to_copy == data->size()) { - response_body_.pop_front(); - } else { - const int bytes_remaining = data->size() - bytes_to_copy; - IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining); - memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]), - bytes_remaining); - response_body_.pop_front(); - response_body_.push_front(make_scoped_refptr(new_buffer)); - } - bytes_read += bytes_to_copy; - } + if (!response_body_queue_.IsEmpty()) { + size_t bytes_consumed = response_body_queue_.Dequeue(buf->data(), buf_len); if (stream_) - stream_->IncreaseRecvWindowSize(bytes_read); - return bytes_read; + stream_->IncreaseRecvWindowSize(bytes_consumed); + return bytes_consumed; } else if (stream_closed_) { return closed_stream_status_; } @@ -434,7 +417,7 @@ void SpdyHttpStream::OnHeadersSent() { NOTREACHED(); } -int SpdyHttpStream::OnDataReceived(const char* data, int length) { +int SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { // SpdyStream won't call us with data if the header block didn't contain a // valid set of headers. So we don't expect to not have headers received // here. @@ -446,11 +429,8 @@ int SpdyHttpStream::OnDataReceived(const char* data, int length) { // happen for server initiated streams. DCHECK(stream_.get()); DCHECK(!stream_->closed() || stream_->pushed()); - if (length > 0) { - // Save the received data. - IOBufferWithSize* io_buffer = new IOBufferWithSize(length); - memcpy(io_buffer->data(), data, length); - response_body_.push_back(make_scoped_refptr(io_buffer)); + if (buffer) { + response_body_queue_.Enqueue(buffer.Pass()); if (user_buffer_) { // Handing small chunks of data to the caller creates measurable overhead. @@ -509,14 +489,9 @@ bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const { if (stream_closed_) return false; - int bytes_buffered = 0; - std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it; - for (it = response_body_.begin(); - it != response_body_.end() && bytes_buffered < user_buffer_len_; - ++it) - bytes_buffered += (*it)->size(); - - return bytes_buffered < user_buffer_len_; + DCHECK_GT(user_buffer_len_, 0); + return response_body_queue_.GetTotalSize() < + static_cast<size_t>(user_buffer_len_); } bool SpdyHttpStream::DoBufferedReadCallback() { diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h index 226d01c..2af82f0 100644 --- a/net/spdy/spdy_http_stream.h +++ b/net/spdy/spdy_http_stream.h @@ -13,6 +13,7 @@ #include "net/base/completion_callback.h" #include "net/base/net_log.h" #include "net/http/http_stream.h" +#include "net/spdy/spdy_read_queue.h" #include "net/spdy/spdy_stream.h" namespace net { @@ -89,7 +90,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; - virtual int OnDataReceived(const char* buffer, int bytes) OVERRIDE; + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; virtual void OnDataSent(size_t bytes_sent) OVERRIDE; virtual void OnClose(int status) OVERRIDE; @@ -139,8 +140,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate, bool response_headers_received_; // Indicates waiting for more HEADERS. // We buffer the response body as it arrives asynchronously from the stream. - // TODO(mbelshe): is this infinite buffering? - std::list<scoped_refptr<IOBufferWithSize> > response_body_; + SpdyReadQueue response_body_queue_; CompletionCallback callback_; diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc deleted file mode 100644 index 441b033..0000000 --- a/net/spdy/spdy_io_buffer.cc +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "net/spdy/spdy_io_buffer.h" -#include "net/spdy/spdy_stream.h" - -namespace net { - -SpdyIOBuffer::SpdyIOBuffer() {} - -SpdyIOBuffer::SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream) - : buffer_(new DrainableIOBuffer(buffer, size)), stream_(stream) {} - -SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) { - buffer_ = rhs.buffer_; - stream_ = rhs.stream_; -} - -SpdyIOBuffer::~SpdyIOBuffer() {} - -SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) { - buffer_ = rhs.buffer_; - stream_ = rhs.stream_; - return *this; -} - -void SpdyIOBuffer::Swap(SpdyIOBuffer* other) { - buffer_.swap(other->buffer_); - stream_.swap(other->stream_); -} - -void SpdyIOBuffer::Release() { - buffer_ = NULL; - stream_ = NULL; -} - -} // namespace net diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h deleted file mode 100644 index ed95e70..0000000 --- a/net/spdy/spdy_io_buffer.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (c) 2012 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef NET_SPDY_SPDY_IO_BUFFER_H_ -#define NET_SPDY_SPDY_IO_BUFFER_H_ - -#include "base/memory/ref_counted.h" -#include "net/base/io_buffer.h" -#include "net/base/net_export.h" - -namespace net { - -class SpdyStream; - -// A class for managing SPDY write buffers. These write buffers need -// to track the SpdyStream which they are associated with so that the -// session can activate the stream lazily and also notify the stream -// on completion of the write. -class NET_EXPORT_PRIVATE SpdyIOBuffer { - public: - SpdyIOBuffer(); - - // Constructor - // |buffer| is the actual data buffer. - // |size| is the size of the data buffer. - // |stream| is a pointer to the stream which is managing this buffer - // (can be NULL if the write is for the session itself). - SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream); - - // Declare this instead of using the default so that we avoid needing to - // include spdy_stream.h. - SpdyIOBuffer(const SpdyIOBuffer& rhs); - - ~SpdyIOBuffer(); - - // Declare this instead of using the default so that we avoid needing to - // include spdy_stream.h. - SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs); - - void Swap(SpdyIOBuffer* other); - - void Release(); - - // Accessors. - DrainableIOBuffer* buffer() const { return buffer_; } - const scoped_refptr<SpdyStream>& stream() const { return stream_; } - - private: - scoped_refptr<DrainableIOBuffer> buffer_; - scoped_refptr<SpdyStream> stream_; -}; - -} // namespace net - -#endif // NET_SPDY_SPDY_IO_BUFFER_H_ diff --git a/net/spdy/spdy_proxy_client_socket.cc b/net/spdy/spdy_proxy_client_socket.cc index 7978d60..c09a5bd 100644 --- a/net/spdy/spdy_proxy_client_socket.cc +++ b/net/spdy/spdy_proxy_client_socket.cc @@ -39,7 +39,7 @@ SpdyProxyClientSocket::SpdyProxyClientSocket( GURL("https://" + proxy_server.ToString()), auth_cache, auth_handler_factory)), - user_buffer_(NULL), + user_buffer_len_(0), write_buffer_len_(0), write_bytes_outstanding_(0), ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)), @@ -127,8 +127,9 @@ int SpdyProxyClientSocket::Connect(const CompletionCallback& callback) { } void SpdyProxyClientSocket::Disconnect() { - read_buffer_.clear(); + read_buffer_queue_.Clear(); user_buffer_ = NULL; + user_buffer_len_ = 0; read_callback_.Reset(); write_buffer_len_ = 0; @@ -150,7 +151,8 @@ bool SpdyProxyClientSocket::IsConnected() const { } bool SpdyProxyClientSocket::IsConnectedAndIdle() const { - return IsConnected() && read_buffer_.empty() && spdy_stream_->is_idle(); + return IsConnected() && read_buffer_queue_.IsEmpty() && + spdy_stream_->is_idle(); } const BoundNetLog& SpdyProxyClientSocket::NetLog() const { @@ -196,15 +198,16 @@ int SpdyProxyClientSocket::Read(IOBuffer* buf, int buf_len, if (next_state_ == STATE_DISCONNECTED) return ERR_SOCKET_NOT_CONNECTED; - if (next_state_ == STATE_CLOSED && read_buffer_.empty()) { + if (next_state_ == STATE_CLOSED && read_buffer_queue_.IsEmpty()) { return 0; } DCHECK(next_state_ == STATE_OPEN || next_state_ == STATE_CLOSED); DCHECK(buf); - user_buffer_ = new DrainableIOBuffer(buf, buf_len); - int result = PopulateUserReadBuffer(); + size_t result = PopulateUserReadBuffer(buf->data(), buf_len); if (result == 0) { + user_buffer_ = buf; + user_buffer_len_ = static_cast<size_t>(buf_len); DCHECK(!callback.is_null()); read_callback_ = callback; return ERR_IO_PENDING; @@ -213,30 +216,13 @@ int SpdyProxyClientSocket::Read(IOBuffer* buf, int buf_len, return result; } -int SpdyProxyClientSocket::PopulateUserReadBuffer() { - if (!user_buffer_) - return ERR_IO_PENDING; +size_t SpdyProxyClientSocket::PopulateUserReadBuffer(char* data, size_t len) { + size_t bytes_consumed = read_buffer_queue_.Dequeue(data, len); - int bytes_read = 0; - while (!read_buffer_.empty() && user_buffer_->BytesRemaining() > 0) { - scoped_refptr<DrainableIOBuffer> data = read_buffer_.front(); - const int bytes_to_copy = std::min(user_buffer_->BytesRemaining(), - data->BytesRemaining()); - memcpy(user_buffer_->data(), data->data(), bytes_to_copy); - user_buffer_->DidConsume(bytes_to_copy); - bytes_read += bytes_to_copy; - if (data->BytesRemaining() == bytes_to_copy) { - // Consumed all data from this buffer - read_buffer_.pop_front(); - } else { - data->DidConsume(bytes_to_copy); - } - } + if (bytes_consumed > 0 && spdy_stream_) + spdy_stream_->IncreaseRecvWindowSize(bytes_consumed); - if (bytes_read > 0 && spdy_stream_) - spdy_stream_->IncreaseRecvWindowSize(bytes_read); - - return user_buffer_->BytesConsumed(); + return bytes_consumed; } int SpdyProxyClientSocket::Write(IOBuffer* buf, int buf_len, @@ -526,23 +512,23 @@ void SpdyProxyClientSocket::OnHeadersSent() { NOTREACHED(); } -// Called when data is received. -int SpdyProxyClientSocket::OnDataReceived(const char* data, int length) { - net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, - length, data); - if (length > 0) { - // Save the received data. - scoped_refptr<IOBuffer> io_buffer(new IOBuffer(length)); - memcpy(io_buffer->data(), data, length); - read_buffer_.push_back( - make_scoped_refptr(new DrainableIOBuffer(io_buffer, length))); +// Called when data is received or on EOF (if |buffer| is NULL). +int SpdyProxyClientSocket::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { + if (buffer) { + net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, + buffer->GetRemainingSize(), + buffer->GetRemainingData()); + read_buffer_queue_.Enqueue(buffer.Pass()); + } else { + net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, 0, NULL); } if (!read_callback_.is_null()) { - int rv = PopulateUserReadBuffer(); + int rv = PopulateUserReadBuffer(user_buffer_->data(), user_buffer_len_); CompletionCallback c = read_callback_; read_callback_.Reset(); user_buffer_ = NULL; + user_buffer_len_ = 0; c.Run(rv); } return OK; @@ -591,7 +577,7 @@ void SpdyProxyClientSocket::OnClose(int status) { read_callback.Run(status); } else if (!read_callback_.is_null()) { // If we have a read_callback_, the we need to make sure we call it back. - OnDataReceived(NULL, 0); + OnDataReceived(scoped_ptr<SpdyBuffer>()); } // This may have been deleted by read_callback_, so check first. if (weak_ptr && !write_callback.is_null()) diff --git a/net/spdy/spdy_proxy_client_socket.h b/net/spdy/spdy_proxy_client_socket.h index 8d3f047..e25e1e3 100644 --- a/net/spdy/spdy_proxy_client_socket.h +++ b/net/spdy/spdy_proxy_client_socket.h @@ -5,8 +5,8 @@ #ifndef NET_SPDY_SPDY_PROXY_CLIENT_SOCKET_H_ #define NET_SPDY_SPDY_PROXY_CLIENT_SOCKET_H_ -#include <string> #include <list> +#include <string> #include "base/basictypes.h" #include "base/memory/ref_counted.h" @@ -21,6 +21,7 @@ #include "net/http/proxy_client_socket.h" #include "net/spdy/spdy_http_stream.h" #include "net/spdy/spdy_protocol.h" +#include "net/spdy/spdy_read_queue.h" #include "net/spdy/spdy_session.h" #include "net/spdy/spdy_stream.h" @@ -97,7 +98,7 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket, base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; - virtual int OnDataReceived(const char* data, int length) OVERRIDE; + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; virtual void OnDataSent(size_t bytes_sent) OVERRIDE; virtual void OnClose(int status) OVERRIDE; @@ -126,7 +127,7 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket, // Populates |user_buffer_| with as much read data as possible // and returns the number of bytes read. - int PopulateUserReadBuffer(); + size_t PopulateUserReadBuffer(char* out, size_t len); State next_state_; @@ -149,10 +150,11 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket, scoped_refptr<HttpAuthController> auth_; // We buffer the response body as it arrives asynchronously from the stream. - std::list<scoped_refptr<DrainableIOBuffer> > read_buffer_; + SpdyReadQueue read_buffer_queue_; // User provided buffer for the Read() response. - scoped_refptr<DrainableIOBuffer> user_buffer_; + scoped_refptr<IOBuffer> user_buffer_; + size_t user_buffer_len_; // User specified number of bytes to be written. int write_buffer_len_; diff --git a/net/spdy/spdy_read_queue.cc b/net/spdy/spdy_read_queue.cc new file mode 100644 index 0000000..36f1e06 --- /dev/null +++ b/net/spdy/spdy_read_queue.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_read_queue.h" + +#include "base/logging.h" +#include "base/stl_util.h" +#include "net/spdy/spdy_buffer.h" + +namespace net { + +SpdyReadQueue::SpdyReadQueue() : total_size_(0) {} + +SpdyReadQueue::~SpdyReadQueue() { + Clear(); +} + +bool SpdyReadQueue::IsEmpty() const { + DCHECK_EQ(queue_.empty(), total_size_ == 0); + return queue_.empty(); +} + +size_t SpdyReadQueue::GetTotalSize() const { + return total_size_; +} + +void SpdyReadQueue::Enqueue(scoped_ptr<SpdyBuffer> buffer) { + DCHECK_GT(buffer->GetRemainingSize(), 0u); + total_size_ += buffer->GetRemainingSize(); + queue_.push_back(buffer.release()); +} + +size_t SpdyReadQueue::Dequeue(char* out, size_t len) { + DCHECK_GT(len, 0u); + size_t bytes_copied = 0; + while (!queue_.empty() && bytes_copied < len) { + SpdyBuffer* buffer = queue_.front(); + size_t bytes_to_copy = + std::min(len - bytes_copied, buffer->GetRemainingSize()); + memcpy(out + bytes_copied, buffer->GetRemainingData(), bytes_to_copy); + bytes_copied += bytes_to_copy; + if (bytes_to_copy == buffer->GetRemainingSize()) { + delete queue_.front(); + queue_.pop_front(); + } else { + buffer->Consume(bytes_to_copy); + } + } + total_size_ -= bytes_copied; + return bytes_copied; +} + +void SpdyReadQueue::Clear() { + STLDeleteElements(&queue_); + queue_.clear(); +} + +} // namespace diff --git a/net/spdy/spdy_read_queue.h b/net/spdy/spdy_read_queue.h new file mode 100644 index 0000000..65f3daf --- /dev/null +++ b/net/spdy/spdy_read_queue.h @@ -0,0 +1,51 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef NET_SPDY_SPDY_BUFFER_QUEUE_H_ +#define NET_SPDY_SPDY_BUFFER_QUEUE_H_ + +#include <cstddef> +#include <deque> + +#include "base/basictypes.h" +#include "base/memory/scoped_ptr.h" +#include "net/base/net_export.h" + +namespace net { + +class SpdyBuffer; + +// A FIFO queue of incoming data from a SPDY connection. Useful for +// SpdyStream delegates. +class NET_EXPORT_PRIVATE SpdyReadQueue { + public: + SpdyReadQueue(); + ~SpdyReadQueue(); + + // Returns whether there's anything in the queue. + bool IsEmpty() const; + + // Returns the total number of bytes in the queue. + size_t GetTotalSize() const; + + // Enqueues the bytes in |buffer|. + void Enqueue(scoped_ptr<SpdyBuffer> buffer); + + // Dequeues up to |len| (which must be positive) bytes into + // |out|. Returns the number of bytes dequeued. + size_t Dequeue(char* out, size_t len); + + // Removes all bytes from the queue. + void Clear(); + + private: + std::deque<SpdyBuffer*> queue_; + size_t total_size_; + + DISALLOW_COPY_AND_ASSIGN(SpdyReadQueue); +}; + +} // namespace net + +#endif // NET_SPDY_SPDY_BUFFER_QUEUE_H_ diff --git a/net/spdy/spdy_read_queue_unittest.cc b/net/spdy/spdy_read_queue_unittest.cc new file mode 100644 index 0000000..7281f68 --- /dev/null +++ b/net/spdy/spdy_read_queue_unittest.cc @@ -0,0 +1,106 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/spdy/spdy_read_queue.h" + +#include <algorithm> +#include <cstddef> +#include <string> + +#include "base/memory/scoped_ptr.h" +#include "base/stl_util.h" +#include "net/spdy/spdy_buffer.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace net { + +namespace { + +const char kData[] = "SPDY read queue test data.\0Some more data."; +const size_t kDataSize = arraysize(kData); + +// Enqueues |data| onto |queue| in chunks of at most |max_buffer_size| +// bytes. +void EnqueueString(const std::string& data, + size_t max_buffer_size, + SpdyReadQueue* queue) { + ASSERT_GT(data.size(), 0u); + ASSERT_GT(max_buffer_size, 0u); + size_t old_total_size = queue->GetTotalSize(); + for (size_t i = 0; i < data.size();) { + size_t buffer_size = std::min(data.size() - i, max_buffer_size); + queue->Enqueue( + scoped_ptr<SpdyBuffer>(new SpdyBuffer(data.data() + i, buffer_size))); + i += buffer_size; + EXPECT_FALSE(queue->IsEmpty()); + EXPECT_EQ(old_total_size + i, queue->GetTotalSize()); + } +} + +// Dequeues all bytes in |queue| in chunks of at most +// |max_buffer_size| bytes and returns the data as a string. +std::string DrainToString(size_t max_buffer_size, SpdyReadQueue* queue) { + std::string data; + + // Pad the buffer so we can detect out-of-bound writes. + size_t padding = std::max(static_cast<size_t>(4096), queue->GetTotalSize()); + size_t buffer_size_with_padding = padding + max_buffer_size + padding; + scoped_ptr<char[]> buffer(new char[buffer_size_with_padding]); + std::memset(buffer.get(), 0, buffer_size_with_padding); + char* buffer_data = buffer.get() + padding; + + while (!queue->IsEmpty()) { + size_t old_total_size = queue->GetTotalSize(); + EXPECT_GT(old_total_size, 0u); + size_t dequeued_bytes = queue->Dequeue(buffer_data, max_buffer_size); + + // Make sure |queue| doesn't write past either end of its given + // boundaries. + for (int i = 1; i <= static_cast<int>(padding); ++i) { + EXPECT_EQ('\0', buffer_data[-i]) << -i; + } + for (size_t i = 0; i < padding; ++i) { + EXPECT_EQ('\0', buffer_data[max_buffer_size + i]) << i; + } + + data.append(buffer_data, dequeued_bytes); + EXPECT_EQ(dequeued_bytes, std::min(max_buffer_size, dequeued_bytes)); + EXPECT_EQ(queue->GetTotalSize(), old_total_size - dequeued_bytes); + } + EXPECT_TRUE(queue->IsEmpty()); + return data; +} + +// Enqueue a test string with the given enqueue/dequeue max buffer +// sizes. +void RunEnqueueDequeueTest(size_t enqueue_max_buffer_size, + size_t dequeue_max_buffer_size) { + std::string data(kData, kDataSize); + SpdyReadQueue read_queue; + EnqueueString(data, enqueue_max_buffer_size, &read_queue); + const std::string& drained_data = + DrainToString(dequeue_max_buffer_size, &read_queue); + EXPECT_EQ(data, drained_data); +} + +class SpdyReadQueueTest : public ::testing::Test {}; + +// Call RunEnqueueDequeueTest() with various buffer size combinatinos. + +TEST_F(SpdyReadQueueTest, LargeEnqueueAndDequeueBuffers) { + RunEnqueueDequeueTest(2 * kDataSize, 2 * kDataSize); +} + +TEST_F(SpdyReadQueueTest, OneByteEnqueueAndDequeueBuffers) { + RunEnqueueDequeueTest(1, 1); +} + +TEST_F(SpdyReadQueueTest, CoprimeBufferSizes) { + RunEnqueueDequeueTest(2, 3); + RunEnqueueDequeueTest(3, 2); +} + +} // namespace + +} // namespace net diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc index 1b59ec7..f705731 100644 --- a/net/spdy/spdy_session.cc +++ b/net/spdy/spdy_session.cc @@ -31,9 +31,9 @@ #include "net/cert/asn1_util.h" #include "net/http/http_network_session.h" #include "net/http/http_server_properties.h" +#include "net/spdy/spdy_buffer_producer.h" #include "net/spdy/spdy_credential_builder.h" #include "net/spdy/spdy_frame_builder.h" -#include "net/spdy/spdy_frame_producer.h" #include "net/spdy/spdy_http_utils.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_session_pool.h" @@ -312,6 +312,7 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, stream_hi_water_mark_(kFirstStreamId), write_pending_(false), in_flight_write_frame_type_(DATA), + in_flight_write_frame_size_(0), delayed_write_pending_(false), is_secure_(false), certificate_error_code_(OK), @@ -636,7 +637,7 @@ int SpdySession::GetProtocolVersion() const { void SpdySession::EnqueueStreamWrite( SpdyStream* stream, SpdyFrameType frame_type, - scoped_ptr<SpdyFrameProducer> producer) { + scoped_ptr<SpdyBufferProducer> producer) { DCHECK(frame_type == HEADERS || frame_type == DATA || frame_type == CREDENTIAL || @@ -738,18 +739,18 @@ scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame( return frame.Pass(); } -scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, - net::IOBuffer* data, - int len, - SpdyDataFlags flags) { - // Find our stream +scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, + net::IOBuffer* data, + int len, + SpdyDataFlags flags) { + // Find our stream. CHECK(IsStreamActive(stream_id)); scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; CHECK_EQ(stream->stream_id(), stream_id); if (len < 0) { NOTREACHED(); - return scoped_ptr<SpdyFrame>(); + return scoped_ptr<SpdyBuffer>(); } if (len > kMaxSpdyFrameChunkSize) { @@ -772,7 +773,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return scoped_ptr<SpdyFrame>(); + return scoped_ptr<SpdyBuffer>(); } if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { effective_window_size = @@ -784,7 +785,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, net_log().AddEvent( NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, NetLog::IntegerCallback("stream_id", stream_id)); - return scoped_ptr<SpdyFrame>(); + return scoped_ptr<SpdyBuffer>(); } } @@ -815,7 +816,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, buffered_spdy_framer_->CreateDataFrame( stream_id, data->data(), static_cast<uint32>(len), flags)); - return frame.Pass(); + return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); } void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { @@ -982,42 +983,45 @@ void SpdySession::OnWriteComplete(int result) { scoped_refptr<SpdySession> self(this); DCHECK(write_pending_); - DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); last_activity_time_ = base::TimeTicks::Now(); write_pending_ = false; if (result < 0) { - in_flight_write_.Release(); + in_flight_write_.reset(); in_flight_write_frame_type_ = DATA; - CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); + in_flight_write_frame_size_ = 0; + in_flight_write_stream_ = NULL; + CloseSessionOnError(static_cast<Error>(result), true, "Write error"); return; } // It should not be possible to have written more bytes than our // in_flight_write_. - DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); - - in_flight_write_.buffer()->DidConsume(result); - - // We only notify the stream when we've fully written the pending frame. - if (in_flight_write_.buffer()->BytesRemaining() == 0) { - DCHECK_GT(result, 0); - - scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); + DCHECK_LE(static_cast<size_t>(result), + in_flight_write_->GetRemainingSize()); + + if (result > 0) { + in_flight_write_->Consume(static_cast<size_t>(result)); + + // We only notify the stream when we've fully written the pending frame. + if (in_flight_write_->GetRemainingSize() == 0) { + // It is possible that the stream was cancelled while we were + // writing to the socket. + if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) { + DCHECK_GT(in_flight_write_frame_size_, 0u); + in_flight_write_stream_->OnFrameWriteComplete( + in_flight_write_frame_type_, + in_flight_write_frame_size_); + } - // It is possible that the stream was cancelled while we were writing - // to the socket. - if (stream && !stream->cancelled()) { - DCHECK_GT(in_flight_write_.buffer()->size(), 0); - stream->OnFrameWriteComplete( - in_flight_write_frame_type_, - static_cast<size_t>(in_flight_write_.buffer()->size())); + // Cleanup the write which just completed. + in_flight_write_.reset(); + in_flight_write_frame_type_ = DATA; + in_flight_write_frame_size_ = 0; + in_flight_write_stream_ = NULL; } - - // Cleanup the write which just completed. - in_flight_write_.Release(); - in_flight_write_frame_type_ = DATA; } // Write more data. We're already in a continuation, so we can go @@ -1057,12 +1061,12 @@ void SpdySession::WriteSocket() { // returns error (or ERR_IO_PENDING). DCHECK(buffered_spdy_framer_.get()); while (true) { - if (in_flight_write_.buffer()) { - DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); + if (in_flight_write_) { + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); } else { // Grab the next frame to send. SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyFrameProducer> producer; + scoped_ptr<SpdyBufferProducer> producer; scoped_refptr<SpdyStream> stream; if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) break; @@ -1086,25 +1090,25 @@ void SpdySession::WriteSocket() { } } - scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); - if (!frame) { + in_flight_write_ = producer->ProduceBuffer(); + if (!in_flight_write_) { NOTREACHED(); continue; } - DCHECK_GT(frame->size(), 0u); - - // TODO(mbelshe): We have too much copying of data here. - scoped_refptr<IOBufferWithSize> buffer = - new IOBufferWithSize(frame->size()); - memcpy(buffer->data(), frame->data(), frame->size()); - in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); in_flight_write_frame_type_ = frame_type; + in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); + DCHECK_GE(in_flight_write_frame_size_, + buffered_spdy_framer_->GetFrameMinimumSize()); + in_flight_write_stream_ = stream; } write_pending_ = true; + // We keep |in_flight_write_| alive until OnWriteComplete(), so + // it's okay to use GetIOBufferForRemainingData() since the socket + // doesn't use the IOBuffer past OnWriteComplete(). int rv = connection_->socket()->Write( - in_flight_write_.buffer(), - in_flight_write_.buffer()->BytesRemaining(), + in_flight_write_->GetIOBufferForRemainingData(), + in_flight_write_->GetRemainingSize(), base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); if (rv == net::ERR_IO_PENDING) break; @@ -1294,13 +1298,15 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority, frame_type == PING); EnqueueWrite( priority, frame_type, - scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), + scoped_ptr<SpdyBufferProducer>( + new SimpleBufferProducer( + scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), NULL); } void SpdySession::EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, - scoped_ptr<SpdyFrameProducer> producer, + scoped_ptr<SpdyBufferProducer> producer, const scoped_refptr<SpdyStream>& stream) { write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); WriteSocketLater(); @@ -1418,16 +1424,24 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); } + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); + // By the time data comes in, the stream may already be inactive. - if (!IsStreamActive(stream_id)) + if (it == active_streams_.end()) return; // Only decrease the window size for data for active streams. if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) DecreaseRecvWindowSize(static_cast<int32>(len)); - scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; - stream->OnDataReceived(data, len); + scoped_ptr<SpdyBuffer> buffer; + if (data) { + DCHECK_GT(len, 0u); + buffer.reset(new SpdyBuffer(data, len)); + } else { + DCHECK_EQ(len, 0u); + } + it->second->OnDataReceived(buffer.Pass()); } void SpdySession::OnSetting(SpdySettingsIds id, @@ -1696,7 +1710,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, CHECK(!stream->cancelled()); if (status == 0) { - stream->OnDataReceived(NULL, 0); + stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); } else if (status == RST_STREAM_REFUSED_STREAM) { DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); } else { diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h index b4e7335..55ac9f9 100644 --- a/net/spdy/spdy_session.h +++ b/net/spdy/spdy_session.h @@ -26,9 +26,9 @@ #include "net/socket/ssl_client_socket.h" #include "net/socket/stream_socket.h" #include "net/spdy/buffered_spdy_framer.h" +#include "net/spdy/spdy_buffer.h" #include "net/spdy/spdy_credential_state.h" #include "net/spdy/spdy_header_block.h" -#include "net/spdy/spdy_io_buffer.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_session_pool.h" #include "net/spdy/spdy_write_queue.h" @@ -246,7 +246,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // producer is used to produce its frame. void EnqueueStreamWrite(SpdyStream* stream, SpdyFrameType frame_type, - scoped_ptr<SpdyFrameProducer> producer); + scoped_ptr<SpdyBufferProducer> producer); // Creates and returns a SYN frame for |stream_id|. scoped_ptr<SpdyFrame> CreateSynStream( @@ -271,12 +271,12 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, const SpdyHeaderBlock& headers, SpdyControlFlags flags); - // Creates and returns a data frame. May return NULL if stalled by - // flow control. - scoped_ptr<SpdyFrame> CreateDataFrame(SpdyStreamId stream_id, - net::IOBuffer* data, - int len, - SpdyDataFlags flags); + // Creates and returns a SpdyBuffer holding a data frame with the + // given data. May return NULL if stalled by flow control. + scoped_ptr<SpdyBuffer> CreateDataBuffer(SpdyStreamId stream_id, + net::IOBuffer* data, + int len, + SpdyDataFlags flags); // Close a stream. void CloseStream(SpdyStreamId stream_id, int status); @@ -584,7 +584,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // with the given priority. void EnqueueWrite(RequestPriority priority, SpdyFrameType frame_type, - scoped_ptr<SpdyFrameProducer> producer, + scoped_ptr<SpdyBufferProducer> producer, const scoped_refptr<SpdyStream>& stream); // Track active streams in the active stream list. @@ -771,10 +771,18 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>, // The write queue. SpdyWriteQueue write_queue_; - // The packet we are currently sending. - bool write_pending_; // Will be true when a write is in progress. - SpdyIOBuffer in_flight_write_; // This is the write buffer in progress. + // Data for the frame we are currently sending. + // Whether we have a socket write pending completion. + bool write_pending_; + // The buffer we're currently writing. + scoped_ptr<SpdyBuffer> in_flight_write_; + // The type of the frame in |in_flight_write_|. SpdyFrameType in_flight_write_frame_type_; + // The size of the frame in |in_flight_write_|. + size_t in_flight_write_frame_size_; + // The stream to notify when |in_flight_write_| has been written to + // the socket completely. + scoped_refptr<SpdyStream> in_flight_write_stream_; // Flag if we have a pending message scheduled for WriteSocket. bool delayed_write_pending_; diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc index 5069a22..e483e35 100644 --- a/net/spdy/spdy_session_spdy2_unittest.cc +++ b/net/spdy/spdy_session_spdy2_unittest.cc @@ -10,9 +10,7 @@ #include "net/base/net_log_unittest.h" #include "net/base/request_priority.h" #include "net/base/test_data_directory.h" -#include "net/base/test_data_stream.h" #include "net/dns/host_cache.h" -#include "net/spdy/spdy_io_buffer.h" #include "net/spdy/spdy_session_pool.h" #include "net/spdy/spdy_session_test_util.h" #include "net/spdy/spdy_stream.h" diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc index 40e805f..1c63960 100644 --- a/net/spdy/spdy_session_spdy3_unittest.cc +++ b/net/spdy/spdy_session_spdy3_unittest.cc @@ -13,7 +13,6 @@ #include "net/base/test_data_stream.h" #include "net/dns/host_cache.h" #include "net/spdy/spdy_http_utils.h" -#include "net/spdy/spdy_io_buffer.h" #include "net/spdy/spdy_session_pool.h" #include "net/spdy/spdy_session_test_util.h" #include "net/spdy/spdy_stream.h" diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc index 69d9cfa..3de3716 100644 --- a/net/spdy/spdy_stream.cc +++ b/net/spdy/spdy_stream.cc @@ -12,7 +12,7 @@ #include "base/message_loop.h" #include "base/stringprintf.h" #include "base/values.h" -#include "net/spdy/spdy_frame_producer.h" +#include "net/spdy/spdy_buffer_producer.h" #include "net/spdy/spdy_http_utils.h" #include "net/spdy/spdy_session.h" @@ -54,22 +54,23 @@ bool ContainsUpperAscii(const std::string& str) { } // namespace // A wrapper around a stream that calls into ProduceSynStreamFrame(). -class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer { +class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer { public: - SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream) + SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream) : stream_(stream) { DCHECK(stream_); } - virtual ~SynStreamFrameProducer() {} + virtual ~SynStreamBufferProducer() {} - virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { + virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { if (!stream_) { NOTREACHED(); - return scoped_ptr<SpdyFrame>(); + return scoped_ptr<SpdyBuffer>(); } DCHECK_GT(stream_->stream_id(), 0u); - return stream_->ProduceSynStreamFrame(); + return scoped_ptr<SpdyBuffer>( + new SpdyBuffer(stream_->ProduceSynStreamFrame())); } private: @@ -78,9 +79,9 @@ class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer { // A wrapper around a stream that calls into ProduceHeaderFrame() with // a given header block. -class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer { +class SpdyStream::HeaderBufferProducer : public SpdyBufferProducer { public: - HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream, + HeaderBufferProducer(const base::WeakPtr<SpdyStream>& stream, scoped_ptr<SpdyHeaderBlock> headers) : stream_(stream), headers_(headers.Pass()) { @@ -88,15 +89,16 @@ class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer { DCHECK(headers_); } - virtual ~HeaderFrameProducer() {} + virtual ~HeaderBufferProducer() {} - virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { + virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { if (!stream_) { NOTREACHED(); - return scoped_ptr<SpdyFrame>(); + return scoped_ptr<SpdyBuffer>(); } DCHECK_GT(stream_->stream_id(), 0u); - return stream_->ProduceHeaderFrame(headers_.Pass()); + return scoped_ptr<SpdyBuffer>( + new SpdyBuffer(stream_->ProduceHeaderFrame(headers_.Pass()))); } private: @@ -175,17 +177,17 @@ void SpdyStream::PushedStreamReplayData() { return; } - std::vector<scoped_refptr<IOBufferWithSize> > buffers; - buffers.swap(pending_buffers_); + std::vector<SpdyBuffer*> buffers; + pending_buffers_.release(&buffers); for (size_t i = 0; i < buffers.size(); ++i) { // It is always possible that a callback to the delegate results in // the delegate no longer being available. if (!delegate_) break; if (buffers[i]) { - delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); + delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i])); } else { - delegate_->OnDataReceived(NULL, 0); + delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>()); session_->CloseStream(stream_id_, net::OK); // Note: |this| may be deleted after calling CloseStream. DCHECK_EQ(buffers.size() - 1, i); @@ -450,9 +452,8 @@ int SpdyStream::OnHeaders(const SpdyHeaderBlock& headers) { return rv; } -void SpdyStream::OnDataReceived(const char* data, size_t length) { +void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { DCHECK(session_->IsStreamActive(stream_id_)); - DCHECK_LT(length, 1u << 24); // If we don't have a response, then the SYN_REPLY did not come through. // We cannot pass data up to the caller unless the reply headers have been // received. @@ -465,10 +466,8 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) { if (!delegate_ || continue_buffering_data_) { // It should be valid for this to happen in the server push case. // We'll return received data when delegate gets attached to the stream. - if (length > 0) { - IOBufferWithSize* buf = new IOBufferWithSize(length); - memcpy(buf->data(), data, length); - pending_buffers_.push_back(make_scoped_refptr(buf)); + if (buffer) { + pending_buffers_.push_back(buffer.release()); } else { pending_buffers_.push_back(NULL); metrics_.StopStream(); @@ -480,14 +479,15 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) { CHECK(!closed()); - // A zero-length read means that the stream is being closed. - if (length == 0) { + if (!buffer) { metrics_.StopStream(); session_->CloseStream(stream_id_, net::OK); // Note: |this| may be deleted after calling CloseStream. return; } + size_t length = buffer->GetRemainingSize(); + DCHECK_LE(length, session_->GetDataFrameMaximumPayload()); if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) DecreaseRecvWindowSize(static_cast<int32>(length)); @@ -496,7 +496,7 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) { recv_bytes_ += length; recv_last_byte_time_ = base::TimeTicks::Now(); - if (delegate_->OnDataReceived(data, length) != net::OK) { + if (delegate_->OnDataReceived(buffer.Pass()) != net::OK) { // |delegate_| rejected the data. LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data"); session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); @@ -580,8 +580,8 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { session_->EnqueueStreamWrite( this, HEADERS, - scoped_ptr<SpdyFrameProducer>( - new HeaderFrameProducer( + scoped_ptr<SpdyBufferProducer>( + new HeaderBufferProducer( weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); } @@ -594,15 +594,16 @@ void SpdyStream::QueueStreamData(IOBuffer* data, CHECK_GT(stream_id_, 0u); CHECK(!cancelled()); - scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame( + scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer( stream_id_, data, length, flags)); - if (!data_frame) + // We'll get called again by PossiblyResumeIfSendStalled(). + if (!data_buffer) return; session_->EnqueueStreamWrite( this, DATA, - scoped_ptr<SpdyFrameProducer>( - new SimpleFrameProducer(data_frame.Pass()))); + scoped_ptr<SpdyBufferProducer>( + new SimpleBufferProducer(data_buffer.Pass()))); } bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, @@ -784,7 +785,9 @@ int SpdyStream::DoSendDomainBoundCert() { // the state machine appropriately. session_->EnqueueStreamWrite( this, CREDENTIAL, - scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass()))); + scoped_ptr<SpdyBufferProducer>( + new SimpleBufferProducer( + scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()))))); return ERR_IO_PENDING; } @@ -800,8 +803,8 @@ int SpdyStream::DoSendHeaders() { session_->EnqueueStreamWrite( this, SYN_STREAM, - scoped_ptr<SpdyFrameProducer>( - new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr()))); + scoped_ptr<SpdyBufferProducer>( + new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr()))); return ERR_IO_PENDING; } diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h index 881b48e..fd145ea 100644 --- a/net/spdy/spdy_stream.h +++ b/net/spdy/spdy_stream.h @@ -12,6 +12,7 @@ #include "base/basictypes.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/memory/scoped_vector.h" #include "base/memory/weak_ptr.h" #include "googleurl/src/gurl.h" #include "net/base/bandwidth_metrics.h" @@ -83,9 +84,10 @@ class NET_EXPORT_PRIVATE SpdyStream // Called when a HEADERS frame is sent. virtual void OnHeadersSent() = 0; - // Called when data is received. - // Returns network error code. OK when it successfully receives data. - virtual int OnDataReceived(const char* data, int length) = 0; + // Called when data is received. |buffer| may be NULL, which + // signals EOF. Must return OK if the data was received + // successfully, or a network error code otherwise. + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) = 0; // Called when data is sent. virtual void OnDataSent(size_t bytes_sent) = 0; @@ -218,7 +220,7 @@ class NET_EXPORT_PRIVATE SpdyStream // // |length| is the number of bytes received (at most 2^24 - 1) or 0 if // the stream is being closed. - void OnDataReceived(const char* buffer, size_t length); + void OnDataReceived(scoped_ptr<SpdyBuffer> buffer); // Called by the SpdySession when a frame has been successfully and // completely written. |frame_size| is the total size of the frame @@ -289,8 +291,8 @@ class NET_EXPORT_PRIVATE SpdyStream int GetProtocolVersion() const; private: - class SynStreamFrameProducer; - class HeaderFrameProducer; + class SynStreamBufferProducer; + class HeaderBufferProducer; enum State { STATE_NONE, @@ -411,7 +413,7 @@ class NET_EXPORT_PRIVATE SpdyStream int recv_bytes_; // Data received before delegate is attached. - std::vector<scoped_refptr<IOBufferWithSize> > pending_buffers_; + ScopedVector<SpdyBuffer> pending_buffers_; SSLClientCertType domain_bound_cert_type_; std::string domain_bound_private_key_; diff --git a/net/spdy/spdy_stream_test_util.cc b/net/spdy/spdy_stream_test_util.cc index ac96cf9..0165c20 100644 --- a/net/spdy/spdy_stream_test_util.cc +++ b/net/spdy/spdy_stream_test_util.cc @@ -37,7 +37,7 @@ int ClosingDelegate::OnResponseReceived(const SpdyHeaderBlock& response, void ClosingDelegate::OnHeadersSent() {} -int ClosingDelegate::OnDataReceived(const char* data, int length) { +int ClosingDelegate::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { return OK; } @@ -77,8 +77,11 @@ void StreamDelegateBase::OnHeadersSent() { headers_sent_++; } -int StreamDelegateBase::OnDataReceived(const char* buffer, int bytes) { - received_data_ += std::string(buffer, bytes); +int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { + if (buffer) { + received_data_ += std::string(buffer->GetRemainingData(), + buffer->GetRemainingSize()); + } return OK; } diff --git a/net/spdy/spdy_stream_test_util.h b/net/spdy/spdy_stream_test_util.h index 5fc74af..a7ca912 100644 --- a/net/spdy/spdy_stream_test_util.h +++ b/net/spdy/spdy_stream_test_util.h @@ -32,7 +32,7 @@ class ClosingDelegate : public SpdyStream::Delegate { base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; - virtual int OnDataReceived(const char* data, int length) OVERRIDE; + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; virtual void OnDataSent(size_t bytes_sent) OVERRIDE; virtual void OnClose(int status) OVERRIDE; @@ -54,7 +54,7 @@ class StreamDelegateBase : public SpdyStream::Delegate { base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; - virtual int OnDataReceived(const char* buffer, int bytes) OVERRIDE; + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; virtual void OnDataSent(size_t bytes_sent) OVERRIDE; virtual void OnClose(int status) OVERRIDE; diff --git a/net/spdy/spdy_websocket_stream.cc b/net/spdy/spdy_websocket_stream.cc index 1f80d48..a17ff73 100644 --- a/net/spdy/spdy_websocket_stream.cc +++ b/net/spdy/spdy_websocket_stream.cc @@ -8,8 +8,8 @@ #include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "googleurl/src/gurl.h" -#include "net/base/net_errors.h" #include "net/base/io_buffer.h" +#include "net/base/net_errors.h" #include "net/spdy/spdy_framer.h" #include "net/spdy/spdy_protocol.h" #include "net/spdy/spdy_session.h" @@ -116,9 +116,9 @@ void SpdyWebSocketStream::OnHeadersSent() { NOTREACHED(); } -int SpdyWebSocketStream::OnDataReceived(const char* data, int length) { +int SpdyWebSocketStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { DCHECK(delegate_); - delegate_->OnReceivedSpdyData(data, length); + delegate_->OnReceivedSpdyData(buffer.Pass()); return OK; } diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h index 2cef1c6..02f6c7c 100644 --- a/net/spdy/spdy_websocket_stream.h +++ b/net/spdy/spdy_websocket_stream.h @@ -49,7 +49,7 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream virtual void OnSentSpdyData(size_t bytes_sent) = 0; // Called when data is received. - virtual void OnReceivedSpdyData(const char* data, int length) = 0; + virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) = 0; // Called when SpdyStream is closed. virtual void OnCloseSpdyStream() = 0; @@ -81,7 +81,7 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream base::Time response_time, int status) OVERRIDE; virtual void OnHeadersSent() OVERRIDE; - virtual int OnDataReceived(const char* data, int length) OVERRIDE; + virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; virtual void OnDataSent(size_t bytes_sent) OVERRIDE; virtual void OnClose(int status) OVERRIDE; diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc index b9b8ce3..695359f 100644 --- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc @@ -116,13 +116,19 @@ class SpdyWebSocketStreamEventRecorder : public SpdyWebSocketStream::Delegate { if (!on_sent_data_.is_null()) on_sent_data_.Run(&events_.back()); } - virtual void OnReceivedSpdyData(const char* data, int length) OVERRIDE { + virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) OVERRIDE { + std::string buffer_data; + size_t buffer_len = 0; + if (buffer) { + buffer_len = buffer->GetRemainingSize(); + buffer_data.append(buffer->GetRemainingData(), buffer_len); + } events_.push_back( SpdyWebSocketStreamEvent( SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, SpdyHeaderBlock(), - length, - std::string(data, length))); + buffer_len, + buffer_data)); if (!on_received_data_.is_null()) on_received_data_.Run(&events_.back()); } diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc index d008b1a..5b30a43 100644 --- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc +++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc @@ -116,13 +116,19 @@ class SpdyWebSocketStreamEventRecorder : public SpdyWebSocketStream::Delegate { if (!on_sent_data_.is_null()) on_sent_data_.Run(&events_.back()); } - virtual void OnReceivedSpdyData(const char* data, int length) OVERRIDE { + virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) OVERRIDE { + std::string buffer_data; + size_t buffer_len = 0; + if (buffer) { + buffer_len = buffer->GetRemainingSize(); + buffer_data.append(buffer->GetRemainingData(), buffer_len); + } events_.push_back( SpdyWebSocketStreamEvent( SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA, SpdyHeaderBlock(), - length, - std::string(data, length))); + buffer_len, + buffer_data)); if (!on_received_data_.is_null()) on_received_data_.Run(&events_.back()); } diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc index f33cf62..18ddc39 100644 --- a/net/spdy/spdy_write_queue.cc +++ b/net/spdy/spdy_write_queue.cc @@ -7,7 +7,8 @@ #include <cstddef> #include "base/logging.h" -#include "net/spdy/spdy_frame_producer.h" +#include "net/spdy/spdy_buffer.h" +#include "net/spdy/spdy_buffer_producer.h" #include "net/spdy/spdy_stream.h" namespace net { @@ -16,7 +17,7 @@ SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {} SpdyWriteQueue::PendingWrite::PendingWrite( SpdyFrameType frame_type, - SpdyFrameProducer* frame_producer, + SpdyBufferProducer* frame_producer, const scoped_refptr<SpdyStream>& stream) : frame_type(frame_type), frame_producer(frame_producer), @@ -32,7 +33,7 @@ SpdyWriteQueue::~SpdyWriteQueue() { void SpdyWriteQueue::Enqueue(RequestPriority priority, SpdyFrameType frame_type, - scoped_ptr<SpdyFrameProducer> frame_producer, + scoped_ptr<SpdyBufferProducer> frame_producer, const scoped_refptr<SpdyStream>& stream) { if (stream.get()) { DCHECK_EQ(stream->priority(), priority); @@ -42,7 +43,7 @@ void SpdyWriteQueue::Enqueue(RequestPriority priority, } bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type, - scoped_ptr<SpdyFrameProducer>* frame_producer, + scoped_ptr<SpdyBufferProducer>* frame_producer, scoped_refptr<SpdyStream>* stream) { for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { if (!queue_[i].empty()) { diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h index 996fca75..1b8f1b8 100644 --- a/net/spdy/spdy_write_queue.h +++ b/net/spdy/spdy_write_queue.h @@ -16,10 +16,11 @@ namespace net { -class SpdyFrameProducer; +class SpdyBuffer; +class SpdyBufferProducer; class SpdyStream; -// A queue of SpdyFrameProducers to produce frames to write. Ordered +// A queue of SpdyBufferProducers to produce frames to write. Ordered // by priority, and then FIFO. class NET_EXPORT_PRIVATE SpdyWriteQueue { public: @@ -32,7 +33,7 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { // is non-NULL, its priority must be equal to |priority|. void Enqueue(RequestPriority priority, SpdyFrameType frame_type, - scoped_ptr<SpdyFrameProducer> frame_producer, + scoped_ptr<SpdyBufferProducer> frame_producer, const scoped_refptr<SpdyStream>& stream); // Dequeues the frame producer with the highest priority that was @@ -40,7 +41,7 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { // fills in |frame_type|, |frame_producer|, and |stream| if // successful -- otherwise, just returns false. bool Dequeue(SpdyFrameType* frame_type, - scoped_ptr<SpdyFrameProducer>* frame_producer, + scoped_ptr<SpdyBufferProducer>* frame_producer, scoped_refptr<SpdyStream>* stream); // Removes all pending writes for the given stream, which must be @@ -56,12 +57,12 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue { SpdyFrameType frame_type; // This has to be a raw pointer since we store this in an STL // container. - SpdyFrameProducer* frame_producer; + SpdyBufferProducer* frame_producer; scoped_refptr<SpdyStream> stream; PendingWrite(); PendingWrite(SpdyFrameType frame_type, - SpdyFrameProducer* frame_producer, + SpdyBufferProducer* frame_producer, const scoped_refptr<SpdyStream>& stream); ~PendingWrite(); }; diff --git a/net/spdy/spdy_write_queue_unittest.cc b/net/spdy/spdy_write_queue_unittest.cc index e284619..c98074f 100644 --- a/net/spdy/spdy_write_queue_unittest.cc +++ b/net/spdy/spdy_write_queue_unittest.cc @@ -14,7 +14,7 @@ #include "base/strings/string_number_conversions.h" #include "net/base/net_log.h" #include "net/base/request_priority.h" -#include "net/spdy/spdy_frame_producer.h" +#include "net/spdy/spdy_buffer_producer.h" #include "net/spdy/spdy_stream.h" #include "testing/gtest/include/gtest/gtest.h" @@ -26,31 +26,33 @@ class SpdyWriteQueueTest : public ::testing::Test {}; // Makes a SpdyFrameProducer producing a frame with the data in the // given string. -scoped_ptr<SpdyFrameProducer> StringToProducer(const std::string& s) { +scoped_ptr<SpdyBufferProducer> StringToProducer(const std::string& s) { scoped_ptr<char[]> data(new char[s.size()]); std::memcpy(data.get(), s.data(), s.size()); - return scoped_ptr<SpdyFrameProducer>( - new SimpleFrameProducer( - scoped_ptr<SpdyFrame>( - new SpdyFrame(data.release(), s.size(), true)))); + return scoped_ptr<SpdyBufferProducer>( + new SimpleBufferProducer( + scoped_ptr<SpdyBuffer>( + new SpdyBuffer( + scoped_ptr<SpdyFrame>( + new SpdyFrame(data.release(), s.size(), true)))))); } -// Makes a SpdyFrameProducer producing a frame with the data in the +// Makes a SpdyBufferProducer producing a frame with the data in the // given int (converted to a string). -scoped_ptr<SpdyFrameProducer> IntToProducer(int i) { +scoped_ptr<SpdyBufferProducer> IntToProducer(int i) { return StringToProducer(base::IntToString(i)); } // Produces a frame with the given producer and returns a copy of its // data as a string. -std::string ProducerToString(scoped_ptr<SpdyFrameProducer> producer) { - scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); - return std::string(frame->data(), frame->size()); +std::string ProducerToString(scoped_ptr<SpdyBufferProducer> producer) { + scoped_ptr<SpdyBuffer> buffer = producer->ProduceBuffer(); + return std::string(buffer->GetRemainingData(), buffer->GetRemainingSize()); } // Produces a frame with the given producer and returns a copy of its // data as an int (converted from a string). -int ProducerToInt(scoped_ptr<SpdyFrameProducer> producer) { +int ProducerToInt(scoped_ptr<SpdyBufferProducer> producer) { int i = 0; EXPECT_TRUE(base::StringToInt(ProducerToString(producer.Pass()), &i)); return i; @@ -69,9 +71,9 @@ SpdyStream* MakeTestStream(RequestPriority priority) { TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { SpdyWriteQueue write_queue; - scoped_ptr<SpdyFrameProducer> producer_low = StringToProducer("LOW"); - scoped_ptr<SpdyFrameProducer> producer_medium = StringToProducer("MEDIUM"); - scoped_ptr<SpdyFrameProducer> producer_highest = StringToProducer("HIGHEST"); + scoped_ptr<SpdyBufferProducer> producer_low = StringToProducer("LOW"); + scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM"); + scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST"); // A NULL stream should still work. scoped_refptr<SpdyStream> stream_low(NULL); @@ -86,7 +88,7 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { HIGHEST, RST_STREAM, producer_highest.Pass(), stream_highest); SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyFrameProducer> frame_producer; + scoped_ptr<SpdyBufferProducer> frame_producer; scoped_refptr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(RST_STREAM, frame_type); @@ -111,9 +113,9 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) { TEST_F(SpdyWriteQueueTest, DequeuesFIFO) { SpdyWriteQueue write_queue; - scoped_ptr<SpdyFrameProducer> producer1 = IntToProducer(1); - scoped_ptr<SpdyFrameProducer> producer2 = IntToProducer(2); - scoped_ptr<SpdyFrameProducer> producer3 = IntToProducer(3); + scoped_ptr<SpdyBufferProducer> producer1 = IntToProducer(1); + scoped_ptr<SpdyBufferProducer> producer2 = IntToProducer(2); + scoped_ptr<SpdyBufferProducer> producer3 = IntToProducer(3); scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY)); scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY)); @@ -124,7 +126,7 @@ TEST_F(SpdyWriteQueueTest, DequeuesFIFO) { write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(), stream3); SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyFrameProducer> frame_producer; + scoped_ptr<SpdyBufferProducer> frame_producer; scoped_refptr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(SYN_STREAM, frame_type); @@ -162,7 +164,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { for (int i = 0; i < 100; i += 3) { SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyFrameProducer> frame_producer; + scoped_ptr<SpdyBufferProducer> frame_producer; scoped_refptr<SpdyStream> stream; ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); EXPECT_EQ(SYN_STREAM, frame_type); @@ -171,7 +173,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) { } SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyFrameProducer> frame_producer; + scoped_ptr<SpdyBufferProducer> frame_producer; scoped_refptr<SpdyStream> stream; EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); } @@ -189,11 +191,11 @@ TEST_F(SpdyWriteQueueTest, Clear) { write_queue.Clear(); SpdyFrameType frame_type = DATA; - scoped_ptr<SpdyFrameProducer> frame_producer; + scoped_ptr<SpdyBufferProducer> frame_producer; scoped_refptr<SpdyStream> stream; EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream)); } -} +} // namespace } // namespace net diff --git a/net/websockets/websocket_job.cc b/net/websockets/websocket_job.cc index 3aeb8aa..53e0eda 100644 --- a/net/websockets/websocket_job.cc +++ b/net/websockets/websocket_job.cc @@ -9,10 +9,10 @@ #include "base/bind.h" #include "base/lazy_instance.h" #include "googleurl/src/gurl.h" +#include "net/base/io_buffer.h" #include "net/base/net_errors.h" #include "net/base/net_log.h" #include "net/cookies/cookie_store.h" -#include "net/base/io_buffer.h" #include "net/http/http_network_session.h" #include "net/http/http_transaction_factory.h" #include "net/http/http_util.h" @@ -330,14 +330,19 @@ void WebSocketJob::OnSentSpdyData(size_t bytes_sent) { OnSentData(socket_, static_cast<int>(bytes_sent)); } -void WebSocketJob::OnReceivedSpdyData(const char* data, int length) { +void WebSocketJob::OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) { DCHECK_NE(INITIALIZED, state_); DCHECK_NE(CONNECTING, state_); if (state_ == CLOSED) return; if (!spdy_websocket_stream_.get()) return; - OnReceivedData(socket_, data, length); + if (buffer) { + OnReceivedData(socket_, buffer->GetRemainingData(), + buffer->GetRemainingSize()); + } else { + OnReceivedData(socket_, NULL, 0); + } } void WebSocketJob::OnCloseSpdyStream() { diff --git a/net/websockets/websocket_job.h b/net/websockets/websocket_job.h index 29f327c..02e4dfd 100644 --- a/net/websockets/websocket_job.h +++ b/net/websockets/websocket_job.h @@ -83,7 +83,7 @@ class NET_EXPORT WebSocketJob virtual int OnReceivedSpdyResponseHeader( const SpdyHeaderBlock& headers, int status) OVERRIDE; virtual void OnSentSpdyData(size_t bytes_sent) OVERRIDE; - virtual void OnReceivedSpdyData(const char* data, int length) OVERRIDE; + virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) OVERRIDE; virtual void OnCloseSpdyStream() OVERRIDE; private: |