summaryrefslogtreecommitdiffstats
path: root/content/browser/streams
diff options
context:
space:
mode:
authortyoshino@chromium.org <tyoshino@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-08-22 07:43:24 +0000
committertyoshino@chromium.org <tyoshino@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2013-08-22 07:43:24 +0000
commit07516269ca14f8ef4cf6bfaa721f169290ef7318 (patch)
tree48b5db5a9942032ea55e0bda6187d4afc1fa6935 /content/browser/streams
parent10fbd595e72eb145399ebf94dbd472281cc2b053 (diff)
downloadchromium_src-07516269ca14f8ef4cf6bfaa721f169290ef7318.zip
chromium_src-07516269ca14f8ef4cf6bfaa721f169290ef7318.tar.gz
chromium_src-07516269ca14f8ef4cf6bfaa721f169290ef7318.tar.bz2
Limit the total memory usage for Stream instances
Stream instances report their memory usage to StreamRegistry to get approval. If rejected, they unregisters themselves. writer_ and reader_ are cleared immediately when memory usage violation happens. Added task runner DCHECKs to ByteStreamReaderImpl and ByteStreamWriterImpl so that we can check we're not violating thread restriction. BUG=169957 Review URL: https://chromiumcodereview.appspot.com/22908008 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@218933 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'content/browser/streams')
-rw-r--r--content/browser/streams/stream.cc37
-rw-r--r--content/browser/streams/stream.h11
-rw-r--r--content/browser/streams/stream_registry.cc41
-rw-r--r--content/browser/streams/stream_registry.h21
-rw-r--r--content/browser/streams/stream_unittest.cc55
5 files changed, 160 insertions, 5 deletions
diff --git a/content/browser/streams/stream.cc b/content/browser/streams/stream.cc
index 6026df9..a9d43332 100644
--- a/content/browser/streams/stream.cc
+++ b/content/browser/streams/stream.cc
@@ -27,6 +27,7 @@ Stream::Stream(StreamRegistry* registry,
can_add_data_(true),
url_(url),
data_length_(0),
+ last_total_buffered_bytes_(0),
registry_(registry),
read_observer_(NULL),
write_observer_(write_observer),
@@ -67,19 +68,46 @@ void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
write_observer_ = NULL;
}
+void Stream::Abort() {
+ // Clear all buffer. It's safe to clear reader_ here since the same thread
+ // is used for both input and output operation.
+ writer_.reset();
+ reader_.reset();
+ can_add_data_ = false;
+ registry_->UnregisterStream(url());
+}
+
void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
+ if (!writer_.get())
+ return;
+
+ size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
+ if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
+ Abort();
+ return;
+ }
+
+ // Now it's guaranteed that this doesn't overflow. This must be done before
+ // Write() since GetTotalBufferedBytes() may return different value after
+ // Write() call, so if we use the new value, information in this instance and
+ // one in |registry_| become inconsistent.
+ last_total_buffered_bytes_ = current_buffered_bytes + size;
+
can_add_data_ = writer_->Write(buffer, size);
}
void Stream::AddData(const char* data, size_t size) {
scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
memcpy(io_buffer->data(), data, size);
- can_add_data_ = writer_->Write(io_buffer, size);
+ AddData(io_buffer, size);
}
void Stream::Finalize() {
+ if (!writer_.get())
+ return;
+
writer_->Close(0);
- writer_.reset(NULL);
+ writer_.reset();
// Continue asynchronously.
base::MessageLoopProxy::current()->PostTask(
@@ -95,6 +123,11 @@ Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
*bytes_read = 0;
if (!data_.get()) {
+ // TODO(tyoshino): Add STREAM_ABORTED type to tell the reader that this
+ // stream is aborted.
+ if (!reader_.get())
+ return STREAM_EMPTY;
+
data_length_ = 0;
data_bytes_read_ = 0;
ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
diff --git a/content/browser/streams/stream.h b/content/browser/streams/stream.h
index 85edc88..1878adf 100644
--- a/content/browser/streams/stream.h
+++ b/content/browser/streams/stream.h
@@ -81,6 +81,11 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
const GURL& url() const { return url_; }
+ // For StreamRegistry to remember the last memory usage reported to it.
+ size_t last_total_buffered_bytes() const {
+ return last_total_buffered_bytes_;
+ }
+
private:
friend class base::RefCountedThreadSafe<Stream>;
@@ -89,6 +94,8 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
void OnSpaceAvailable();
void OnDataAvailable();
+ void Abort();
+
size_t data_bytes_read_;
bool can_add_data_;
@@ -97,6 +104,10 @@ class CONTENT_EXPORT Stream : public base::RefCountedThreadSafe<Stream> {
scoped_refptr<net::IOBuffer> data_;
size_t data_length_;
+ // Last value returned by writer_->TotalBufferedBytes() in AddData(). Stored
+ // in order to check memory usage.
+ size_t last_total_buffered_bytes_;
+
scoped_ptr<ByteStreamWriter> writer_;
scoped_ptr<ByteStreamReader> reader_;
diff --git a/content/browser/streams/stream_registry.cc b/content/browser/streams/stream_registry.cc
index 39d24b3..1a83c41 100644
--- a/content/browser/streams/stream_registry.cc
+++ b/content/browser/streams/stream_registry.cc
@@ -8,7 +8,15 @@
namespace content {
-StreamRegistry::StreamRegistry() {
+namespace {
+// The maximum size of memory each StreamRegistry instance is allowed to use
+// for its Stream instances.
+const size_t kDefaultMaxMemoryUsage = 1024 * 1024 * 1024U; // 1GiB
+}
+
+StreamRegistry::StreamRegistry()
+ : total_memory_usage_(0),
+ max_memory_usage_(kDefaultMaxMemoryUsage) {
}
StreamRegistry::~StreamRegistry() {
@@ -42,7 +50,38 @@ bool StreamRegistry::CloneStream(const GURL& url, const GURL& src_url) {
void StreamRegistry::UnregisterStream(const GURL& url) {
DCHECK(CalledOnValidThread());
+
+ StreamMap::iterator iter = streams_.find(url);
+ if (iter == streams_.end())
+ return;
+
+ size_t buffered_bytes = iter->second->last_total_buffered_bytes();
+ DCHECK_LE(buffered_bytes, total_memory_usage_);
+ total_memory_usage_ -= buffered_bytes;
streams_.erase(url);
}
+bool StreamRegistry::UpdateMemoryUsage(const GURL& url,
+ size_t current_size,
+ size_t increase) {
+ DCHECK(CalledOnValidThread());
+
+ StreamMap::iterator iter = streams_.find(url);
+ // A Stream must be registered with its parent registry to get memory.
+ if (iter == streams_.end())
+ return false;
+
+ size_t last_size = iter->second->last_total_buffered_bytes();
+ DCHECK_LE(last_size, total_memory_usage_);
+ size_t usage_of_others = total_memory_usage_ - last_size;
+ DCHECK_LE(current_size, last_size);
+ size_t current_total_memory_usage = usage_of_others + current_size;
+
+ if (increase > max_memory_usage_ - current_total_memory_usage)
+ return false;
+
+ total_memory_usage_ = current_total_memory_usage + increase;
+ return true;
+}
+
} // namespace content
diff --git a/content/browser/streams/stream_registry.h b/content/browser/streams/stream_registry.h
index e75c97c..a359411 100644
--- a/content/browser/streams/stream_registry.h
+++ b/content/browser/streams/stream_registry.h
@@ -32,20 +32,37 @@ class CONTENT_EXPORT StreamRegistry : public base::NonThreadSafe {
void UnregisterStream(const GURL& url);
+ // Called by Stream instances to request increase of memory usage. If the
+ // total memory usage for this registry is going to exceed the limit,
+ // returns false. Otherwise, updates |total_memory_usage_| and returns true.
+ //
+ // |current_size| is the up-to-date size of ByteStream of the Stream instance
+ // and |increase| must be the amount of data going to be added to the Stream
+ // instance.
+ bool UpdateMemoryUsage(const GURL& url, size_t current_size, size_t increase);
+
// Gets the stream associated with |url|. Returns NULL if there is no such
// stream.
scoped_refptr<Stream> GetStream(const GURL& url);
+ void set_max_memory_usage_for_testing(size_t size) {
+ max_memory_usage_ = size;
+ }
+
private:
typedef std::map<GURL, scoped_refptr<Stream> > StreamMap;
StreamMap streams_;
+ size_t total_memory_usage_;
+
+ // Maximum amount of memory allowed to use for Stream instances registered
+ // with this registry.
+ size_t max_memory_usage_;
+
DISALLOW_COPY_AND_ASSIGN(StreamRegistry);
};
} // namespace content
#endif // CONTENT_BROWSER_STREAMS_STREAM_REGISTRY_H_
-
-
diff --git a/content/browser/streams/stream_unittest.cc b/content/browser/streams/stream_unittest.cc
index c0077b7..2e0386b 100644
--- a/content/browser/streams/stream_unittest.cc
+++ b/content/browser/streams/stream_unittest.cc
@@ -250,4 +250,59 @@ TEST_F(StreamTest, UnregisterStream) {
ASSERT_FALSE(stream2.get());
}
+TEST_F(StreamTest, MemoryExceedMemoryUsageLimit) {
+ TestStreamWriter writer1;
+ TestStreamWriter writer2;
+
+ GURL url1("blob://stream");
+ scoped_refptr<Stream> stream1(
+ new Stream(registry_.get(), &writer1, url1));
+
+ GURL url2("blob://stream2");
+ scoped_refptr<Stream> stream2(
+ new Stream(registry_.get(), &writer2, url2));
+
+ const int kMaxMemoryUsage = 1500000;
+ registry_->set_max_memory_usage_for_testing(kMaxMemoryUsage);
+
+ const int kBufferSize = 1000000;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ writer1.Write(stream1.get(), buffer, kBufferSize);
+ // Make transfer happen.
+ base::MessageLoop::current()->RunUntilIdle();
+
+ writer2.Write(stream2.get(), buffer, kBufferSize);
+
+ // Written data (1000000 * 2) exceeded limit (1500000). |stream2| should be
+ // unregistered with |registry_|.
+ EXPECT_EQ(NULL, registry_->GetStream(url2).get());
+
+ writer1.Write(stream1.get(), buffer, kMaxMemoryUsage - kBufferSize);
+ // Should be accepted since stream2 is unregistered and the new data is not
+ // so big to exceed the limit.
+ EXPECT_FALSE(registry_->GetStream(url1).get() == NULL);
+}
+
+TEST_F(StreamTest, UnderMemoryUsageLimit) {
+ TestStreamWriter writer;
+ TestStreamReader reader;
+
+ GURL url("blob://stream");
+ scoped_refptr<Stream> stream(new Stream(registry_.get(), &writer, url));
+ EXPECT_TRUE(stream->SetReadObserver(&reader));
+
+ registry_->set_max_memory_usage_for_testing(1500000);
+
+ const int kBufferSize = 1000000;
+ scoped_refptr<net::IOBuffer> buffer(NewIOBuffer(kBufferSize));
+ writer.Write(stream.get(), buffer, kBufferSize);
+
+ // Run loop to make |reader| consume the data.
+ base::MessageLoop::current()->RunUntilIdle();
+
+ writer.Write(stream.get(), buffer, kBufferSize);
+
+ EXPECT_EQ(stream.get(), registry_->GetStream(url).get());
+}
+
} // namespace content