summaryrefslogtreecommitdiffstats
path: root/net/spdy
diff options
context:
space:
mode:
Diffstat (limited to 'net/spdy')
-rw-r--r--net/spdy/spdy_buffer.cc61
-rw-r--r--net/spdy/spdy_buffer.h66
-rw-r--r--net/spdy/spdy_buffer_producer.cc27
-rw-r--r--net/spdy/spdy_buffer_producer.h50
-rw-r--r--net/spdy/spdy_buffer_unittest.cc88
-rw-r--r--net/spdy/spdy_frame_producer.cc26
-rw-r--r--net/spdy/spdy_frame_producer.h50
-rw-r--r--net/spdy/spdy_http_stream.cc45
-rw-r--r--net/spdy/spdy_http_stream.h6
-rw-r--r--net/spdy/spdy_io_buffer.cc38
-rw-r--r--net/spdy/spdy_io_buffer.h56
-rw-r--r--net/spdy/spdy_proxy_client_socket.cc66
-rw-r--r--net/spdy/spdy_proxy_client_socket.h12
-rw-r--r--net/spdy/spdy_read_queue.cc59
-rw-r--r--net/spdy/spdy_read_queue.h51
-rw-r--r--net/spdy/spdy_read_queue_unittest.cc106
-rw-r--r--net/spdy/spdy_session.cc122
-rw-r--r--net/spdy/spdy_session.h32
-rw-r--r--net/spdy/spdy_session_spdy2_unittest.cc1
-rw-r--r--net/spdy/spdy_session_spdy3_unittest.cc1
-rw-r--r--net/spdy/spdy_stream.cc73
-rw-r--r--net/spdy/spdy_stream.h16
-rw-r--r--net/spdy/spdy_stream_test_util.cc9
-rw-r--r--net/spdy/spdy_stream_test_util.h4
-rw-r--r--net/spdy/spdy_websocket_stream.cc6
-rw-r--r--net/spdy/spdy_websocket_stream.h4
-rw-r--r--net/spdy/spdy_websocket_stream_spdy2_unittest.cc12
-rw-r--r--net/spdy/spdy_websocket_stream_spdy3_unittest.cc12
-rw-r--r--net/spdy/spdy_write_queue.cc9
-rw-r--r--net/spdy/spdy_write_queue.h13
-rw-r--r--net/spdy/spdy_write_queue_unittest.cc50
31 files changed, 758 insertions, 413 deletions
diff --git a/net/spdy/spdy_buffer.cc b/net/spdy/spdy_buffer.cc
new file mode 100644
index 0000000..3257bef
--- /dev/null
+++ b/net/spdy/spdy_buffer.cc
@@ -0,0 +1,61 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/spdy/spdy_buffer.h"
+
+#include <cstring>
+
+#include "base/logging.h"
+#include "net/base/io_buffer.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace net {
+
+namespace {
+
+// Makes a SpdyFrame with |size| bytes of data copied from
+// |data|. |data| must be non-NULL and |size| must be positive.
+scoped_ptr<SpdyFrame> MakeSpdyFrame(const char* data, size_t size) {
+ DCHECK(data);
+ DCHECK_GT(size, 0u);
+ scoped_array<char> frame_data(new char[size]);
+ std::memcpy(frame_data.get(), data, size);
+ scoped_ptr<SpdyFrame> frame(
+ new SpdyFrame(frame_data.release(), size, true /* owns_buffer */));
+ return frame.Pass();
+}
+
+} // namespace
+
+SpdyBuffer::SpdyBuffer(scoped_ptr<SpdyFrame> frame)
+ : frame_(frame.Pass()),
+ offset_(0) {}
+
+// The given data may not be strictly a SPDY frame; we (ab)use
+// |frame_| just as a container.
+SpdyBuffer::SpdyBuffer(const char* data, size_t size) :
+ frame_(MakeSpdyFrame(data, size)),
+ offset_(0) {}
+
+SpdyBuffer::~SpdyBuffer() {}
+
+const char* SpdyBuffer::GetRemainingData() const {
+ return frame_->data() + offset_;
+}
+
+size_t SpdyBuffer::GetRemainingSize() const {
+ return frame_->size() - offset_;
+}
+
+void SpdyBuffer::Consume(size_t consume_size) {
+ DCHECK_GE(consume_size, 1u);
+ DCHECK_LE(consume_size, GetRemainingSize());
+ offset_ += consume_size;
+};
+
+IOBuffer* SpdyBuffer::GetIOBufferForRemainingData() {
+ return new WrappedIOBuffer(GetRemainingData());
+}
+
+} // namespace net
diff --git a/net/spdy/spdy_buffer.h b/net/spdy/spdy_buffer.h
new file mode 100644
index 0000000..c4c5b8f
--- /dev/null
+++ b/net/spdy/spdy_buffer.h
@@ -0,0 +1,66 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_SPDY_SPDY_BUFFER_H_
+#define NET_SPDY_SPDY_BUFFER_H_
+
+#include <cstddef>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "net/base/net_export.h"
+
+namespace net {
+
+class IOBuffer;
+class SpdyFrame;
+
+// SpdyBuffer is a class to hold data read from or to be written to a
+// SPDY connection. It is similar to a DrainableIOBuffer but is not
+// ref-counted and will include a way to get notified when Consume()
+// is called.
+//
+// NOTE(akalin): This explicitly does not inherit from IOBuffer to
+// avoid the needless ref-counting and to avoid working around the
+// fact that IOBuffer member functions are not virtual.
+class NET_EXPORT_PRIVATE SpdyBuffer {
+ public:
+ // Construct with the data in the given frame. Assumes that data is
+ // owned by |frame| or outlives it.
+ explicit SpdyBuffer(scoped_ptr<SpdyFrame> frame);
+
+ // Construct with a copy of the given raw data. |data| must be
+ // non-NULL and |size| must be non-zero.
+ SpdyBuffer(const char* data, size_t size);
+
+ ~SpdyBuffer();
+
+ // Returns the remaining (unconsumed) data.
+ const char* GetRemainingData() const;
+
+ // Returns the number of remaining (unconsumed) bytes.
+ size_t GetRemainingSize() const;
+
+ // Consume the given number of bytes, which must be positive but not
+ // greater than GetRemainingSize().
+ //
+ // TODO(akalin): Add a way to get notified when Consume() is called.
+ void Consume(size_t consume_size);
+
+ // Returns an IOBuffer pointing to the data starting at
+ // GetRemainingData(). Use with care; the returned IOBuffer must not
+ // be used past the lifetime of this object, and it is not updated
+ // when Consume() is called.
+ IOBuffer* GetIOBufferForRemainingData();
+
+ private:
+ const scoped_ptr<SpdyFrame> frame_;
+ size_t offset_;
+
+ DISALLOW_COPY_AND_ASSIGN(SpdyBuffer);
+};
+
+} // namespace net
+
+#endif // NET_SPDY_SPDY_BUFFER_H_
diff --git a/net/spdy/spdy_buffer_producer.cc b/net/spdy/spdy_buffer_producer.cc
new file mode 100644
index 0000000..3a9598c
--- /dev/null
+++ b/net/spdy/spdy_buffer_producer.cc
@@ -0,0 +1,27 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/spdy/spdy_buffer_producer.h"
+
+#include "base/logging.h"
+#include "net/spdy/spdy_buffer.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace net {
+
+SpdyBufferProducer::SpdyBufferProducer() {}
+
+SpdyBufferProducer::~SpdyBufferProducer() {}
+
+SimpleBufferProducer::SimpleBufferProducer(scoped_ptr<SpdyBuffer> buffer)
+ : buffer_(buffer.Pass()) {}
+
+SimpleBufferProducer::~SimpleBufferProducer() {}
+
+scoped_ptr<SpdyBuffer> SimpleBufferProducer::ProduceBuffer() {
+ DCHECK(buffer_);
+ return buffer_.Pass();
+}
+
+} // namespace net
diff --git a/net/spdy/spdy_buffer_producer.h b/net/spdy/spdy_buffer_producer.h
new file mode 100644
index 0000000..fe82b1a
--- /dev/null
+++ b/net/spdy/spdy_buffer_producer.h
@@ -0,0 +1,50 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_SPDY_SPDY_BUFFER_PRODUCER_H_
+#define NET_SPDY_SPDY_BUFFER_PRODUCER_H_
+
+#include "base/basictypes.h"
+#include "base/compiler_specific.h"
+#include "base/memory/scoped_ptr.h"
+#include "net/base/net_export.h"
+
+namespace net {
+
+class SpdyBuffer;
+
+// An object which provides a SpdyBuffer for writing. We pass these
+// around instead of SpdyBuffers since some buffers have to be
+// generated "just in time".
+class NET_EXPORT_PRIVATE SpdyBufferProducer {
+ public:
+ SpdyBufferProducer();
+
+ // Produces the buffer to be written. Will be called at most once.
+ virtual scoped_ptr<SpdyBuffer> ProduceBuffer() = 0;
+
+ virtual ~SpdyBufferProducer();
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SpdyBufferProducer);
+};
+
+// A simple wrapper around a single SpdyBuffer.
+class NET_EXPORT_PRIVATE SimpleBufferProducer : public SpdyBufferProducer {
+ public:
+ explicit SimpleBufferProducer(scoped_ptr<SpdyBuffer> buffer);
+
+ virtual ~SimpleBufferProducer();
+
+ virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE;
+
+ private:
+ scoped_ptr<SpdyBuffer> buffer_;
+
+ DISALLOW_COPY_AND_ASSIGN(SimpleBufferProducer);
+};
+
+} // namespace net
+
+#endif // NET_SPDY_SPDY_BUFFER_PRODUCER_H_
diff --git a/net/spdy/spdy_buffer_unittest.cc b/net/spdy/spdy_buffer_unittest.cc
new file mode 100644
index 0000000..fbd7705
--- /dev/null
+++ b/net/spdy/spdy_buffer_unittest.cc
@@ -0,0 +1,88 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/spdy/spdy_buffer.h"
+
+#include <cstddef>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "net/base/io_buffer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace net {
+
+namespace {
+
+const char kData[] = "hello!\0hi.";
+const size_t kDataSize = arraysize(kData);
+
+class SpdyBufferTest : public ::testing::Test {};
+
+// Make a string from the data remaining in |buffer|.
+std::string BufferToString(const SpdyBuffer& buffer) {
+ return std::string(buffer.GetRemainingData(), buffer.GetRemainingSize());
+}
+
+// Construct a SpdyBuffer from a SpdyFrame and make sure its data
+// points to the frame's underlying data.
+TEST_F(SpdyBufferTest, FrameConstructor) {
+ SpdyBuffer buffer(
+ scoped_ptr<SpdyFrame>(
+ new SpdyFrame(const_cast<char*>(kData), kDataSize,
+ false /* owns_buffer */)));
+
+ EXPECT_EQ(kData, buffer.GetRemainingData());
+ EXPECT_EQ(kDataSize, buffer.GetRemainingSize());
+}
+
+// Construct a SpdyBuffer from a const char*/size_t pair and make sure
+// it makes a copy of the data.
+TEST_F(SpdyBufferTest, DataConstructor) {
+ std::string data(kData, kDataSize);
+ SpdyBuffer buffer(data.data(), data.size());
+ // This mutation shouldn't affect |buffer|'s data.
+ data[0] = 'H';
+
+ EXPECT_NE(kData, buffer.GetRemainingData());
+ EXPECT_EQ(kDataSize, buffer.GetRemainingSize());
+ EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer));
+}
+
+// Construct a SpdyBuffer and call Consume() on it, which should
+// update the remaining data pointer and size appropriately.
+TEST_F(SpdyBufferTest, Consume) {
+ SpdyBuffer buffer(kData, kDataSize);
+
+ EXPECT_EQ(std::string(kData, kDataSize), BufferToString(buffer));
+
+ buffer.Consume(5);
+ EXPECT_EQ(std::string(kData + 5, kDataSize - 5), BufferToString(buffer));
+
+ buffer.Consume(kDataSize - 5);
+ EXPECT_EQ(0u, buffer.GetRemainingSize());
+}
+
+// Make sure the IOBuffer returned by GetIOBufferForRemainingData()
+// points to the buffer's remaining data and isn't updated by
+// Consume().
+TEST_F(SpdyBufferTest, GetIOBufferForRemainingData) {
+ SpdyBuffer buffer(kData, kDataSize);
+
+ buffer.Consume(5);
+ scoped_refptr<IOBuffer> io_buffer = buffer.GetIOBufferForRemainingData();
+ size_t io_buffer_size = buffer.GetRemainingSize();
+ const std::string expectedData(kData + 5, kDataSize - 5);
+ EXPECT_EQ(expectedData, std::string(io_buffer->data(), io_buffer_size));
+
+ buffer.Consume(kDataSize - 5);
+ EXPECT_EQ(expectedData, std::string(io_buffer->data(), io_buffer_size));
+}
+
+} // namespace
+
+} // namespace net
diff --git a/net/spdy/spdy_frame_producer.cc b/net/spdy/spdy_frame_producer.cc
deleted file mode 100644
index dc16744..0000000
--- a/net/spdy/spdy_frame_producer.cc
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (c) 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "net/spdy/spdy_frame_producer.h"
-
-#include "base/logging.h"
-#include "net/spdy/spdy_protocol.h"
-
-namespace net {
-
-SpdyFrameProducer::SpdyFrameProducer() {}
-
-SpdyFrameProducer::~SpdyFrameProducer() {}
-
-SimpleFrameProducer::SimpleFrameProducer(scoped_ptr<SpdyFrame> frame)
- : frame_(frame.Pass()) {}
-
-SimpleFrameProducer::~SimpleFrameProducer() {}
-
-scoped_ptr<SpdyFrame> SimpleFrameProducer::ProduceFrame() {
- DCHECK(frame_);
- return frame_.Pass();
-}
-
-} // namespace net
diff --git a/net/spdy/spdy_frame_producer.h b/net/spdy/spdy_frame_producer.h
deleted file mode 100644
index 9db20ce..0000000
--- a/net/spdy/spdy_frame_producer.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright (c) 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef NET_SPDY_SPDY_FRAME_PRODUCER_H_
-#define NET_SPDY_SPDY_FRAME_PRODUCER_H_
-
-#include "base/basictypes.h"
-#include "base/compiler_specific.h"
-#include "base/memory/scoped_ptr.h"
-#include "net/base/net_export.h"
-
-namespace net {
-
-class SpdyFrame;
-
-// An object which provides a SpdyFrame for writing. We pass these
-// around instead of SpdyFrames since some frames have to be generated
-// "just in time".
-class NET_EXPORT_PRIVATE SpdyFrameProducer {
- public:
- SpdyFrameProducer();
-
- // Produces the frame to be written. Will be called at most once.
- virtual scoped_ptr<SpdyFrame> ProduceFrame() = 0;
-
- virtual ~SpdyFrameProducer();
-
- private:
- DISALLOW_COPY_AND_ASSIGN(SpdyFrameProducer);
-};
-
-// A simple wrapper around a single SpdyFrame.
-class NET_EXPORT_PRIVATE SimpleFrameProducer : public SpdyFrameProducer {
- public:
- explicit SimpleFrameProducer(scoped_ptr<SpdyFrame> frame);
-
- virtual ~SimpleFrameProducer();
-
- virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE;
-
- private:
- scoped_ptr<SpdyFrame> frame_;
-
- DISALLOW_COPY_AND_ASSIGN(SimpleFrameProducer);
-};
-
-} // namespace net
-
-#endif // NET_SPDY_SPDY_FRAME_PRODUCER_H_
diff --git a/net/spdy/spdy_http_stream.cc b/net/spdy/spdy_http_stream.cc
index 4eb82bf..9053bc3 100644
--- a/net/spdy/spdy_http_stream.cc
+++ b/net/spdy/spdy_http_stream.cc
@@ -135,28 +135,11 @@ int SpdyHttpStream::ReadResponseBody(
CHECK(!callback.is_null());
// If we have data buffered, complete the IO immediately.
- if (!response_body_.empty()) {
- int bytes_read = 0;
- while (!response_body_.empty() && buf_len > 0) {
- scoped_refptr<IOBufferWithSize> data = response_body_.front();
- const int bytes_to_copy = std::min(buf_len, data->size());
- memcpy(&(buf->data()[bytes_read]), data->data(), bytes_to_copy);
- buf_len -= bytes_to_copy;
- if (bytes_to_copy == data->size()) {
- response_body_.pop_front();
- } else {
- const int bytes_remaining = data->size() - bytes_to_copy;
- IOBufferWithSize* new_buffer = new IOBufferWithSize(bytes_remaining);
- memcpy(new_buffer->data(), &(data->data()[bytes_to_copy]),
- bytes_remaining);
- response_body_.pop_front();
- response_body_.push_front(make_scoped_refptr(new_buffer));
- }
- bytes_read += bytes_to_copy;
- }
+ if (!response_body_queue_.IsEmpty()) {
+ size_t bytes_consumed = response_body_queue_.Dequeue(buf->data(), buf_len);
if (stream_)
- stream_->IncreaseRecvWindowSize(bytes_read);
- return bytes_read;
+ stream_->IncreaseRecvWindowSize(bytes_consumed);
+ return bytes_consumed;
} else if (stream_closed_) {
return closed_stream_status_;
}
@@ -434,7 +417,7 @@ void SpdyHttpStream::OnHeadersSent() {
NOTREACHED();
}
-int SpdyHttpStream::OnDataReceived(const char* data, int length) {
+int SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
// SpdyStream won't call us with data if the header block didn't contain a
// valid set of headers. So we don't expect to not have headers received
// here.
@@ -446,11 +429,8 @@ int SpdyHttpStream::OnDataReceived(const char* data, int length) {
// happen for server initiated streams.
DCHECK(stream_.get());
DCHECK(!stream_->closed() || stream_->pushed());
- if (length > 0) {
- // Save the received data.
- IOBufferWithSize* io_buffer = new IOBufferWithSize(length);
- memcpy(io_buffer->data(), data, length);
- response_body_.push_back(make_scoped_refptr(io_buffer));
+ if (buffer) {
+ response_body_queue_.Enqueue(buffer.Pass());
if (user_buffer_) {
// Handing small chunks of data to the caller creates measurable overhead.
@@ -509,14 +489,9 @@ bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
if (stream_closed_)
return false;
- int bytes_buffered = 0;
- std::list<scoped_refptr<IOBufferWithSize> >::const_iterator it;
- for (it = response_body_.begin();
- it != response_body_.end() && bytes_buffered < user_buffer_len_;
- ++it)
- bytes_buffered += (*it)->size();
-
- return bytes_buffered < user_buffer_len_;
+ DCHECK_GT(user_buffer_len_, 0);
+ return response_body_queue_.GetTotalSize() <
+ static_cast<size_t>(user_buffer_len_);
}
bool SpdyHttpStream::DoBufferedReadCallback() {
diff --git a/net/spdy/spdy_http_stream.h b/net/spdy/spdy_http_stream.h
index 226d01c..2af82f0 100644
--- a/net/spdy/spdy_http_stream.h
+++ b/net/spdy/spdy_http_stream.h
@@ -13,6 +13,7 @@
#include "net/base/completion_callback.h"
#include "net/base/net_log.h"
#include "net/http/http_stream.h"
+#include "net/spdy/spdy_read_queue.h"
#include "net/spdy/spdy_stream.h"
namespace net {
@@ -89,7 +90,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
- virtual int OnDataReceived(const char* buffer, int bytes) OVERRIDE;
+ virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
@@ -139,8 +140,7 @@ class NET_EXPORT_PRIVATE SpdyHttpStream : public SpdyStream::Delegate,
bool response_headers_received_; // Indicates waiting for more HEADERS.
// We buffer the response body as it arrives asynchronously from the stream.
- // TODO(mbelshe): is this infinite buffering?
- std::list<scoped_refptr<IOBufferWithSize> > response_body_;
+ SpdyReadQueue response_body_queue_;
CompletionCallback callback_;
diff --git a/net/spdy/spdy_io_buffer.cc b/net/spdy/spdy_io_buffer.cc
deleted file mode 100644
index 441b033..0000000
--- a/net/spdy/spdy_io_buffer.cc
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "net/spdy/spdy_io_buffer.h"
-#include "net/spdy/spdy_stream.h"
-
-namespace net {
-
-SpdyIOBuffer::SpdyIOBuffer() {}
-
-SpdyIOBuffer::SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream)
- : buffer_(new DrainableIOBuffer(buffer, size)), stream_(stream) {}
-
-SpdyIOBuffer::SpdyIOBuffer(const SpdyIOBuffer& rhs) {
- buffer_ = rhs.buffer_;
- stream_ = rhs.stream_;
-}
-
-SpdyIOBuffer::~SpdyIOBuffer() {}
-
-SpdyIOBuffer& SpdyIOBuffer::operator=(const SpdyIOBuffer& rhs) {
- buffer_ = rhs.buffer_;
- stream_ = rhs.stream_;
- return *this;
-}
-
-void SpdyIOBuffer::Swap(SpdyIOBuffer* other) {
- buffer_.swap(other->buffer_);
- stream_.swap(other->stream_);
-}
-
-void SpdyIOBuffer::Release() {
- buffer_ = NULL;
- stream_ = NULL;
-}
-
-} // namespace net
diff --git a/net/spdy/spdy_io_buffer.h b/net/spdy/spdy_io_buffer.h
deleted file mode 100644
index ed95e70..0000000
--- a/net/spdy/spdy_io_buffer.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef NET_SPDY_SPDY_IO_BUFFER_H_
-#define NET_SPDY_SPDY_IO_BUFFER_H_
-
-#include "base/memory/ref_counted.h"
-#include "net/base/io_buffer.h"
-#include "net/base/net_export.h"
-
-namespace net {
-
-class SpdyStream;
-
-// A class for managing SPDY write buffers. These write buffers need
-// to track the SpdyStream which they are associated with so that the
-// session can activate the stream lazily and also notify the stream
-// on completion of the write.
-class NET_EXPORT_PRIVATE SpdyIOBuffer {
- public:
- SpdyIOBuffer();
-
- // Constructor
- // |buffer| is the actual data buffer.
- // |size| is the size of the data buffer.
- // |stream| is a pointer to the stream which is managing this buffer
- // (can be NULL if the write is for the session itself).
- SpdyIOBuffer(IOBuffer* buffer, int size, SpdyStream* stream);
-
- // Declare this instead of using the default so that we avoid needing to
- // include spdy_stream.h.
- SpdyIOBuffer(const SpdyIOBuffer& rhs);
-
- ~SpdyIOBuffer();
-
- // Declare this instead of using the default so that we avoid needing to
- // include spdy_stream.h.
- SpdyIOBuffer& operator=(const SpdyIOBuffer& rhs);
-
- void Swap(SpdyIOBuffer* other);
-
- void Release();
-
- // Accessors.
- DrainableIOBuffer* buffer() const { return buffer_; }
- const scoped_refptr<SpdyStream>& stream() const { return stream_; }
-
- private:
- scoped_refptr<DrainableIOBuffer> buffer_;
- scoped_refptr<SpdyStream> stream_;
-};
-
-} // namespace net
-
-#endif // NET_SPDY_SPDY_IO_BUFFER_H_
diff --git a/net/spdy/spdy_proxy_client_socket.cc b/net/spdy/spdy_proxy_client_socket.cc
index 7978d60..c09a5bd 100644
--- a/net/spdy/spdy_proxy_client_socket.cc
+++ b/net/spdy/spdy_proxy_client_socket.cc
@@ -39,7 +39,7 @@ SpdyProxyClientSocket::SpdyProxyClientSocket(
GURL("https://" + proxy_server.ToString()),
auth_cache,
auth_handler_factory)),
- user_buffer_(NULL),
+ user_buffer_len_(0),
write_buffer_len_(0),
write_bytes_outstanding_(0),
ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)),
@@ -127,8 +127,9 @@ int SpdyProxyClientSocket::Connect(const CompletionCallback& callback) {
}
void SpdyProxyClientSocket::Disconnect() {
- read_buffer_.clear();
+ read_buffer_queue_.Clear();
user_buffer_ = NULL;
+ user_buffer_len_ = 0;
read_callback_.Reset();
write_buffer_len_ = 0;
@@ -150,7 +151,8 @@ bool SpdyProxyClientSocket::IsConnected() const {
}
bool SpdyProxyClientSocket::IsConnectedAndIdle() const {
- return IsConnected() && read_buffer_.empty() && spdy_stream_->is_idle();
+ return IsConnected() && read_buffer_queue_.IsEmpty() &&
+ spdy_stream_->is_idle();
}
const BoundNetLog& SpdyProxyClientSocket::NetLog() const {
@@ -196,15 +198,16 @@ int SpdyProxyClientSocket::Read(IOBuffer* buf, int buf_len,
if (next_state_ == STATE_DISCONNECTED)
return ERR_SOCKET_NOT_CONNECTED;
- if (next_state_ == STATE_CLOSED && read_buffer_.empty()) {
+ if (next_state_ == STATE_CLOSED && read_buffer_queue_.IsEmpty()) {
return 0;
}
DCHECK(next_state_ == STATE_OPEN || next_state_ == STATE_CLOSED);
DCHECK(buf);
- user_buffer_ = new DrainableIOBuffer(buf, buf_len);
- int result = PopulateUserReadBuffer();
+ size_t result = PopulateUserReadBuffer(buf->data(), buf_len);
if (result == 0) {
+ user_buffer_ = buf;
+ user_buffer_len_ = static_cast<size_t>(buf_len);
DCHECK(!callback.is_null());
read_callback_ = callback;
return ERR_IO_PENDING;
@@ -213,30 +216,13 @@ int SpdyProxyClientSocket::Read(IOBuffer* buf, int buf_len,
return result;
}
-int SpdyProxyClientSocket::PopulateUserReadBuffer() {
- if (!user_buffer_)
- return ERR_IO_PENDING;
+size_t SpdyProxyClientSocket::PopulateUserReadBuffer(char* data, size_t len) {
+ size_t bytes_consumed = read_buffer_queue_.Dequeue(data, len);
- int bytes_read = 0;
- while (!read_buffer_.empty() && user_buffer_->BytesRemaining() > 0) {
- scoped_refptr<DrainableIOBuffer> data = read_buffer_.front();
- const int bytes_to_copy = std::min(user_buffer_->BytesRemaining(),
- data->BytesRemaining());
- memcpy(user_buffer_->data(), data->data(), bytes_to_copy);
- user_buffer_->DidConsume(bytes_to_copy);
- bytes_read += bytes_to_copy;
- if (data->BytesRemaining() == bytes_to_copy) {
- // Consumed all data from this buffer
- read_buffer_.pop_front();
- } else {
- data->DidConsume(bytes_to_copy);
- }
- }
+ if (bytes_consumed > 0 && spdy_stream_)
+ spdy_stream_->IncreaseRecvWindowSize(bytes_consumed);
- if (bytes_read > 0 && spdy_stream_)
- spdy_stream_->IncreaseRecvWindowSize(bytes_read);
-
- return user_buffer_->BytesConsumed();
+ return bytes_consumed;
}
int SpdyProxyClientSocket::Write(IOBuffer* buf, int buf_len,
@@ -526,23 +512,23 @@ void SpdyProxyClientSocket::OnHeadersSent() {
NOTREACHED();
}
-// Called when data is received.
-int SpdyProxyClientSocket::OnDataReceived(const char* data, int length) {
- net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED,
- length, data);
- if (length > 0) {
- // Save the received data.
- scoped_refptr<IOBuffer> io_buffer(new IOBuffer(length));
- memcpy(io_buffer->data(), data, length);
- read_buffer_.push_back(
- make_scoped_refptr(new DrainableIOBuffer(io_buffer, length)));
+// Called when data is received or on EOF (if |buffer| is NULL).
+int SpdyProxyClientSocket::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
+ if (buffer) {
+ net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED,
+ buffer->GetRemainingSize(),
+ buffer->GetRemainingData());
+ read_buffer_queue_.Enqueue(buffer.Pass());
+ } else {
+ net_log_.AddByteTransferEvent(NetLog::TYPE_SOCKET_BYTES_RECEIVED, 0, NULL);
}
if (!read_callback_.is_null()) {
- int rv = PopulateUserReadBuffer();
+ int rv = PopulateUserReadBuffer(user_buffer_->data(), user_buffer_len_);
CompletionCallback c = read_callback_;
read_callback_.Reset();
user_buffer_ = NULL;
+ user_buffer_len_ = 0;
c.Run(rv);
}
return OK;
@@ -591,7 +577,7 @@ void SpdyProxyClientSocket::OnClose(int status) {
read_callback.Run(status);
} else if (!read_callback_.is_null()) {
// If we have a read_callback_, the we need to make sure we call it back.
- OnDataReceived(NULL, 0);
+ OnDataReceived(scoped_ptr<SpdyBuffer>());
}
// This may have been deleted by read_callback_, so check first.
if (weak_ptr && !write_callback.is_null())
diff --git a/net/spdy/spdy_proxy_client_socket.h b/net/spdy/spdy_proxy_client_socket.h
index 8d3f047..e25e1e3 100644
--- a/net/spdy/spdy_proxy_client_socket.h
+++ b/net/spdy/spdy_proxy_client_socket.h
@@ -5,8 +5,8 @@
#ifndef NET_SPDY_SPDY_PROXY_CLIENT_SOCKET_H_
#define NET_SPDY_SPDY_PROXY_CLIENT_SOCKET_H_
-#include <string>
#include <list>
+#include <string>
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
@@ -21,6 +21,7 @@
#include "net/http/proxy_client_socket.h"
#include "net/spdy/spdy_http_stream.h"
#include "net/spdy/spdy_protocol.h"
+#include "net/spdy/spdy_read_queue.h"
#include "net/spdy/spdy_session.h"
#include "net/spdy/spdy_stream.h"
@@ -97,7 +98,7 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket,
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
- virtual int OnDataReceived(const char* data, int length) OVERRIDE;
+ virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
@@ -126,7 +127,7 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket,
// Populates |user_buffer_| with as much read data as possible
// and returns the number of bytes read.
- int PopulateUserReadBuffer();
+ size_t PopulateUserReadBuffer(char* out, size_t len);
State next_state_;
@@ -149,10 +150,11 @@ class NET_EXPORT_PRIVATE SpdyProxyClientSocket : public ProxyClientSocket,
scoped_refptr<HttpAuthController> auth_;
// We buffer the response body as it arrives asynchronously from the stream.
- std::list<scoped_refptr<DrainableIOBuffer> > read_buffer_;
+ SpdyReadQueue read_buffer_queue_;
// User provided buffer for the Read() response.
- scoped_refptr<DrainableIOBuffer> user_buffer_;
+ scoped_refptr<IOBuffer> user_buffer_;
+ size_t user_buffer_len_;
// User specified number of bytes to be written.
int write_buffer_len_;
diff --git a/net/spdy/spdy_read_queue.cc b/net/spdy/spdy_read_queue.cc
new file mode 100644
index 0000000..36f1e06
--- /dev/null
+++ b/net/spdy/spdy_read_queue.cc
@@ -0,0 +1,59 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/spdy/spdy_read_queue.h"
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "net/spdy/spdy_buffer.h"
+
+namespace net {
+
+SpdyReadQueue::SpdyReadQueue() : total_size_(0) {}
+
+SpdyReadQueue::~SpdyReadQueue() {
+ Clear();
+}
+
+bool SpdyReadQueue::IsEmpty() const {
+ DCHECK_EQ(queue_.empty(), total_size_ == 0);
+ return queue_.empty();
+}
+
+size_t SpdyReadQueue::GetTotalSize() const {
+ return total_size_;
+}
+
+void SpdyReadQueue::Enqueue(scoped_ptr<SpdyBuffer> buffer) {
+ DCHECK_GT(buffer->GetRemainingSize(), 0u);
+ total_size_ += buffer->GetRemainingSize();
+ queue_.push_back(buffer.release());
+}
+
+size_t SpdyReadQueue::Dequeue(char* out, size_t len) {
+ DCHECK_GT(len, 0u);
+ size_t bytes_copied = 0;
+ while (!queue_.empty() && bytes_copied < len) {
+ SpdyBuffer* buffer = queue_.front();
+ size_t bytes_to_copy =
+ std::min(len - bytes_copied, buffer->GetRemainingSize());
+ memcpy(out + bytes_copied, buffer->GetRemainingData(), bytes_to_copy);
+ bytes_copied += bytes_to_copy;
+ if (bytes_to_copy == buffer->GetRemainingSize()) {
+ delete queue_.front();
+ queue_.pop_front();
+ } else {
+ buffer->Consume(bytes_to_copy);
+ }
+ }
+ total_size_ -= bytes_copied;
+ return bytes_copied;
+}
+
+void SpdyReadQueue::Clear() {
+ STLDeleteElements(&queue_);
+ queue_.clear();
+}
+
+} // namespace
diff --git a/net/spdy/spdy_read_queue.h b/net/spdy/spdy_read_queue.h
new file mode 100644
index 0000000..65f3daf
--- /dev/null
+++ b/net/spdy/spdy_read_queue.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef NET_SPDY_SPDY_BUFFER_QUEUE_H_
+#define NET_SPDY_SPDY_BUFFER_QUEUE_H_
+
+#include <cstddef>
+#include <deque>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "net/base/net_export.h"
+
+namespace net {
+
+class SpdyBuffer;
+
+// A FIFO queue of incoming data from a SPDY connection. Useful for
+// SpdyStream delegates.
+class NET_EXPORT_PRIVATE SpdyReadQueue {
+ public:
+ SpdyReadQueue();
+ ~SpdyReadQueue();
+
+ // Returns whether there's anything in the queue.
+ bool IsEmpty() const;
+
+ // Returns the total number of bytes in the queue.
+ size_t GetTotalSize() const;
+
+ // Enqueues the bytes in |buffer|.
+ void Enqueue(scoped_ptr<SpdyBuffer> buffer);
+
+ // Dequeues up to |len| (which must be positive) bytes into
+ // |out|. Returns the number of bytes dequeued.
+ size_t Dequeue(char* out, size_t len);
+
+ // Removes all bytes from the queue.
+ void Clear();
+
+ private:
+ std::deque<SpdyBuffer*> queue_;
+ size_t total_size_;
+
+ DISALLOW_COPY_AND_ASSIGN(SpdyReadQueue);
+};
+
+} // namespace net
+
+#endif // NET_SPDY_SPDY_BUFFER_QUEUE_H_
diff --git a/net/spdy/spdy_read_queue_unittest.cc b/net/spdy/spdy_read_queue_unittest.cc
new file mode 100644
index 0000000..7281f68
--- /dev/null
+++ b/net/spdy/spdy_read_queue_unittest.cc
@@ -0,0 +1,106 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/spdy/spdy_read_queue.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <string>
+
+#include "base/memory/scoped_ptr.h"
+#include "base/stl_util.h"
+#include "net/spdy/spdy_buffer.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace net {
+
+namespace {
+
+const char kData[] = "SPDY read queue test data.\0Some more data.";
+const size_t kDataSize = arraysize(kData);
+
+// Enqueues |data| onto |queue| in chunks of at most |max_buffer_size|
+// bytes.
+void EnqueueString(const std::string& data,
+ size_t max_buffer_size,
+ SpdyReadQueue* queue) {
+ ASSERT_GT(data.size(), 0u);
+ ASSERT_GT(max_buffer_size, 0u);
+ size_t old_total_size = queue->GetTotalSize();
+ for (size_t i = 0; i < data.size();) {
+ size_t buffer_size = std::min(data.size() - i, max_buffer_size);
+ queue->Enqueue(
+ scoped_ptr<SpdyBuffer>(new SpdyBuffer(data.data() + i, buffer_size)));
+ i += buffer_size;
+ EXPECT_FALSE(queue->IsEmpty());
+ EXPECT_EQ(old_total_size + i, queue->GetTotalSize());
+ }
+}
+
+// Dequeues all bytes in |queue| in chunks of at most
+// |max_buffer_size| bytes and returns the data as a string.
+std::string DrainToString(size_t max_buffer_size, SpdyReadQueue* queue) {
+ std::string data;
+
+ // Pad the buffer so we can detect out-of-bound writes.
+ size_t padding = std::max(static_cast<size_t>(4096), queue->GetTotalSize());
+ size_t buffer_size_with_padding = padding + max_buffer_size + padding;
+ scoped_ptr<char[]> buffer(new char[buffer_size_with_padding]);
+ std::memset(buffer.get(), 0, buffer_size_with_padding);
+ char* buffer_data = buffer.get() + padding;
+
+ while (!queue->IsEmpty()) {
+ size_t old_total_size = queue->GetTotalSize();
+ EXPECT_GT(old_total_size, 0u);
+ size_t dequeued_bytes = queue->Dequeue(buffer_data, max_buffer_size);
+
+ // Make sure |queue| doesn't write past either end of its given
+ // boundaries.
+ for (int i = 1; i <= static_cast<int>(padding); ++i) {
+ EXPECT_EQ('\0', buffer_data[-i]) << -i;
+ }
+ for (size_t i = 0; i < padding; ++i) {
+ EXPECT_EQ('\0', buffer_data[max_buffer_size + i]) << i;
+ }
+
+ data.append(buffer_data, dequeued_bytes);
+ EXPECT_EQ(dequeued_bytes, std::min(max_buffer_size, dequeued_bytes));
+ EXPECT_EQ(queue->GetTotalSize(), old_total_size - dequeued_bytes);
+ }
+ EXPECT_TRUE(queue->IsEmpty());
+ return data;
+}
+
+// Enqueue a test string with the given enqueue/dequeue max buffer
+// sizes.
+void RunEnqueueDequeueTest(size_t enqueue_max_buffer_size,
+ size_t dequeue_max_buffer_size) {
+ std::string data(kData, kDataSize);
+ SpdyReadQueue read_queue;
+ EnqueueString(data, enqueue_max_buffer_size, &read_queue);
+ const std::string& drained_data =
+ DrainToString(dequeue_max_buffer_size, &read_queue);
+ EXPECT_EQ(data, drained_data);
+}
+
+class SpdyReadQueueTest : public ::testing::Test {};
+
+// Call RunEnqueueDequeueTest() with various buffer size combinatinos.
+
+TEST_F(SpdyReadQueueTest, LargeEnqueueAndDequeueBuffers) {
+ RunEnqueueDequeueTest(2 * kDataSize, 2 * kDataSize);
+}
+
+TEST_F(SpdyReadQueueTest, OneByteEnqueueAndDequeueBuffers) {
+ RunEnqueueDequeueTest(1, 1);
+}
+
+TEST_F(SpdyReadQueueTest, CoprimeBufferSizes) {
+ RunEnqueueDequeueTest(2, 3);
+ RunEnqueueDequeueTest(3, 2);
+}
+
+} // namespace
+
+} // namespace net
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 1b59ec7..f705731 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -31,9 +31,9 @@
#include "net/cert/asn1_util.h"
#include "net/http/http_network_session.h"
#include "net/http/http_server_properties.h"
+#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_credential_builder.h"
#include "net/spdy/spdy_frame_builder.h"
-#include "net/spdy/spdy_frame_producer.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
@@ -312,6 +312,7 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
stream_hi_water_mark_(kFirstStreamId),
write_pending_(false),
in_flight_write_frame_type_(DATA),
+ in_flight_write_frame_size_(0),
delayed_write_pending_(false),
is_secure_(false),
certificate_error_code_(OK),
@@ -636,7 +637,7 @@ int SpdySession::GetProtocolVersion() const {
void SpdySession::EnqueueStreamWrite(
SpdyStream* stream,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> producer) {
+ scoped_ptr<SpdyBufferProducer> producer) {
DCHECK(frame_type == HEADERS ||
frame_type == DATA ||
frame_type == CREDENTIAL ||
@@ -738,18 +739,18 @@ scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame(
return frame.Pass();
}
-scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
- net::IOBuffer* data,
- int len,
- SpdyDataFlags flags) {
- // Find our stream
+scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
+ net::IOBuffer* data,
+ int len,
+ SpdyDataFlags flags) {
+ // Find our stream.
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
if (len < 0) {
NOTREACHED();
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
if (len > kMaxSpdyFrameChunkSize) {
@@ -772,7 +773,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
effective_window_size =
@@ -784,7 +785,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
}
@@ -815,7 +816,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), static_cast<uint32>(len), flags));
- return frame.Pass();
+ return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
@@ -982,42 +983,45 @@ void SpdySession::OnWriteComplete(int result) {
scoped_refptr<SpdySession> self(this);
DCHECK(write_pending_);
- DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
last_activity_time_ = base::TimeTicks::Now();
write_pending_ = false;
if (result < 0) {
- in_flight_write_.Release();
+ in_flight_write_.reset();
in_flight_write_frame_type_ = DATA;
- CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
+ in_flight_write_frame_size_ = 0;
+ in_flight_write_stream_ = NULL;
+ CloseSessionOnError(static_cast<Error>(result), true, "Write error");
return;
}
// It should not be possible to have written more bytes than our
// in_flight_write_.
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
-
- in_flight_write_.buffer()->DidConsume(result);
-
- // We only notify the stream when we've fully written the pending frame.
- if (in_flight_write_.buffer()->BytesRemaining() == 0) {
- DCHECK_GT(result, 0);
-
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
+ DCHECK_LE(static_cast<size_t>(result),
+ in_flight_write_->GetRemainingSize());
+
+ if (result > 0) {
+ in_flight_write_->Consume(static_cast<size_t>(result));
+
+ // We only notify the stream when we've fully written the pending frame.
+ if (in_flight_write_->GetRemainingSize() == 0) {
+ // It is possible that the stream was cancelled while we were
+ // writing to the socket.
+ if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) {
+ DCHECK_GT(in_flight_write_frame_size_, 0u);
+ in_flight_write_stream_->OnFrameWriteComplete(
+ in_flight_write_frame_type_,
+ in_flight_write_frame_size_);
+ }
- // It is possible that the stream was cancelled while we were writing
- // to the socket.
- if (stream && !stream->cancelled()) {
- DCHECK_GT(in_flight_write_.buffer()->size(), 0);
- stream->OnFrameWriteComplete(
- in_flight_write_frame_type_,
- static_cast<size_t>(in_flight_write_.buffer()->size()));
+ // Cleanup the write which just completed.
+ in_flight_write_.reset();
+ in_flight_write_frame_type_ = DATA;
+ in_flight_write_frame_size_ = 0;
+ in_flight_write_stream_ = NULL;
}
-
- // Cleanup the write which just completed.
- in_flight_write_.Release();
- in_flight_write_frame_type_ = DATA;
}
// Write more data. We're already in a continuation, so we can go
@@ -1057,12 +1061,12 @@ void SpdySession::WriteSocket() {
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
while (true) {
- if (in_flight_write_.buffer()) {
- DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
+ if (in_flight_write_) {
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
} else {
// Grab the next frame to send.
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> producer;
+ scoped_ptr<SpdyBufferProducer> producer;
scoped_refptr<SpdyStream> stream;
if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
break;
@@ -1086,25 +1090,25 @@ void SpdySession::WriteSocket() {
}
}
- scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
- if (!frame) {
+ in_flight_write_ = producer->ProduceBuffer();
+ if (!in_flight_write_) {
NOTREACHED();
continue;
}
- DCHECK_GT(frame->size(), 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- scoped_refptr<IOBufferWithSize> buffer =
- new IOBufferWithSize(frame->size());
- memcpy(buffer->data(), frame->data(), frame->size());
- in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
in_flight_write_frame_type_ = frame_type;
+ in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
+ DCHECK_GE(in_flight_write_frame_size_,
+ buffered_spdy_framer_->GetFrameMinimumSize());
+ in_flight_write_stream_ = stream;
}
write_pending_ = true;
+ // We keep |in_flight_write_| alive until OnWriteComplete(), so
+ // it's okay to use GetIOBufferForRemainingData() since the socket
+ // doesn't use the IOBuffer past OnWriteComplete().
int rv = connection_->socket()->Write(
- in_flight_write_.buffer(),
- in_flight_write_.buffer()->BytesRemaining(),
+ in_flight_write_->GetIOBufferForRemainingData(),
+ in_flight_write_->GetRemainingSize(),
base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
if (rv == net::ERR_IO_PENDING)
break;
@@ -1294,13 +1298,15 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority,
frame_type == PING);
EnqueueWrite(
priority, frame_type,
- scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
+ scoped_ptr<SpdyBufferProducer>(
+ new SimpleBufferProducer(
+ scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
NULL);
}
void SpdySession::EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> producer,
+ scoped_ptr<SpdyBufferProducer> producer,
const scoped_refptr<SpdyStream>& stream) {
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
WriteSocketLater();
@@ -1418,16 +1424,24 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
}
+ ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
+
// By the time data comes in, the stream may already be inactive.
- if (!IsStreamActive(stream_id))
+ if (it == active_streams_.end())
return;
// Only decrease the window size for data for active streams.
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0)
DecreaseRecvWindowSize(static_cast<int32>(len));
- scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
- stream->OnDataReceived(data, len);
+ scoped_ptr<SpdyBuffer> buffer;
+ if (data) {
+ DCHECK_GT(len, 0u);
+ buffer.reset(new SpdyBuffer(data, len));
+ } else {
+ DCHECK_EQ(len, 0u);
+ }
+ it->second->OnDataReceived(buffer.Pass());
}
void SpdySession::OnSetting(SpdySettingsIds id,
@@ -1696,7 +1710,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id,
CHECK(!stream->cancelled());
if (status == 0) {
- stream->OnDataReceived(NULL, 0);
+ stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
} else if (status == RST_STREAM_REFUSED_STREAM) {
DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
} else {
diff --git a/net/spdy/spdy_session.h b/net/spdy/spdy_session.h
index b4e7335..55ac9f9 100644
--- a/net/spdy/spdy_session.h
+++ b/net/spdy/spdy_session.h
@@ -26,9 +26,9 @@
#include "net/socket/ssl_client_socket.h"
#include "net/socket/stream_socket.h"
#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_buffer.h"
#include "net/spdy/spdy_credential_state.h"
#include "net/spdy/spdy_header_block.h"
-#include "net/spdy/spdy_io_buffer.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_write_queue.h"
@@ -246,7 +246,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// producer is used to produce its frame.
void EnqueueStreamWrite(SpdyStream* stream,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> producer);
+ scoped_ptr<SpdyBufferProducer> producer);
// Creates and returns a SYN frame for |stream_id|.
scoped_ptr<SpdyFrame> CreateSynStream(
@@ -271,12 +271,12 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
const SpdyHeaderBlock& headers,
SpdyControlFlags flags);
- // Creates and returns a data frame. May return NULL if stalled by
- // flow control.
- scoped_ptr<SpdyFrame> CreateDataFrame(SpdyStreamId stream_id,
- net::IOBuffer* data,
- int len,
- SpdyDataFlags flags);
+ // Creates and returns a SpdyBuffer holding a data frame with the
+ // given data. May return NULL if stalled by flow control.
+ scoped_ptr<SpdyBuffer> CreateDataBuffer(SpdyStreamId stream_id,
+ net::IOBuffer* data,
+ int len,
+ SpdyDataFlags flags);
// Close a stream.
void CloseStream(SpdyStreamId stream_id, int status);
@@ -584,7 +584,7 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// with the given priority.
void EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> producer,
+ scoped_ptr<SpdyBufferProducer> producer,
const scoped_refptr<SpdyStream>& stream);
// Track active streams in the active stream list.
@@ -771,10 +771,18 @@ class NET_EXPORT SpdySession : public base::RefCounted<SpdySession>,
// The write queue.
SpdyWriteQueue write_queue_;
- // The packet we are currently sending.
- bool write_pending_; // Will be true when a write is in progress.
- SpdyIOBuffer in_flight_write_; // This is the write buffer in progress.
+ // Data for the frame we are currently sending.
+ // Whether we have a socket write pending completion.
+ bool write_pending_;
+ // The buffer we're currently writing.
+ scoped_ptr<SpdyBuffer> in_flight_write_;
+ // The type of the frame in |in_flight_write_|.
SpdyFrameType in_flight_write_frame_type_;
+ // The size of the frame in |in_flight_write_|.
+ size_t in_flight_write_frame_size_;
+ // The stream to notify when |in_flight_write_| has been written to
+ // the socket completely.
+ scoped_refptr<SpdyStream> in_flight_write_stream_;
// Flag if we have a pending message scheduled for WriteSocket.
bool delayed_write_pending_;
diff --git a/net/spdy/spdy_session_spdy2_unittest.cc b/net/spdy/spdy_session_spdy2_unittest.cc
index 5069a22..cd676bc 100644
--- a/net/spdy/spdy_session_spdy2_unittest.cc
+++ b/net/spdy/spdy_session_spdy2_unittest.cc
@@ -12,7 +12,6 @@
#include "net/base/test_data_directory.h"
#include "net/base/test_data_stream.h"
#include "net/dns/host_cache.h"
-#include "net/spdy/spdy_io_buffer.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_session_test_util.h"
#include "net/spdy/spdy_stream.h"
diff --git a/net/spdy/spdy_session_spdy3_unittest.cc b/net/spdy/spdy_session_spdy3_unittest.cc
index 40e805f..1c63960 100644
--- a/net/spdy/spdy_session_spdy3_unittest.cc
+++ b/net/spdy/spdy_session_spdy3_unittest.cc
@@ -13,7 +13,6 @@
#include "net/base/test_data_stream.h"
#include "net/dns/host_cache.h"
#include "net/spdy/spdy_http_utils.h"
-#include "net/spdy/spdy_io_buffer.h"
#include "net/spdy/spdy_session_pool.h"
#include "net/spdy/spdy_session_test_util.h"
#include "net/spdy/spdy_stream.h"
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc
index 69d9cfa..3de3716 100644
--- a/net/spdy/spdy_stream.cc
+++ b/net/spdy/spdy_stream.cc
@@ -12,7 +12,7 @@
#include "base/message_loop.h"
#include "base/stringprintf.h"
#include "base/values.h"
-#include "net/spdy/spdy_frame_producer.h"
+#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_session.h"
@@ -54,22 +54,23 @@ bool ContainsUpperAscii(const std::string& str) {
} // namespace
// A wrapper around a stream that calls into ProduceSynStreamFrame().
-class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer {
+class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer {
public:
- SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream)
+ SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream)
: stream_(stream) {
DCHECK(stream_);
}
- virtual ~SynStreamFrameProducer() {}
+ virtual ~SynStreamBufferProducer() {}
- virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
+ virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
if (!stream_) {
NOTREACHED();
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
DCHECK_GT(stream_->stream_id(), 0u);
- return stream_->ProduceSynStreamFrame();
+ return scoped_ptr<SpdyBuffer>(
+ new SpdyBuffer(stream_->ProduceSynStreamFrame()));
}
private:
@@ -78,9 +79,9 @@ class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer {
// A wrapper around a stream that calls into ProduceHeaderFrame() with
// a given header block.
-class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer {
+class SpdyStream::HeaderBufferProducer : public SpdyBufferProducer {
public:
- HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream,
+ HeaderBufferProducer(const base::WeakPtr<SpdyStream>& stream,
scoped_ptr<SpdyHeaderBlock> headers)
: stream_(stream),
headers_(headers.Pass()) {
@@ -88,15 +89,16 @@ class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer {
DCHECK(headers_);
}
- virtual ~HeaderFrameProducer() {}
+ virtual ~HeaderBufferProducer() {}
- virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
+ virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE {
if (!stream_) {
NOTREACHED();
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
DCHECK_GT(stream_->stream_id(), 0u);
- return stream_->ProduceHeaderFrame(headers_.Pass());
+ return scoped_ptr<SpdyBuffer>(
+ new SpdyBuffer(stream_->ProduceHeaderFrame(headers_.Pass())));
}
private:
@@ -175,17 +177,17 @@ void SpdyStream::PushedStreamReplayData() {
return;
}
- std::vector<scoped_refptr<IOBufferWithSize> > buffers;
- buffers.swap(pending_buffers_);
+ std::vector<SpdyBuffer*> buffers;
+ pending_buffers_.release(&buffers);
for (size_t i = 0; i < buffers.size(); ++i) {
// It is always possible that a callback to the delegate results in
// the delegate no longer being available.
if (!delegate_)
break;
if (buffers[i]) {
- delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size());
+ delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i]));
} else {
- delegate_->OnDataReceived(NULL, 0);
+ delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>());
session_->CloseStream(stream_id_, net::OK);
// Note: |this| may be deleted after calling CloseStream.
DCHECK_EQ(buffers.size() - 1, i);
@@ -450,9 +452,8 @@ int SpdyStream::OnHeaders(const SpdyHeaderBlock& headers) {
return rv;
}
-void SpdyStream::OnDataReceived(const char* data, size_t length) {
+void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
DCHECK(session_->IsStreamActive(stream_id_));
- DCHECK_LT(length, 1u << 24);
// If we don't have a response, then the SYN_REPLY did not come through.
// We cannot pass data up to the caller unless the reply headers have been
// received.
@@ -465,10 +466,8 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) {
if (!delegate_ || continue_buffering_data_) {
// It should be valid for this to happen in the server push case.
// We'll return received data when delegate gets attached to the stream.
- if (length > 0) {
- IOBufferWithSize* buf = new IOBufferWithSize(length);
- memcpy(buf->data(), data, length);
- pending_buffers_.push_back(make_scoped_refptr(buf));
+ if (buffer) {
+ pending_buffers_.push_back(buffer.release());
} else {
pending_buffers_.push_back(NULL);
metrics_.StopStream();
@@ -480,14 +479,15 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) {
CHECK(!closed());
- // A zero-length read means that the stream is being closed.
- if (length == 0) {
+ if (!buffer) {
metrics_.StopStream();
session_->CloseStream(stream_id_, net::OK);
// Note: |this| may be deleted after calling CloseStream.
return;
}
+ size_t length = buffer->GetRemainingSize();
+ DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM)
DecreaseRecvWindowSize(static_cast<int32>(length));
@@ -496,7 +496,7 @@ void SpdyStream::OnDataReceived(const char* data, size_t length) {
recv_bytes_ += length;
recv_last_byte_time_ = base::TimeTicks::Now();
- if (delegate_->OnDataReceived(data, length) != net::OK) {
+ if (delegate_->OnDataReceived(buffer.Pass()) != net::OK) {
// |delegate_| rejected the data.
LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data");
session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR);
@@ -580,8 +580,8 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) {
session_->EnqueueStreamWrite(
this, HEADERS,
- scoped_ptr<SpdyFrameProducer>(
- new HeaderFrameProducer(
+ scoped_ptr<SpdyBufferProducer>(
+ new HeaderBufferProducer(
weak_ptr_factory_.GetWeakPtr(), headers.Pass())));
}
@@ -594,15 +594,16 @@ void SpdyStream::QueueStreamData(IOBuffer* data,
CHECK_GT(stream_id_, 0u);
CHECK(!cancelled());
- scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame(
+ scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer(
stream_id_, data, length, flags));
- if (!data_frame)
+ // We'll get called again by PossiblyResumeIfSendStalled().
+ if (!data_buffer)
return;
session_->EnqueueStreamWrite(
this, DATA,
- scoped_ptr<SpdyFrameProducer>(
- new SimpleFrameProducer(data_frame.Pass())));
+ scoped_ptr<SpdyBufferProducer>(
+ new SimpleBufferProducer(data_buffer.Pass())));
}
bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info,
@@ -784,7 +785,9 @@ int SpdyStream::DoSendDomainBoundCert() {
// the state machine appropriately.
session_->EnqueueStreamWrite(
this, CREDENTIAL,
- scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())));
+ scoped_ptr<SpdyBufferProducer>(
+ new SimpleBufferProducer(
+ scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))));
return ERR_IO_PENDING;
}
@@ -800,8 +803,8 @@ int SpdyStream::DoSendHeaders() {
session_->EnqueueStreamWrite(
this, SYN_STREAM,
- scoped_ptr<SpdyFrameProducer>(
- new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr())));
+ scoped_ptr<SpdyBufferProducer>(
+ new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr())));
return ERR_IO_PENDING;
}
diff --git a/net/spdy/spdy_stream.h b/net/spdy/spdy_stream.h
index 881b48e..fd145ea 100644
--- a/net/spdy/spdy_stream.h
+++ b/net/spdy/spdy_stream.h
@@ -12,6 +12,7 @@
#include "base/basictypes.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
+#include "base/memory/scoped_vector.h"
#include "base/memory/weak_ptr.h"
#include "googleurl/src/gurl.h"
#include "net/base/bandwidth_metrics.h"
@@ -83,9 +84,10 @@ class NET_EXPORT_PRIVATE SpdyStream
// Called when a HEADERS frame is sent.
virtual void OnHeadersSent() = 0;
- // Called when data is received.
- // Returns network error code. OK when it successfully receives data.
- virtual int OnDataReceived(const char* data, int length) = 0;
+ // Called when data is received. |buffer| may be NULL, which
+ // signals EOF. Must return OK if the data was received
+ // successfully, or a network error code otherwise.
+ virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) = 0;
// Called when data is sent.
virtual void OnDataSent(size_t bytes_sent) = 0;
@@ -218,7 +220,7 @@ class NET_EXPORT_PRIVATE SpdyStream
//
// |length| is the number of bytes received (at most 2^24 - 1) or 0 if
// the stream is being closed.
- void OnDataReceived(const char* buffer, size_t length);
+ void OnDataReceived(scoped_ptr<SpdyBuffer> buffer);
// Called by the SpdySession when a frame has been successfully and
// completely written. |frame_size| is the total size of the frame
@@ -289,8 +291,8 @@ class NET_EXPORT_PRIVATE SpdyStream
int GetProtocolVersion() const;
private:
- class SynStreamFrameProducer;
- class HeaderFrameProducer;
+ class SynStreamBufferProducer;
+ class HeaderBufferProducer;
enum State {
STATE_NONE,
@@ -411,7 +413,7 @@ class NET_EXPORT_PRIVATE SpdyStream
int recv_bytes_;
// Data received before delegate is attached.
- std::vector<scoped_refptr<IOBufferWithSize> > pending_buffers_;
+ ScopedVector<SpdyBuffer> pending_buffers_;
SSLClientCertType domain_bound_cert_type_;
std::string domain_bound_private_key_;
diff --git a/net/spdy/spdy_stream_test_util.cc b/net/spdy/spdy_stream_test_util.cc
index ac96cf9..0165c20 100644
--- a/net/spdy/spdy_stream_test_util.cc
+++ b/net/spdy/spdy_stream_test_util.cc
@@ -37,7 +37,7 @@ int ClosingDelegate::OnResponseReceived(const SpdyHeaderBlock& response,
void ClosingDelegate::OnHeadersSent() {}
-int ClosingDelegate::OnDataReceived(const char* data, int length) {
+int ClosingDelegate::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
return OK;
}
@@ -77,8 +77,11 @@ void StreamDelegateBase::OnHeadersSent() {
headers_sent_++;
}
-int StreamDelegateBase::OnDataReceived(const char* buffer, int bytes) {
- received_data_ += std::string(buffer, bytes);
+int StreamDelegateBase::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
+ if (buffer) {
+ received_data_ += std::string(buffer->GetRemainingData(),
+ buffer->GetRemainingSize());
+ }
return OK;
}
diff --git a/net/spdy/spdy_stream_test_util.h b/net/spdy/spdy_stream_test_util.h
index 5fc74af..a7ca912 100644
--- a/net/spdy/spdy_stream_test_util.h
+++ b/net/spdy/spdy_stream_test_util.h
@@ -32,7 +32,7 @@ class ClosingDelegate : public SpdyStream::Delegate {
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
- virtual int OnDataReceived(const char* data, int length) OVERRIDE;
+ virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
@@ -54,7 +54,7 @@ class StreamDelegateBase : public SpdyStream::Delegate {
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
- virtual int OnDataReceived(const char* buffer, int bytes) OVERRIDE;
+ virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
diff --git a/net/spdy/spdy_websocket_stream.cc b/net/spdy/spdy_websocket_stream.cc
index 1f80d48..a17ff73 100644
--- a/net/spdy/spdy_websocket_stream.cc
+++ b/net/spdy/spdy_websocket_stream.cc
@@ -8,8 +8,8 @@
#include "base/bind_helpers.h"
#include "base/compiler_specific.h"
#include "googleurl/src/gurl.h"
-#include "net/base/net_errors.h"
#include "net/base/io_buffer.h"
+#include "net/base/net_errors.h"
#include "net/spdy/spdy_framer.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session.h"
@@ -116,9 +116,9 @@ void SpdyWebSocketStream::OnHeadersSent() {
NOTREACHED();
}
-int SpdyWebSocketStream::OnDataReceived(const char* data, int length) {
+int SpdyWebSocketStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
DCHECK(delegate_);
- delegate_->OnReceivedSpdyData(data, length);
+ delegate_->OnReceivedSpdyData(buffer.Pass());
return OK;
}
diff --git a/net/spdy/spdy_websocket_stream.h b/net/spdy/spdy_websocket_stream.h
index 2cef1c6..02f6c7c 100644
--- a/net/spdy/spdy_websocket_stream.h
+++ b/net/spdy/spdy_websocket_stream.h
@@ -49,7 +49,7 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream
virtual void OnSentSpdyData(size_t bytes_sent) = 0;
// Called when data is received.
- virtual void OnReceivedSpdyData(const char* data, int length) = 0;
+ virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) = 0;
// Called when SpdyStream is closed.
virtual void OnCloseSpdyStream() = 0;
@@ -81,7 +81,7 @@ class NET_EXPORT_PRIVATE SpdyWebSocketStream
base::Time response_time,
int status) OVERRIDE;
virtual void OnHeadersSent() OVERRIDE;
- virtual int OnDataReceived(const char* data, int length) OVERRIDE;
+ virtual int OnDataReceived(scoped_ptr<SpdyBuffer> buffer) OVERRIDE;
virtual void OnDataSent(size_t bytes_sent) OVERRIDE;
virtual void OnClose(int status) OVERRIDE;
diff --git a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
index b9b8ce3..695359f 100644
--- a/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
+++ b/net/spdy/spdy_websocket_stream_spdy2_unittest.cc
@@ -116,13 +116,19 @@ class SpdyWebSocketStreamEventRecorder : public SpdyWebSocketStream::Delegate {
if (!on_sent_data_.is_null())
on_sent_data_.Run(&events_.back());
}
- virtual void OnReceivedSpdyData(const char* data, int length) OVERRIDE {
+ virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) OVERRIDE {
+ std::string buffer_data;
+ size_t buffer_len = 0;
+ if (buffer) {
+ buffer_len = buffer->GetRemainingSize();
+ buffer_data.append(buffer->GetRemainingData(), buffer_len);
+ }
events_.push_back(
SpdyWebSocketStreamEvent(
SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA,
SpdyHeaderBlock(),
- length,
- std::string(data, length)));
+ buffer_len,
+ buffer_data));
if (!on_received_data_.is_null())
on_received_data_.Run(&events_.back());
}
diff --git a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
index d008b1a..5b30a43 100644
--- a/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
+++ b/net/spdy/spdy_websocket_stream_spdy3_unittest.cc
@@ -116,13 +116,19 @@ class SpdyWebSocketStreamEventRecorder : public SpdyWebSocketStream::Delegate {
if (!on_sent_data_.is_null())
on_sent_data_.Run(&events_.back());
}
- virtual void OnReceivedSpdyData(const char* data, int length) OVERRIDE {
+ virtual void OnReceivedSpdyData(scoped_ptr<SpdyBuffer> buffer) OVERRIDE {
+ std::string buffer_data;
+ size_t buffer_len = 0;
+ if (buffer) {
+ buffer_len = buffer->GetRemainingSize();
+ buffer_data.append(buffer->GetRemainingData(), buffer_len);
+ }
events_.push_back(
SpdyWebSocketStreamEvent(
SpdyWebSocketStreamEvent::EVENT_RECEIVED_DATA,
SpdyHeaderBlock(),
- length,
- std::string(data, length)));
+ buffer_len,
+ buffer_data));
if (!on_received_data_.is_null())
on_received_data_.Run(&events_.back());
}
diff --git a/net/spdy/spdy_write_queue.cc b/net/spdy/spdy_write_queue.cc
index f33cf62..18ddc39 100644
--- a/net/spdy/spdy_write_queue.cc
+++ b/net/spdy/spdy_write_queue.cc
@@ -7,7 +7,8 @@
#include <cstddef>
#include "base/logging.h"
-#include "net/spdy/spdy_frame_producer.h"
+#include "net/spdy/spdy_buffer.h"
+#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_stream.h"
namespace net {
@@ -16,7 +17,7 @@ SpdyWriteQueue::PendingWrite::PendingWrite() : frame_producer(NULL) {}
SpdyWriteQueue::PendingWrite::PendingWrite(
SpdyFrameType frame_type,
- SpdyFrameProducer* frame_producer,
+ SpdyBufferProducer* frame_producer,
const scoped_refptr<SpdyStream>& stream)
: frame_type(frame_type),
frame_producer(frame_producer),
@@ -32,7 +33,7 @@ SpdyWriteQueue::~SpdyWriteQueue() {
void SpdyWriteQueue::Enqueue(RequestPriority priority,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> frame_producer,
+ scoped_ptr<SpdyBufferProducer> frame_producer,
const scoped_refptr<SpdyStream>& stream) {
if (stream.get()) {
DCHECK_EQ(stream->priority(), priority);
@@ -42,7 +43,7 @@ void SpdyWriteQueue::Enqueue(RequestPriority priority,
}
bool SpdyWriteQueue::Dequeue(SpdyFrameType* frame_type,
- scoped_ptr<SpdyFrameProducer>* frame_producer,
+ scoped_ptr<SpdyBufferProducer>* frame_producer,
scoped_refptr<SpdyStream>* stream) {
for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
if (!queue_[i].empty()) {
diff --git a/net/spdy/spdy_write_queue.h b/net/spdy/spdy_write_queue.h
index 996fca75..1b8f1b8 100644
--- a/net/spdy/spdy_write_queue.h
+++ b/net/spdy/spdy_write_queue.h
@@ -16,10 +16,11 @@
namespace net {
-class SpdyFrameProducer;
+class SpdyBuffer;
+class SpdyBufferProducer;
class SpdyStream;
-// A queue of SpdyFrameProducers to produce frames to write. Ordered
+// A queue of SpdyBufferProducers to produce frames to write. Ordered
// by priority, and then FIFO.
class NET_EXPORT_PRIVATE SpdyWriteQueue {
public:
@@ -32,7 +33,7 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
// is non-NULL, its priority must be equal to |priority|.
void Enqueue(RequestPriority priority,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> frame_producer,
+ scoped_ptr<SpdyBufferProducer> frame_producer,
const scoped_refptr<SpdyStream>& stream);
// Dequeues the frame producer with the highest priority that was
@@ -40,7 +41,7 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
// fills in |frame_type|, |frame_producer|, and |stream| if
// successful -- otherwise, just returns false.
bool Dequeue(SpdyFrameType* frame_type,
- scoped_ptr<SpdyFrameProducer>* frame_producer,
+ scoped_ptr<SpdyBufferProducer>* frame_producer,
scoped_refptr<SpdyStream>* stream);
// Removes all pending writes for the given stream, which must be
@@ -56,12 +57,12 @@ class NET_EXPORT_PRIVATE SpdyWriteQueue {
SpdyFrameType frame_type;
// This has to be a raw pointer since we store this in an STL
// container.
- SpdyFrameProducer* frame_producer;
+ SpdyBufferProducer* frame_producer;
scoped_refptr<SpdyStream> stream;
PendingWrite();
PendingWrite(SpdyFrameType frame_type,
- SpdyFrameProducer* frame_producer,
+ SpdyBufferProducer* frame_producer,
const scoped_refptr<SpdyStream>& stream);
~PendingWrite();
};
diff --git a/net/spdy/spdy_write_queue_unittest.cc b/net/spdy/spdy_write_queue_unittest.cc
index e284619..c98074f 100644
--- a/net/spdy/spdy_write_queue_unittest.cc
+++ b/net/spdy/spdy_write_queue_unittest.cc
@@ -14,7 +14,7 @@
#include "base/strings/string_number_conversions.h"
#include "net/base/net_log.h"
#include "net/base/request_priority.h"
-#include "net/spdy/spdy_frame_producer.h"
+#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_stream.h"
#include "testing/gtest/include/gtest/gtest.h"
@@ -26,31 +26,33 @@ class SpdyWriteQueueTest : public ::testing::Test {};
// Makes a SpdyFrameProducer producing a frame with the data in the
// given string.
-scoped_ptr<SpdyFrameProducer> StringToProducer(const std::string& s) {
+scoped_ptr<SpdyBufferProducer> StringToProducer(const std::string& s) {
scoped_ptr<char[]> data(new char[s.size()]);
std::memcpy(data.get(), s.data(), s.size());
- return scoped_ptr<SpdyFrameProducer>(
- new SimpleFrameProducer(
- scoped_ptr<SpdyFrame>(
- new SpdyFrame(data.release(), s.size(), true))));
+ return scoped_ptr<SpdyBufferProducer>(
+ new SimpleBufferProducer(
+ scoped_ptr<SpdyBuffer>(
+ new SpdyBuffer(
+ scoped_ptr<SpdyFrame>(
+ new SpdyFrame(data.release(), s.size(), true))))));
}
-// Makes a SpdyFrameProducer producing a frame with the data in the
+// Makes a SpdyBufferProducer producing a frame with the data in the
// given int (converted to a string).
-scoped_ptr<SpdyFrameProducer> IntToProducer(int i) {
+scoped_ptr<SpdyBufferProducer> IntToProducer(int i) {
return StringToProducer(base::IntToString(i));
}
// Produces a frame with the given producer and returns a copy of its
// data as a string.
-std::string ProducerToString(scoped_ptr<SpdyFrameProducer> producer) {
- scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
- return std::string(frame->data(), frame->size());
+std::string ProducerToString(scoped_ptr<SpdyBufferProducer> producer) {
+ scoped_ptr<SpdyBuffer> buffer = producer->ProduceBuffer();
+ return std::string(buffer->GetRemainingData(), buffer->GetRemainingSize());
}
// Produces a frame with the given producer and returns a copy of its
// data as an int (converted from a string).
-int ProducerToInt(scoped_ptr<SpdyFrameProducer> producer) {
+int ProducerToInt(scoped_ptr<SpdyBufferProducer> producer) {
int i = 0;
EXPECT_TRUE(base::StringToInt(ProducerToString(producer.Pass()), &i));
return i;
@@ -69,9 +71,9 @@ SpdyStream* MakeTestStream(RequestPriority priority) {
TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
SpdyWriteQueue write_queue;
- scoped_ptr<SpdyFrameProducer> producer_low = StringToProducer("LOW");
- scoped_ptr<SpdyFrameProducer> producer_medium = StringToProducer("MEDIUM");
- scoped_ptr<SpdyFrameProducer> producer_highest = StringToProducer("HIGHEST");
+ scoped_ptr<SpdyBufferProducer> producer_low = StringToProducer("LOW");
+ scoped_ptr<SpdyBufferProducer> producer_medium = StringToProducer("MEDIUM");
+ scoped_ptr<SpdyBufferProducer> producer_highest = StringToProducer("HIGHEST");
// A NULL stream should still work.
scoped_refptr<SpdyStream> stream_low(NULL);
@@ -86,7 +88,7 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
HIGHEST, RST_STREAM, producer_highest.Pass(), stream_highest);
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> frame_producer;
+ scoped_ptr<SpdyBufferProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(RST_STREAM, frame_type);
@@ -111,9 +113,9 @@ TEST_F(SpdyWriteQueueTest, DequeuesByPriority) {
TEST_F(SpdyWriteQueueTest, DequeuesFIFO) {
SpdyWriteQueue write_queue;
- scoped_ptr<SpdyFrameProducer> producer1 = IntToProducer(1);
- scoped_ptr<SpdyFrameProducer> producer2 = IntToProducer(2);
- scoped_ptr<SpdyFrameProducer> producer3 = IntToProducer(3);
+ scoped_ptr<SpdyBufferProducer> producer1 = IntToProducer(1);
+ scoped_ptr<SpdyBufferProducer> producer2 = IntToProducer(2);
+ scoped_ptr<SpdyBufferProducer> producer3 = IntToProducer(3);
scoped_refptr<SpdyStream> stream1(MakeTestStream(DEFAULT_PRIORITY));
scoped_refptr<SpdyStream> stream2(MakeTestStream(DEFAULT_PRIORITY));
@@ -124,7 +126,7 @@ TEST_F(SpdyWriteQueueTest, DequeuesFIFO) {
write_queue.Enqueue(DEFAULT_PRIORITY, RST_STREAM, producer3.Pass(), stream3);
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> frame_producer;
+ scoped_ptr<SpdyBufferProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(SYN_STREAM, frame_type);
@@ -162,7 +164,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
for (int i = 0; i < 100; i += 3) {
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> frame_producer;
+ scoped_ptr<SpdyBufferProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
ASSERT_TRUE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
EXPECT_EQ(SYN_STREAM, frame_type);
@@ -171,7 +173,7 @@ TEST_F(SpdyWriteQueueTest, RemovePendingWritesForStream) {
}
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> frame_producer;
+ scoped_ptr<SpdyBufferProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
}
@@ -189,11 +191,11 @@ TEST_F(SpdyWriteQueueTest, Clear) {
write_queue.Clear();
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> frame_producer;
+ scoped_ptr<SpdyBufferProducer> frame_producer;
scoped_refptr<SpdyStream> stream;
EXPECT_FALSE(write_queue.Dequeue(&frame_type, &frame_producer, &stream));
}
-}
+} // namespace
} // namespace net