diff options
author | zork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-03-13 04:34:19 +0000 |
---|---|---|
committer | zork@chromium.org <zork@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2013-03-13 04:34:19 +0000 |
commit | bb85a8c45042b238dade29ae80963153790cb491 (patch) | |
tree | 658304ab46ee87e0d8abeff8b666da078e5b62fa | |
parent | 8f9e722ee00bcbe1733b7e2163c098bf8239d516 (diff) | |
download | chromium_src-bb85a8c45042b238dade29ae80963153790cb491.zip chromium_src-bb85a8c45042b238dade29ae80963153790cb491.tar.gz chromium_src-bb85a8c45042b238dade29ae80963153790cb491.tar.bz2 |
Reland r187230: Implement the Stream registry in content
This fixes the memory leaks introduced by the original CL.
BUG=171585
Review URL: https://chromiumcodereview.appspot.com/12637006
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@187777 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | content/browser/download/byte_stream.h | 3 | ||||
-rw-r--r-- | content/browser/resource_context_impl.cc | 14 | ||||
-rw-r--r-- | content/browser/resource_context_impl.h | 4 | ||||
-rw-r--r-- | content/browser/streams/stream.cc | 116 | ||||
-rw-r--r-- | content/browser/streams/stream.h | 101 | ||||
-rw-r--r-- | content/browser/streams/stream_context.cc | 57 | ||||
-rw-r--r-- | content/browser/streams/stream_context.h | 60 | ||||
-rw-r--r-- | content/browser/streams/stream_read_observer.h | 24 | ||||
-rw-r--r-- | content/browser/streams/stream_registry.cc | 48 | ||||
-rw-r--r-- | content/browser/streams/stream_registry.h | 51 | ||||
-rw-r--r-- | content/browser/streams/stream_unittest.cc | 207 | ||||
-rw-r--r-- | content/browser/streams/stream_write_observer.h | 24 | ||||
-rw-r--r-- | content/content_browser.gypi | 8 | ||||
-rw-r--r-- | content/content_tests.gypi | 1 |
14 files changed, 718 insertions, 0 deletions
diff --git a/content/browser/download/byte_stream.h b/content/browser/download/byte_stream.h index b8426be..d8302ec 100644 --- a/content/browser/download/byte_stream.h +++ b/content/browser/download/byte_stream.h @@ -21,6 +21,9 @@ class SequencedTaskRunner; namespace content { +// TODO(zork): Move this class out of content/browser/download +// crbug.com/180833 +// // A byte stream is a pipe to transfer bytes between a source and a // sink, which may be on different threads. It is intended to be the // only connection between source and sink; they need have no diff --git a/content/browser/resource_context_impl.cc b/content/browser/resource_context_impl.cc index e9332a0..4480158 100644 --- a/content/browser/resource_context_impl.cc +++ b/content/browser/resource_context_impl.cc @@ -9,6 +9,7 @@ #include "content/browser/host_zoom_map_impl.h" #include "content/browser/loader/resource_dispatcher_host_impl.h" #include "content/browser/loader/resource_request_info_impl.h" +#include "content/browser/streams/stream_context.h" #include "content/browser/webui/url_data_manager_backend.h" #include "content/public/browser/browser_context.h" #include "content/public/browser/browser_thread.h" @@ -22,6 +23,7 @@ namespace { // Key names on ResourceContext. const char kBlobStorageContextKeyName[] = "content_blob_storage_context"; const char kHostZoomMapKeyName[] = "content_host_zoom_map"; +const char kStreamContextKeyName[] = "content_stream_context"; const char kURLDataManagerBackendKeyName[] = "url_data_manager_backend"; class NonOwningZoomData : public base::SupportsUserData::Data { @@ -59,6 +61,13 @@ ChromeBlobStorageContext* GetChromeBlobStorageContextForResourceContext( resource_context, kBlobStorageContextKeyName); } +StreamContext* GetStreamContextForResourceContext( + ResourceContext* resource_context) { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); + return UserDataAdapter<StreamContext>::Get( + resource_context, kStreamContextKeyName); +} + HostZoomMap* GetHostZoomMapForResourceContext(ResourceContext* context) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); return static_cast<NonOwningZoomData*>( @@ -85,6 +94,11 @@ void InitializeResourceContext(BrowserContext* browser_context) { new UserDataAdapter<ChromeBlobStorageContext>( ChromeBlobStorageContext::GetFor(browser_context))); + resource_context->SetUserData( + kStreamContextKeyName, + new UserDataAdapter<StreamContext>( + StreamContext::GetFor(browser_context))); + // This object is owned by the BrowserContext and not ResourceContext, so // store a non-owning pointer here. resource_context->SetUserData( diff --git a/content/browser/resource_context_impl.h b/content/browser/resource_context_impl.h index 8c0b844..ed4de4d 100644 --- a/content/browser/resource_context_impl.h +++ b/content/browser/resource_context_impl.h @@ -10,6 +10,7 @@ namespace content { class ChromeBlobStorageContext; +class StreamContext; class BrowserContext; class HostZoomMap; class URLDataManagerBackend; @@ -21,6 +22,9 @@ class URLDataManagerBackend; ChromeBlobStorageContext* GetChromeBlobStorageContextForResourceContext( ResourceContext* resource_context); +StreamContext* GetStreamContextForResourceContext( + ResourceContext* resource_context); + HostZoomMap* GetHostZoomMapForResourceContext(ResourceContext* context); URLDataManagerBackend* GetURLDataManagerForResourceContext( diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc new file mode 100644 index 0000000..e72a887 --- /dev/null +++ b/content/browser/streams/stream.cc @@ -0,0 +1,116 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream.h" + +#include "base/bind.h" +#include "base/message_loop_proxy.h" +#include "content/browser/streams/stream_read_observer.h" +#include "content/browser/streams/stream_registry.h" +#include "content/browser/streams/stream_write_observer.h" +#include "net/base/io_buffer.h" + +namespace { +// Start throttling the connection at about 1MB. +const size_t kDeferSizeThreshold = 40 * 32768; +} + +namespace content { + +Stream::Stream(StreamRegistry* registry, + StreamWriteObserver* write_observer, + const GURL& security_origin, + const GURL& url) + : bytes_read_(0), + can_add_data_(true), + security_origin_(security_origin), + url_(url), + data_length_(0), + registry_(registry), + read_observer_(NULL), + write_observer_(write_observer), + weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { + CreateByteStream(base::MessageLoopProxy::current(), + base::MessageLoopProxy::current(), + kDeferSizeThreshold, + &writer_, + &reader_); + + // Setup callback for writing. + writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, + weak_ptr_factory_.GetWeakPtr())); + reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, + weak_ptr_factory_.GetWeakPtr())); + + registry_->RegisterStream(this); +} + +Stream::~Stream() { +} + +bool Stream::SetReadObserver(StreamReadObserver* observer) { + if (read_observer_) + return false; + read_observer_ = observer; + return true; +} + +void Stream::RemoveReadObserver(StreamReadObserver* observer) { + DCHECK(observer == read_observer_); + read_observer_ = NULL; +} + +void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) { + can_add_data_ = writer_->Write(buffer, size); +} + +void Stream::Finalize() { + writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE); + writer_.reset(NULL); + + OnDataAvailable(); +} + +Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf, + int buf_size, + int* bytes_read) { + if (!data_) { + data_length_ = 0; + bytes_read_ = 0; + ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_); + switch (state) { + case ByteStreamReader::STREAM_HAS_DATA: + break; + case ByteStreamReader::STREAM_COMPLETE: + registry_->UnregisterStream(url()); + return STREAM_COMPLETE; + case ByteStreamReader::STREAM_EMPTY: + return STREAM_EMPTY; + } + } + + size_t remaining_bytes = data_length_ - bytes_read_; + size_t to_read = + static_cast<size_t>(buf_size) < remaining_bytes ? + buf_size : remaining_bytes; + memcpy(buf->data(), data_->data() + bytes_read_, to_read); + bytes_read_ += to_read; + if (bytes_read_ >= data_length_) + data_ = NULL; + + *bytes_read = to_read; + return STREAM_HAS_DATA; +} + +void Stream::OnSpaceAvailable() { + can_add_data_ = true; + write_observer_->OnSpaceAvailable(this); +} + +void Stream::OnDataAvailable() { + read_observer_->OnDataAvailable(this); +} + +} // namespace content + diff --git a/content/browser/streams/stream.h b/content/browser/streams/stream.h new file mode 100644 index 0000000..3aa389c --- /dev/null +++ b/content/browser/streams/stream.h @@ -0,0 +1,101 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_H_ + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/memory/weak_ptr.h" +#include "content/browser/download/byte_stream.h" +#include "content/common/content_export.h" +#include "googleurl/src/gurl.h" + +namespace net { +class IOBuffer; +} + +namespace content { + +class StreamReadObserver; +class StreamRegistry; +class StreamWriteObserver; + +// A stream that sends data from an arbitrary source to an internal URL +// that can be read by an internal consumer. It will continue to pull from the +// original URL as long as there is data available. It can be read from +// multiple clients, but only one can be reading at a time. This allows a +// reader to consume part of the stream, then pass it along to another client +// to continue processing the stream. +class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> { + public: + enum StreamState { + STREAM_HAS_DATA, + STREAM_COMPLETE, + STREAM_EMPTY, + }; + + // Creates a stream useable from the |security_origin|. + Stream(StreamRegistry* registry, + StreamWriteObserver* write_observer, + const GURL& security_origin, + const GURL& url); + + // Sets the reader of this stream. Returns true on success, or false if there + // is already a reader. + bool SetReadObserver(StreamReadObserver* observer); + + // Removes the read observer. |observer| must be the current observer. + void RemoveReadObserver(StreamReadObserver* observer); + + // Adds the data in |buffer| to the stream. Takes ownership of |buffer|. + void AddData(scoped_refptr<net::IOBuffer> buffer, size_t size); + + // Notifies this stream that it will not be receiving any more data. + void Finalize(); + + // Reads a maximum of |buf_size| from the stream into |buf|. Sets + // |*bytes_read| to the number of bytes actually read. + // Returns STREAM_HAS_DATA if data was read, STREAM_EMPTY if no data was read, + // and STREAM_COMPLETE if the stream is finalized and all data has been read. + StreamState ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read); + + // Indicates whether there is space in the buffer to add more data. + bool can_add_data() const { return can_add_data_; } + + const GURL& url() const { return url_; } + + const GURL& security_origin() const { return security_origin_; } + + private: + friend class base::RefCountedThreadSafe<Stream>; + + virtual ~Stream(); + + void OnSpaceAvailable(); + void OnDataAvailable(); + + size_t bytes_read_; + bool can_add_data_; + + GURL security_origin_; + GURL url_; + + scoped_refptr<net::IOBuffer> data_; + size_t data_length_; + + scoped_ptr<ByteStreamWriter> writer_; + scoped_ptr<ByteStreamReader> reader_; + + StreamRegistry* registry_; + StreamReadObserver* read_observer_; + StreamWriteObserver* write_observer_; + + base::WeakPtrFactory<Stream> weak_ptr_factory_; + DISALLOW_COPY_AND_ASSIGN(Stream); +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_H_ diff --git a/content/browser/streams/stream_context.cc b/content/browser/streams/stream_context.cc new file mode 100644 index 0000000..ef3a79d --- /dev/null +++ b/content/browser/streams/stream_context.cc @@ -0,0 +1,57 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream_context.h" + +#include "base/bind.h" +#include "content/browser/streams/stream_registry.h" +#include "content/public/browser/browser_context.h" + +using base::UserDataAdapter; + +namespace { +const char* kStreamContextKeyName = "content_stream_context"; +} + +namespace content { + +StreamContext::StreamContext() {} + +StreamContext* StreamContext::GetFor(BrowserContext* context) { + if (!context->GetUserData(kStreamContextKeyName)) { + scoped_refptr<StreamContext> stream = new StreamContext(); + context->SetUserData(kStreamContextKeyName, + new UserDataAdapter<StreamContext>(stream)); + // Check first to avoid memory leak in unittests. + if (BrowserThread::IsMessageLoopValid(BrowserThread::IO)) { + BrowserThread::PostTask( + BrowserThread::IO, FROM_HERE, + base::Bind(&StreamContext::InitializeOnIOThread, stream)); + } + } + + return UserDataAdapter<StreamContext>::Get(context, kStreamContextKeyName); +} + +void StreamContext::InitializeOnIOThread() { + DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); + registry_.reset(new StreamRegistry()); +} + +StreamContext::~StreamContext() {} + +void StreamContext::DeleteOnCorrectThread() const { + // In many tests, there isn't a valid IO thread. In that case, just delete on + // the current thread. + // TODO(zork): Remove this custom deleter, and fix the leaks in all the + // tests. + if (BrowserThread::IsMessageLoopValid(BrowserThread::IO) && + !BrowserThread::CurrentlyOn(BrowserThread::IO)) { + BrowserThread::DeleteSoon(BrowserThread::IO, FROM_HERE, this); + return; + } + delete this; +} + +} // namespace content diff --git a/content/browser/streams/stream_context.h b/content/browser/streams/stream_context.h new file mode 100644 index 0000000..7b8adcbd --- /dev/null +++ b/content/browser/streams/stream_context.h @@ -0,0 +1,60 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_ + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/sequenced_task_runner_helpers.h" +#include "content/public/browser/browser_thread.h" + +namespace content { +class BrowserContext; +class StreamRegistry; +struct StreamContextDeleter; + +// A context class that keeps track of StreamRegistry used by the chrome. +// There is an instance associated with each BrowserContext. There could be +// multiple URLRequestContexts in the same browser context that refers to the +// same instance. +// +// All methods, except the ctor, are expected to be called on +// the IO thread (unless specifically called out in doc comments). +class StreamContext + : public base::RefCountedThreadSafe<StreamContext, + StreamContextDeleter> { + public: + StreamContext(); + + static StreamContext* GetFor(BrowserContext* browser_context); + + void InitializeOnIOThread(); + + StreamRegistry* registry() const { return registry_.get(); } + + protected: + virtual ~StreamContext(); + + private: + friend class base::DeleteHelper<StreamContext>; + friend class base::RefCountedThreadSafe<StreamContext, + StreamContextDeleter>; + friend struct StreamContextDeleter; + + void DeleteOnCorrectThread() const; + + scoped_ptr<StreamRegistry> registry_; +}; + +struct StreamContextDeleter { + static void Destruct(const StreamContext* context) { + context->DeleteOnCorrectThread(); + } +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_CONTEXT_H_ + diff --git a/content/browser/streams/stream_read_observer.h b/content/browser/streams/stream_read_observer.h new file mode 100644 index 0000000..a2b1bdc --- /dev/null +++ b/content/browser/streams/stream_read_observer.h @@ -0,0 +1,24 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_ + +namespace content { + +class Stream; + +class StreamReadObserver { + public: + // Sent when there is data available to be read from the stream. + virtual void OnDataAvailable(Stream* stream) = 0; + + protected: + virtual ~StreamReadObserver() {} +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_READ_OBSERVER_H_ + diff --git a/content/browser/streams/stream_registry.cc b/content/browser/streams/stream_registry.cc new file mode 100644 index 0000000..7159831 --- /dev/null +++ b/content/browser/streams/stream_registry.cc @@ -0,0 +1,48 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "content/browser/streams/stream_registry.h" + +#include "content/browser/streams/stream.h" + +namespace content { + +StreamRegistry::StreamRegistry() { +} + +StreamRegistry::~StreamRegistry() { +} + +void StreamRegistry::RegisterStream(scoped_refptr<Stream> stream) { + DCHECK(CalledOnValidThread()); + DCHECK(stream); + DCHECK(!stream->url().is_empty()); + streams_[stream->url()] = stream; +} + +scoped_refptr<Stream> StreamRegistry::GetStream(const GURL& url) { + DCHECK(CalledOnValidThread()); + StreamMap::const_iterator stream = streams_.find(url); + if (stream != streams_.end()) + return stream->second; + + return NULL; +} + +bool StreamRegistry::CloneStream(const GURL& url, const GURL& src_url) { + DCHECK(CalledOnValidThread()); + scoped_refptr<Stream> stream(GetStream(src_url)); + if (stream) { + streams_[url] = stream; + return true; + } + return false; +} + +void StreamRegistry::UnregisterStream(const GURL& url) { + DCHECK(CalledOnValidThread()); + streams_.erase(url); +} + +} // namespace content diff --git a/content/browser/streams/stream_registry.h b/content/browser/streams/stream_registry.h new file mode 100644 index 0000000..eaab7ef --- /dev/null +++ b/content/browser/streams/stream_registry.h @@ -0,0 +1,51 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_ + +#include <map> + +#include "base/basictypes.h" +#include "base/memory/ref_counted.h" +#include "base/threading/non_thread_safe.h" +#include "content/common/content_export.h" +#include "googleurl/src/gurl.h" + +namespace content { + +class Stream; + +// Maintains a mapping of blob: URLs to active streams. +class CONTENT_EXPORT StreamRegistry : public base::NonThreadSafe { + public: + StreamRegistry(); + virtual ~StreamRegistry(); + + // Registers a stream, and sets its URL. + void RegisterStream(scoped_refptr<Stream> stream); + + // Clones a stream. Returns true on success, or false if |src_url| doesn't + // exist. + bool CloneStream(const GURL& url, const GURL& src_url); + + void UnregisterStream(const GURL& url); + + // Gets the stream associated with |url|. Returns NULL if there is no such + // stream. + scoped_refptr<Stream> GetStream(const GURL& url); + + private: + typedef std::map<GURL, scoped_refptr<Stream> > StreamMap; + + StreamMap streams_; + + DISALLOW_COPY_AND_ASSIGN(StreamRegistry); +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_ + + diff --git a/content/browser/streams/stream_unittest.cc b/content/browser/streams/stream_unittest.cc new file mode 100644 index 0000000..62f3e3e --- /dev/null +++ b/content/browser/streams/stream_unittest.cc @@ -0,0 +1,207 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/message_loop.h" +#include "base/test/test_simple_task_runner.h" +#include "content/browser/streams/stream.h" +#include "content/browser/streams/stream_read_observer.h" +#include "content/browser/streams/stream_registry.h" +#include "content/browser/streams/stream_write_observer.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace content { + +class StreamTest : public testing::Test { + public: + StreamTest() : producing_seed_key_(0) {} + + virtual void SetUp() OVERRIDE { + registry_.reset(new StreamRegistry()); + } + + // Create a new IO buffer of the given |buffer_size| and fill it with random + // data. + scoped_refptr<net::IOBuffer> NewIOBuffer(size_t buffer_size) { + scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(buffer_size)); + char *bufferp = buffer->data(); + for (size_t i = 0; i < buffer_size; i++) + bufferp[i] = (i + producing_seed_key_) % (1 << sizeof(char)); + ++producing_seed_key_; + return buffer; + } + + protected: + MessageLoop message_loop_; + scoped_ptr<StreamRegistry> registry_; + + private: + int producing_seed_key_; +}; + +class TestStreamReader : public StreamReadObserver { + public: + TestStreamReader() : buffer_(new net::GrowableIOBuffer()) { + } + virtual ~TestStreamReader() {} + + void Read(Stream* stream) { + const size_t kBufferSize = 32768; + scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kBufferSize)); + + int bytes_read = 0; + while (stream->ReadRawData(buffer, kBufferSize, &bytes_read) == + Stream::STREAM_HAS_DATA) { + size_t old_capacity = buffer_->capacity(); + buffer_->SetCapacity(old_capacity + bytes_read); + memcpy(buffer_->StartOfBuffer() + old_capacity, + buffer->data(), bytes_read); + } + } + + virtual void OnDataAvailable(Stream* stream) OVERRIDE { + Read(stream); + } + + scoped_refptr<net::GrowableIOBuffer> buffer() { return buffer_; } + + private: + scoped_refptr<net::GrowableIOBuffer> buffer_; +}; + +class TestStreamWriter : public StreamWriteObserver { + public: + TestStreamWriter() {} + virtual ~TestStreamWriter() {} + + void Write(Stream* stream, + scoped_refptr<net::IOBuffer> buffer, + size_t buffer_size) { + stream->AddData(buffer, buffer_size); + } + + virtual void OnSpaceAvailable(Stream* stream) OVERRIDE { + } +}; + +TEST_F(StreamTest, SetReadObserver) { + TestStreamReader reader; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, GURL(), url)); + EXPECT_TRUE(stream->SetReadObserver(&reader)); +} + +TEST_F(StreamTest, SetReadObserver_SecondFails) { + TestStreamReader reader1; + TestStreamReader reader2; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, GURL(), url)); + EXPECT_TRUE(stream->SetReadObserver(&reader1)); + EXPECT_FALSE(stream->SetReadObserver(&reader2)); +} + +TEST_F(StreamTest, SetReadObserver_TwoReaders) { + TestStreamReader reader1; + TestStreamReader reader2; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, GURL(), url)); + EXPECT_TRUE(stream->SetReadObserver(&reader1)); + + // Once the first read observer is removed, a new one can be added. + stream->RemoveReadObserver(&reader1); + EXPECT_TRUE(stream->SetReadObserver(&reader2)); +} + +TEST_F(StreamTest, Stream) { + TestStreamReader reader; + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream( + new Stream(registry_.get(), &writer, GURL(), url)); + EXPECT_TRUE(stream->SetReadObserver(&reader)); + + const int kBufferSize = 1000000; + scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize)); + writer.Write(stream, buffer, kBufferSize); + stream->Finalize(); + reader.Read(stream); + MessageLoop::current()->RunUntilIdle(); + + ASSERT_EQ(reader.buffer()->capacity(), kBufferSize); + for (int i = 0; i < kBufferSize; i++) + EXPECT_EQ(buffer->data()[i], reader.buffer()->data()[i]); +} + +TEST_F(StreamTest, GetStream) { + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, GURL(), url)); + + scoped_refptr<Stream> stream2 = registry_->GetStream(url); + ASSERT_EQ(stream1, stream2); +} + +TEST_F(StreamTest, GetStream_Missing) { + TestStreamWriter writer; + + GURL url1("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, GURL(), url1)); + + GURL url2("blob://stream2"); + scoped_refptr<Stream> stream2 = registry_->GetStream(url2); + ASSERT_FALSE(stream2); +} + +TEST_F(StreamTest, CloneStream) { + TestStreamWriter writer; + + GURL url1("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, GURL(), url1)); + + GURL url2("blob://stream2"); + ASSERT_TRUE(registry_->CloneStream(url2, url1)); + scoped_refptr<Stream> stream2 = registry_->GetStream(url2); + ASSERT_EQ(stream1, stream2); +} + +TEST_F(StreamTest, CloneStream_Missing) { + TestStreamWriter writer; + + GURL url1("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, GURL(), url1)); + + GURL url2("blob://stream2"); + GURL url3("blob://stream3"); + ASSERT_FALSE(registry_->CloneStream(url2, url3)); + scoped_refptr<Stream> stream2 = registry_->GetStream(url2); + ASSERT_FALSE(stream2); +} + +TEST_F(StreamTest, UnregisterStream) { + TestStreamWriter writer; + + GURL url("blob://stream"); + scoped_refptr<Stream> stream1( + new Stream(registry_.get(), &writer, GURL(), url)); + + registry_->UnregisterStream(url); + scoped_refptr<Stream> stream2 = registry_->GetStream(url); + ASSERT_FALSE(stream2); +} + +} // namespace content diff --git a/content/browser/streams/stream_write_observer.h b/content/browser/streams/stream_write_observer.h new file mode 100644 index 0000000..31d84cd --- /dev/null +++ b/content/browser/streams/stream_write_observer.h @@ -0,0 +1,24 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_ +#define CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_ + +namespace content { + +class Stream; + +class StreamWriteObserver { + public: + // Sent when space becomes available in the stream, and the source should + // resume writing. + virtual void OnSpaceAvailable(Stream* stream) = 0; + + protected: + virtual ~StreamWriteObserver() {} +}; + +} // namespace content + +#endif // CONTENT_BROWSER_STREAMS_STREAM_WRITE_OBSERVER_H_ diff --git a/content/content_browser.gypi b/content/content_browser.gypi index 332e4f7..966e996 100644 --- a/content/content_browser.gypi +++ b/content/content_browser.gypi @@ -870,6 +870,14 @@ 'browser/ssl/ssl_policy.h', 'browser/ssl/ssl_request_info.cc', 'browser/ssl/ssl_request_info.h', + 'browser/streams/stream.cc', + 'browser/streams/stream.h', + 'browser/streams/stream_context.cc', + 'browser/streams/stream_context.h', + 'browser/streams/stream_read_observer.h', + 'browser/streams/stream_registry.cc', + 'browser/streams/stream_registry.h', + 'browser/streams/stream_write_observer.h', 'browser/storage_partition_impl.cc', 'browser/storage_partition_impl.h', 'browser/storage_partition_impl_map.cc', diff --git a/content/content_tests.gypi b/content/content_tests.gypi index 2a3d515..451470f 100644 --- a/content/content_tests.gypi +++ b/content/content_tests.gypi @@ -334,6 +334,7 @@ 'browser/speech/speech_recognizer_unittest.cc', 'browser/ssl/ssl_host_state_unittest.cc', 'browser/storage_partition_impl_map_unittest.cc', + 'browser/streams/stream_unittest.cc', 'browser/system_message_window_win_unittest.cc', 'browser/tracing/trace_subscriber_stdio_unittest.cc', 'browser/web_contents/navigation_controller_impl_unittest.cc', |