summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorzork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-03-13 04:34:19 +0000
committerzork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-03-13 04:34:19 +0000
commitbb85a8c45042b238dade29ae80963153790cb491 (patch)
tree658304ab46ee87e0d8abeff8b666da078e5b62fa
parent8f9e722ee00bcbe1733b7e2163c098bf8239d516 (diff)
downloadchromium_src-bb85a8c45042b238dade29ae80963153790cb491.zip
chromium_src-bb85a8c45042b238dade29ae80963153790cb491.tar.gz
chromium_src-bb85a8c45042b238dade29ae80963153790cb491.tar.bz2
Reland r187230: Implement the Stream registry in content
This fixes the memory leaks introduced by the original CL. BUG=171585 Review URL: https://chromiumcodereview.appspot.com/12637006 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@187777 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--content/browser/download/byte_stream.h3
-rw-r--r--content/browser/resource_context_impl.cc14
-rw-r--r--content/browser/resource_context_impl.h4
-rw-r--r--content/browser/streams/stream.cc116
-rw-r--r--content/browser/streams/stream.h101
-rw-r--r--content/browser/streams/stream_context.cc57
-rw-r--r--content/browser/streams/stream_context.h60
-rw-r--r--content/browser/streams/stream_read_observer.h24
-rw-r--r--content/browser/streams/stream_registry.cc48
-rw-r--r--content/browser/streams/stream_registry.h51
-rw-r--r--content/browser/streams/stream_unittest.cc207
-rw-r--r--content/browser/streams/stream_write_observer.h24
-rw-r--r--content/content_browser.gypi8
-rw-r--r--content/content_tests.gypi1
14 files changed, 718 insertions, 0 deletions
diff --git a/content/browser/download/byte_stream.h b/content/browser/download/byte_stream.h
index b8426be..d8302ec 100644
--- a/content/browser/download/byte_stream.h
+++ b/content/browser/download/byte_stream.h
@@ -21,6 +21,9 @@ class SequencedTaskRunner;
namespace content {
+// TODO(zork): Move this class out of content/browser/download
+// crbug.com/180833
+//
// A byte stream is a pipe to transfer bytes between a source and a
// sink, which may be on different threads. It is intended to be the
// only connection between source and sink; they need have no
diff --git a/content/browser/resource_context_impl.cc b/content/browser/resource_context_impl.cc
index e9332a0..4480158 100644
--- a/content/browser/resource_context_impl.cc
+++ b/content/browser/resource_context_impl.cc
@@ -9,6 +9,7 @@
#include "content/browser/host_zoom_map_impl.h"
#include "content/browser/loader/resource_dispatcher_host_impl.h"
#include "content/browser/loader/resource_request_info_impl.h"
+#include "content/browser/streams/stream_context.h"
#include "content/browser/webui/url_data_manager_backend.h"
#include "content/public/browser/browser_context.h"
#include "content/public/browser/browser_thread.h"
@@ -22,6 +23,7 @@ namespace {
// Key names on ResourceContext.
const char kBlobStorageContextKeyName[] = "content_blob_storage_context";
const char kHostZoomMapKeyName[] = "content_host_zoom_map";
+const char kStreamContextKeyName[] = "content_stream_context";
const char kURLDataManagerBackendKeyName[] = "url_data_manager_backend";
class NonOwningZoomData : public base::SupportsUserData::Data {
@@ -59,6 +61,13 @@ ChromeBlobStorageContext* GetChromeBlobStorageContextForResourceContext(
resource_context, kBlobStorageContextKeyName);
}
+StreamContext* GetStreamContextForResourceContext(
+ ResourceContext* resource_context) {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
+ return UserDataAdapter<StreamContext>::Get(
+ resource_context, kStreamContextKeyName);
+}
+
HostZoomMap* GetHostZoomMapForResourceContext(ResourceContext* context) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
return static_cast<NonOwningZoomData*>(
@@ -85,6 +94,11 @@ void InitializeResourceContext(BrowserContext* browser_context) {
new UserDataAdapter<ChromeBlobStorageContext>(
ChromeBlobStorageContext::GetFor(browser_context)));
+ resource_context->SetUserData(
+ kStreamContextKeyName,
+ new UserDataAdapter<StreamContext>(
+ StreamContext::GetFor(browser_context)));
+
// This object is owned by the BrowserContext and not ResourceContext, so
// store a non-owning pointer here.
resource_context->SetUserData(
diff --git a/content/browser/resource_context_impl.h b/content/browser/resource_context_impl.h
index 8c0b844..ed4de4d 100644
--- a/content/browser/resource_context_impl.h
+++ b/content/browser/resource_context_impl.h
@@ -10,6 +10,7 @@
namespace content {
class ChromeBlobStorageContext;
+class StreamContext;
class BrowserContext;
class HostZoomMap;
class URLDataManagerBackend;
@@ -21,6 +22,9 @@ class URLDataManagerBackend;
ChromeBlobStorageContext* GetChromeBlobStorageContextForResourceContext(
ResourceContext* resource_context);
+StreamContext* GetStreamContextForResourceContext(
+ ResourceContext* resource_context);
+
HostZoomMap* GetHostZoomMapForResourceContext(ResourceContext* context);
URLDataManagerBackend* GetURLDataManagerForResourceContext(
diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc
new file mode 100644
index 0000000..e72a887
--- /dev/null
+++ b/content/browser/streams/stream.cc
@@ -0,0 +1,116 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream.h"
+
+#include "base/bind.h"
+#include "base/message_loop_proxy.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "net/base/io_buffer.h"
+
+namespace {
+// Start throttling the connection at about 1MB.
+const size_t kDeferSizeThreshold = 40 * 32768;
+}
+
+namespace content {
+
+Stream::Stream(StreamRegistry* registry,
+ StreamWriteObserver* write_observer,
+ const GURL& security_origin,
+ const GURL& url)
+ : bytes_read_(0),
+ can_add_data_(true),
+ security_origin_(security_origin),
+ url_(url),
+ data_length_(0),
+ registry_(registry),
+ read_observer_(NULL),
+ write_observer_(write_observer),
+ weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
+ CreateByteStream(base::MessageLoopProxy::current(),
+ base::MessageLoopProxy::current(),
+ kDeferSizeThreshold,
+ &writer_,
+ &reader_);
+
+ // Setup callback for writing.
+ writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
+ weak_ptr_factory_.GetWeakPtr()));
+ reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ registry_->RegisterStream(this);
+}
+
+Stream::~Stream() {
+}
+
+bool Stream::SetReadObserver(StreamReadObserver* observer) {
+ if (read_observer_)
+ return false;
+ read_observer_ = observer;
+ return true;
+}
+
+void Stream::RemoveReadObserver(StreamReadObserver* observer) {
+ DCHECK(observer == read_observer_);
+ read_observer_ = NULL;
+}
+
+void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
+ can_add_data_ = writer_->Write(buffer, size);
+}
+
+void Stream::Finalize() {
+ writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
+ writer_.reset(NULL);
+
+ OnDataAvailable();
+}
+
+Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
+ int buf_size,
+ int* bytes_read) {
+ if (!data_) {
+ data_length_ = 0;
+ bytes_read_ = 0;
+ ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
+ switch (state) {
+ case ByteStreamReader::STREAM_HAS_DATA:
+ break;
+ case ByteStreamReader::STREAM_COMPLETE:
+ registry_->UnregisterStream(url());
+ return STREAM_COMPLETE;
+ case ByteStreamReader::STREAM_EMPTY:
+ return STREAM_EMPTY;
+ }
+ }
+
+ size_t remaining_bytes = data_length_ - bytes_read_;
+ size_t to_read =
+ static_cast<size_t>(buf_size) < remaining_bytes ?
+ buf_size : remaining_bytes;
+ memcpy(buf->data(), data_->data() + bytes_read_, to_read);
+ bytes_read_ += to_read;
+ if (bytes_read_ >= data_length_)
+ data_ = NULL;
+
+ *bytes_read = to_read;
+ return STREAM_HAS_DATA;
+}
+
+void Stream::OnSpaceAvailable() {
+ can_add_data_ = true;
+ write_observer_->OnSpaceAvailable(this);
+}
+
+void Stream::OnDataAvailable() {
+ read_observer_->OnDataAvailable(this);
+}
+
+} // namespace content
+
diff --git a/content/browser/streams/stream.h b/content/browser/streams/stream.h
new file mode 100644
index 0000000..3aa389c
--- /dev/null
+++ b/content/browser/streams/stream.h
@@ -0,0 +1,101 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_H_
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/weak_ptr.h"
+#include "content/browser/download/byte_stream.h"
+#include "content/common/content_export.h"
+#include "googleurl/src/gurl.h"
+
+namespace net {
+class IOBuffer;
+}
+
+namespace content {
+
+class StreamReadObserver;
+class StreamRegistry;
+class StreamWriteObserver;
+
+// A stream that sends data from an arbitrary source to an internal URL
+// that can be read by an internal consumer. It will continue to pull from the
+// original URL as long as there is data available. It can be read from
+// multiple clients, but only one can be reading at a time. This allows a
+// reader to consume part of the stream, then pass it along to another client
+// to continue processing the stream.
+class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
+ public:
+ enum StreamState {
+ STREAM_HAS_DATA,
+ STREAM_COMPLETE,
+ STREAM_EMPTY,
+ };
+
+ // Creates a stream useable from the |security_origin|.
+ Stream(StreamRegistry* registry,
+ StreamWriteObserver* write_observer,
+ const GURL& security_origin,
+ const GURL& url);
+
+ // Sets the reader of this stream. Returns true on success, or false if there
+ // is already a reader.
+ bool SetReadObserver(StreamReadObserver* observer);
+
+ // Removes the read observer. |observer| must be the current observer.
+ void RemoveReadObserver(StreamReadObserver* observer);
+
+ // Adds the data in |buffer| to the stream. Takes ownership of |buffer|.
+ void AddData(scoped_refptr<net::IOBuffer> buffer, size_t size);
+
+ // Notifies this stream that it will not be receiving any more data.
+ void Finalize();
+
+ // Reads a maximum of |buf_size| from the stream into |buf|. Sets
+ // |*bytes_read| to the number of bytes actually read.
+ // Returns STREAM_HAS_DATA if data was read, STREAM_EMPTY if no data was read,
+ // and STREAM_COMPLETE if the stream is finalized and all data has been read.
+ StreamState ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read);
+
+ // Indicates whether there is space in the buffer to add more data.
+ bool can_add_data() const { return can_add_data_; }
+
+ const GURL& url() const { return url_; }
+
+ const GURL& security_origin() const { return security_origin_; }
+
+ private:
+ friend class base::RefCountedThreadSafe<Stream>;
+
+ virtual ~Stream();
+
+ void OnSpaceAvailable();
+ void OnDataAvailable();
+
+ size_t bytes_read_;
+ bool can_add_data_;
+
+ GURL security_origin_;
+ GURL url_;
+
+ scoped_refptr<net::IOBuffer> data_;
+ size_t data_length_;
+
+ scoped_ptr<ByteStreamWriter> writer_;
+ scoped_ptr<ByteStreamReader> reader_;
+
+ StreamRegistry* registry_;
+ StreamReadObserver* read_observer_;
+ StreamWriteObserver* write_observer_;
+
+ base::WeakPtrFactory<Stream> weak_ptr_factory_;
+ DISALLOW_COPY_AND_ASSIGN(Stream);
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_H_
diff --git a/content/browser/streams/stream_context.cc b/content/browser/streams/stream_context.cc
new file mode 100644
index 0000000..ef3a79d
--- /dev/null
+++ b/content/browser/streams/stream_context.cc
@@ -0,0 +1,57 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_context.h"
+
+#include "base/bind.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/public/browser/browser_context.h"
+
+using base::UserDataAdapter;
+
+namespace {
+const char* kStreamContextKeyName = "content_stream_context";
+}
+
+namespace content {
+
+StreamContext::StreamContext() {}
+
+StreamContext* StreamContext::GetFor(BrowserContext* context) {
+ if (!context->GetUserData(kStreamContextKeyName)) {
+ scoped_refptr<StreamContext> stream = new StreamContext();
+ context->SetUserData(kStreamContextKeyName,
+ new UserDataAdapter<StreamContext>(stream));
+ // Check first to avoid memory leak in unittests.
+ if (BrowserThread::IsMessageLoopValid(BrowserThread::IO)) {
+ BrowserThread::PostTask(
+ BrowserThread::IO, FROM_HERE,
+ base::Bind(&StreamContext::InitializeOnIOThread, stream));
+ }
+ }
+
+ return UserDataAdapter<StreamContext>::Get(context, kStreamContextKeyName);
+}
+
+void StreamContext::InitializeOnIOThread() {
+ DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
+ registry_.reset(new StreamRegistry());
+}
+
+StreamContext::~StreamContext() {}
+
+void StreamContext::DeleteOnCorrectThread() const {
+ // In many tests, there isn't a valid IO thread. In that case, just delete on
+ // the current thread.
+ // TODO(zork): Remove this custom deleter, and fix the leaks in all the
+ // tests.
+ if (BrowserThread::IsMessageLoopValid(BrowserThread::IO) &&
+ !BrowserThread::CurrentlyOn(BrowserThread::IO)) {
+ BrowserThread::DeleteSoon(BrowserThread::IO, FROM_HERE, this);
+ return;
+ }
+ delete this;
+}
+
+} // namespace content
diff --git a/content/browser/streams/stream_context.h b/content/browser/streams/stream_context.h
new file mode 100644
index 0000000..7b8adcbd
--- /dev/null
+++ b/content/browser/streams/stream_context.h
@@ -0,0 +1,60 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_
+
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/sequenced_task_runner_helpers.h"
+#include "content/public/browser/browser_thread.h"
+
+namespace content {
+class BrowserContext;
+class StreamRegistry;
+struct StreamContextDeleter;
+
+// A context class that keeps track of StreamRegistry used by the chrome.
+// There is an instance associated with each BrowserContext. There could be
+// multiple URLRequestContexts in the same browser context that refers to the
+// same instance.
+//
+// All methods, except the ctor, are expected to be called on
+// the IO thread (unless specifically called out in doc comments).
+class StreamContext
+ : public base::RefCountedThreadSafe<StreamContext,
+ StreamContextDeleter> {
+ public:
+ StreamContext();
+
+ static StreamContext* GetFor(BrowserContext* browser_context);
+
+ void InitializeOnIOThread();
+
+ StreamRegistry* registry() const { return registry_.get(); }
+
+ protected:
+ virtual ~StreamContext();
+
+ private:
+ friend class base::DeleteHelper<StreamContext>;
+ friend class base::RefCountedThreadSafe<StreamContext,
+ StreamContextDeleter>;
+ friend struct StreamContextDeleter;
+
+ void DeleteOnCorrectThread() const;
+
+ scoped_ptr<StreamRegistry> registry_;
+};
+
+struct StreamContextDeleter {
+ static void Destruct(const StreamContext* context) {
+ context->DeleteOnCorrectThread();
+ }
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_
+
diff --git a/content/browser/streams/stream_read_observer.h b/content/browser/streams/stream_read_observer.h
new file mode 100644
index 0000000..a2b1bdc
--- /dev/null
+++ b/content/browser/streams/stream_read_observer.h
@@ -0,0 +1,24 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+
+namespace content {
+
+class Stream;
+
+class StreamReadObserver {
+ public:
+ // Sent when there is data available to be read from the stream.
+ virtual void OnDataAvailable(Stream* stream) = 0;
+
+ protected:
+ virtual ~StreamReadObserver() {}
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_
+
diff --git a/content/browser/streams/stream_registry.cc b/content/browser/streams/stream_registry.cc
new file mode 100644
index 0000000..7159831
--- /dev/null
+++ b/content/browser/streams/stream_registry.cc
@@ -0,0 +1,48 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/streams/stream_registry.h"
+
+#include "content/browser/streams/stream.h"
+
+namespace content {
+
+StreamRegistry::StreamRegistry() {
+}
+
+StreamRegistry::~StreamRegistry() {
+}
+
+void StreamRegistry::RegisterStream(scoped_refptr<Stream> stream) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(stream);
+ DCHECK(!stream->url().is_empty());
+ streams_[stream->url()] = stream;
+}
+
+scoped_refptr<Stream> StreamRegistry::GetStream(const GURL& url) {
+ DCHECK(CalledOnValidThread());
+ StreamMap::const_iterator stream = streams_.find(url);
+ if (stream != streams_.end())
+ return stream->second;
+
+ return NULL;
+}
+
+bool StreamRegistry::CloneStream(const GURL& url, const GURL& src_url) {
+ DCHECK(CalledOnValidThread());
+ scoped_refptr<Stream> stream(GetStream(src_url));
+ if (stream) {
+ streams_[url] = stream;
+ return true;
+ }
+ return false;
+}
+
+void StreamRegistry::UnregisterStream(const GURL& url) {
+ DCHECK(CalledOnValidThread());
+ streams_.erase(url);
+}
+
+} // namespace content
diff --git a/content/browser/streams/stream_registry.h b/content/browser/streams/stream_registry.h
new file mode 100644
index 0000000..eaab7ef
--- /dev/null
+++ b/content/browser/streams/stream_registry.h
@@ -0,0 +1,51 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/memory/ref_counted.h"
+#include "base/threading/non_thread_safe.h"
+#include "content/common/content_export.h"
+#include "googleurl/src/gurl.h"
+
+namespace content {
+
+class Stream;
+
+// Maintains a mapping of blob: URLs to active streams.
+class CONTENT_EXPORT StreamRegistry : public base::NonThreadSafe {
+ public:
+ StreamRegistry();
+ virtual ~StreamRegistry();
+
+ // Registers a stream, and sets its URL.
+ void RegisterStream(scoped_refptr<Stream> stream);
+
+ // Clones a stream. Returns true on success, or false if |src_url| doesn't
+ // exist.
+ bool CloneStream(const GURL& url, const GURL& src_url);
+
+ void UnregisterStream(const GURL& url);
+
+ // Gets the stream associated with |url|. Returns NULL if there is no such
+ // stream.
+ scoped_refptr<Stream> GetStream(const GURL& url);
+
+ private:
+ typedef std::map<GURL, scoped_refptr<Stream> > StreamMap;
+
+ StreamMap streams_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamRegistry);
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
+
+
diff --git a/content/browser/streams/stream_unittest.cc b/content/browser/streams/stream_unittest.cc
new file mode 100644
index 0000000..62f3e3e
--- /dev/null
+++ b/content/browser/streams/stream_unittest.cc
@@ -0,0 +1,207 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/message_loop.h"
+#include "base/test/test_simple_task_runner.h"
+#include "content/browser/streams/stream.h"
+#include "content/browser/streams/stream_read_observer.h"
+#include "content/browser/streams/stream_registry.h"
+#include "content/browser/streams/stream_write_observer.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace content {
+
+class StreamTest : public testing::Test {
+ public:
+ StreamTest() : producing_seed_key_(0) {}
+
+ virtual void SetUp() OVERRIDE {
+ registry_.reset(new StreamRegistry());
+ }
+
+ // Create a new IO buffer of the given |buffer_size| and fill it with random
+ // data.
+ scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) {
+ scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size));
+ char *bufferp = buffer->data();
+ for (size_t i = 0; i < buffer_size; i++)
+ bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char));
+ ++producing_seed_key_;
+ return buffer;
+ }
+
+ protected:
+ MessageLoop message_loop_;
+ scoped_ptr<StreamRegistry> registry_;
+
+ private:
+ int producing_seed_key_;
+};
+
+class TestStreamReader : public StreamReadObserver {
+ public:
+ TestStreamReader() : buffer_(new net::GrowableIOBuffer()) {
+ }
+ virtual ~TestStreamReader() {}
+
+ void Read(Stream* stream) {
+ const size_t kBufferSize = 32768;
+ scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize));
+
+ int bytes_read = 0;
+ while (stream->ReadRawData(buffer, kBufferSize, &bytes_read) ==
+ Stream::STREAM_HAS_DATA) {
+ size_t old_capacity = buffer_->capacity();
+ buffer_->SetCapacity(old_capacity + bytes_read);
+ memcpy(buffer_->StartOfBuffer() + old_capacity,
+ buffer->data(), bytes_read);
+ }
+ }
+
+ virtual void OnDataAvailable(Stream* stream) OVERRIDE {
+ Read(stream);
+ }
+
+ scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; }
+
+ private:
+ scoped_refptr<net::GrowableIOBuffer> buffer_;
+};
+
+class TestStreamWriter : public StreamWriteObserver {
+ public:
+ TestStreamWriter() {}
+ virtual ~TestStreamWriter() {}
+
+ void Write(Stream* stream,
+ scoped_refptr<net::IOBuffer> buffer,
+ size_t buffer_size) {
+ stream->AddData(buffer, buffer_size);
+ }
+
+ virtual void OnSpaceAvailable(Stream* stream) OVERRIDE {
+ }
+};
+
+TEST_F(StreamTest, SetReadObserver) {
+ TestStreamReader reader;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, GURL(), url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+}
+
+TEST_F(StreamTest, SetReadObserver_SecondFails) {
+ TestStreamReader reader1;
+ TestStreamReader reader2;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, GURL(), url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader1));
+ EXPECT_FALSE(stream->SetReadObserver(&reader2));
+}
+
+TEST_F(StreamTest, SetReadObserver_TwoReaders) {
+ TestStreamReader reader1;
+ TestStreamReader reader2;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, GURL(), url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader1));
+
+ // Once the first read observer is removed, a new one can be added.
+ stream->RemoveReadObserver(&reader1);
+ EXPECT_TRUE(stream->SetReadObserver(&reader2));
+}
+
+TEST_F(StreamTest, Stream) {
+ TestStreamReader reader;
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(
+ new Stream(registry_.get(), &writer, GURL(), url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+
+ const int kBufferSize = 1000000;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ writer.Write(stream, buffer, kBufferSize);
+ stream->Finalize();
+ reader.Read(stream);
+ MessageLoop::current()->RunUntilIdle();
+
+ ASSERT_EQ(reader.buffer()->capacity(), kBufferSize);
+ for (int i = 0; i < kBufferSize; i++)
+ EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]);
+}
+
+TEST_F(StreamTest, GetStream) {
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, GURL(), url));
+
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url);
+ ASSERT_EQ(stream1, stream2);
+}
+
+TEST_F(StreamTest, GetStream_Missing) {
+ TestStreamWriter writer;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, GURL(), url1));
+
+ GURL url2("blob://stream2");
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
+ ASSERT_FALSE(stream2);
+}
+
+TEST_F(StreamTest, CloneStream) {
+ TestStreamWriter writer;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, GURL(), url1));
+
+ GURL url2("blob://stream2");
+ ASSERT_TRUE(registry_->CloneStream(url2, url1));
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
+ ASSERT_EQ(stream1, stream2);
+}
+
+TEST_F(StreamTest, CloneStream_Missing) {
+ TestStreamWriter writer;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, GURL(), url1));
+
+ GURL url2("blob://stream2");
+ GURL url3("blob://stream3");
+ ASSERT_FALSE(registry_->CloneStream(url2, url3));
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url2);
+ ASSERT_FALSE(stream2);
+}
+
+TEST_F(StreamTest, UnregisterStream) {
+ TestStreamWriter writer;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer, GURL(), url));
+
+ registry_->UnregisterStream(url);
+ scoped_refptr<Stream> stream2 = registry_->GetStream(url);
+ ASSERT_FALSE(stream2);
+}
+
+} // namespace content
diff --git a/content/browser/streams/stream_write_observer.h b/content/browser/streams/stream_write_observer.h
new file mode 100644
index 0000000..31d84cd
--- /dev/null
+++ b/content/browser/streams/stream_write_observer.h
@@ -0,0 +1,24 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_
+#define CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_
+
+namespace content {
+
+class Stream;
+
+class StreamWriteObserver {
+ public:
+ // Sent when space becomes available in the stream, and the source should
+ // resume writing.
+ virtual void OnSpaceAvailable(Stream* stream) = 0;
+
+ protected:
+ virtual ~StreamWriteObserver() {}
+};
+
+} // namespace content
+
+#endif // CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_
diff --git a/content/content_browser.gypi b/content/content_browser.gypi
index 332e4f7..966e996 100644
--- a/content/content_browser.gypi
+++ b/content/content_browser.gypi
@@ -870,6 +870,14 @@
'browser/ssl/ssl_policy.h',
'browser/ssl/ssl_request_info.cc',
'browser/ssl/ssl_request_info.h',
+ 'browser/streams/stream.cc',
+ 'browser/streams/stream.h',
+ 'browser/streams/stream_context.cc',
+ 'browser/streams/stream_context.h',
+ 'browser/streams/stream_read_observer.h',
+ 'browser/streams/stream_registry.cc',
+ 'browser/streams/stream_registry.h',
+ 'browser/streams/stream_write_observer.h',
'browser/storage_partition_impl.cc',
'browser/storage_partition_impl.h',
'browser/storage_partition_impl_map.cc',
diff --git a/content/content_tests.gypi b/content/content_tests.gypi
index 2a3d515..451470f 100644
--- a/content/content_tests.gypi
+++ b/content/content_tests.gypi
@@ -334,6 +334,7 @@
'browser/speech/speech_recognizer_unittest.cc',
'browser/ssl/ssl_host_state_unittest.cc',
'browser/storage_partition_impl_map_unittest.cc',
+ 'browser/streams/stream_unittest.cc',
'browser/system_message_window_win_unittest.cc',
'browser/tracing/trace_subscriber_stdio_unittest.cc',
'browser/web_contents/navigation_controller_impl_unittest.cc',