diff options
author | hubbe <hubbe@chromium.org> | 2015-11-17 17:07:59 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-11-18 01:08:49 +0000 |
commit | f8b2532f9f9b4a35f396602bb80dfc34ee102fe4 (patch) | |
tree | 8d89a09bf56a4d48fe18502c3d89cbb36645db09 | |
parent | 923be40414722da56f0a33efa42488d3cf8e4acc (diff) | |
download | chromium_src-f8b2532f9f9b4a35f396602bb80dfc34ee102fe4.zip chromium_src-f8b2532f9f9b4a35f396602bb80dfc34ee102fe4.tar.gz chromium_src-f8b2532f9f9b4a35f396602bb80dfc34ee102fe4.tar.bz2 |
Multibuffer reader implementation
Provides a simpler, byte-oriented api for getting data out of a multibuffer.
This CL also has the unit tests for the multibuffer itself. (Since those tests
uses the MultiBufferReader class.)
Depends on:
https://codereview.chromium.org/1165903002
Media cache design doc:
https://docs.google.com/document/d/15q6LTG0iDUe30QcoMtj4XNmKCa_7W_Q2uUIPFsJhS1E/edit
BUG=514719
Review URL: https://codereview.chromium.org/1420883004
Cr-Commit-Position: refs/heads/master@{#360244}
-rw-r--r-- | media/blink/BUILD.gn | 3 | ||||
-rw-r--r-- | media/blink/interval_map.h | 8 | ||||
-rw-r--r-- | media/blink/media_blink.gyp | 3 | ||||
-rw-r--r-- | media/blink/multibuffer_reader.cc | 237 | ||||
-rw-r--r-- | media/blink/multibuffer_reader.h | 158 | ||||
-rw-r--r-- | media/blink/multibuffer_unittest.cc | 508 |
6 files changed, 912 insertions, 5 deletions
diff --git a/media/blink/BUILD.gn b/media/blink/BUILD.gn index 54d5429..3b9365c 100644 --- a/media/blink/BUILD.gn +++ b/media/blink/BUILD.gn @@ -48,6 +48,8 @@ component("blink") { "media_blink_export.h", "multibuffer.cc", "multibuffer.h", + "multibuffer_reader.cc", + "multibuffer_reader.h", "new_session_cdm_result_promise.cc", "new_session_cdm_result_promise.h", "texttrack_impl.cc", @@ -122,6 +124,7 @@ test("media_blink_unittests") { "mock_webframeclient.h", "mock_weburlloader.cc", "mock_weburlloader.h", + "multibuffer_unittest.cc", "run_all_unittests.cc", "test_random.h", "test_response_generator.cc", diff --git a/media/blink/interval_map.h b/media/blink/interval_map.h index e11a03c..74cdca7 100644 --- a/media/blink/interval_map.h +++ b/media/blink/interval_map.h @@ -124,8 +124,6 @@ class IntervalMapConstIterator { // Needed to make the following construct work: // for (const auto& interval_value_pair : interval_map) - // Note however that this will skip the "end" interval, which - // is usually ok since it generally has the default value. std::pair<Interval<KeyType>, ValueType> operator*() const { return std::make_pair(interval(), value()); } @@ -182,8 +180,7 @@ class IntervalMap { // Increase [from..to) by |how_much|. void IncrementInterval(KeyType from, KeyType to, ValueType how_much) { - DCHECK_GT(to, from); - if (how_much == 0) + if (to <= from || how_much == 0) return; typename MapType::iterator a = MakeEntry(from); typename MapType::iterator b = MakeEntry(to); @@ -197,7 +194,8 @@ class IntervalMap { // Set [from..to) to |how_much|. void SetInterval(KeyType from, KeyType to, ValueType how_much) { - DCHECK_GT(to, from); + if (to <= from) + return; typename MapType::iterator a = MakeEntry(from); typename MapType::iterator b = MakeEntry(to); a->second = how_much; diff --git a/media/blink/media_blink.gyp b/media/blink/media_blink.gyp index 17a577d..3b08ee7 100644 --- a/media/blink/media_blink.gyp +++ b/media/blink/media_blink.gyp @@ -53,6 +53,8 @@ 'media_blink_export.h', 'multibuffer.cc', 'multibuffer.h', + 'multibuffer_reader.cc', + 'multibuffer_reader.h', 'new_session_cdm_result_promise.cc', 'new_session_cdm_result_promise.h', 'texttrack_impl.cc', @@ -128,6 +130,7 @@ 'mock_webframeclient.h', 'mock_weburlloader.cc', 'mock_weburlloader.h', + 'multibuffer_unittest.cc', 'run_all_unittests.cc', 'test_random.h', 'test_response_generator.cc', diff --git a/media/blink/multibuffer_reader.cc b/media/blink/multibuffer_reader.cc new file mode 100644 index 0000000..8f98167 --- /dev/null +++ b/media/blink/multibuffer_reader.cc @@ -0,0 +1,237 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/bind.h" +#include "base/callback_helpers.h" +#include "base/message_loop/message_loop.h" +#include "media/blink/multibuffer_reader.h" +#include "net/base/net_errors.h" + +namespace media { + +MultiBufferReader::MultiBufferReader( + MultiBuffer* multibuffer, + int64_t start, + int64_t end, + const base::Callback<void(int64_t, int64_t)>& progress_callback) + : multibuffer_(multibuffer), + // If end is -1, we use a very large (but still supported) value instead. + end_(end == -1LL ? (1LL << (multibuffer->block_size_shift() + 30)) : end), + preload_high_(0), + preload_low_(0), + max_buffer_forward_(0), + max_buffer_backward_(0), + pos_(start), + preload_pos_(-1), + loading_(true), + current_wait_size_(0), + progress_callback_(progress_callback), + weak_factory_(this) { + DCHECK_GE(start, 0); + DCHECK_GE(end_, 0); +} + +MultiBufferReader::~MultiBufferReader() { + multibuffer_->RemoveReader(preload_pos_, this); + multibuffer_->IncrementMaxSize( + -block_ceil(max_buffer_forward_ + max_buffer_backward_)); + multibuffer_->PinRange(block(pos_ - max_buffer_backward_), + block_ceil(pos_ + max_buffer_forward_), -1); + multibuffer_->CleanupWriters(preload_pos_); +} + +void MultiBufferReader::Seek(int64_t pos) { + DCHECK_GE(pos, 0); + if (pos == pos_) + return; + // Use a rangemap to compute the diff in pinning. + IntervalMap<MultiBuffer::BlockId, int32_t> tmp; + tmp.IncrementInterval(block(pos_ - max_buffer_backward_), + block_ceil(pos_ + max_buffer_forward_), -1); + tmp.IncrementInterval(block(pos - max_buffer_backward_), + block_ceil(pos + max_buffer_forward_), 1); + + multibuffer_->PinRanges(tmp); + + multibuffer_->RemoveReader(preload_pos_, this); + MultiBufferBlockId old_preload_pos = preload_pos_; + preload_pos_ = block(pos); + pos_ = pos; + UpdateInternalState(); + multibuffer_->CleanupWriters(old_preload_pos); +} + +void MultiBufferReader::SetMaxBuffer(int64_t backward, int64_t forward) { + // Safe, because we know this doesn't actually prune the cache right away. + multibuffer_->IncrementMaxSize( + -block_ceil(max_buffer_forward_ + max_buffer_backward_)); + // Use a rangemap to compute the diff in pinning. + IntervalMap<MultiBuffer::BlockId, int32_t> tmp; + tmp.IncrementInterval(block(pos_ - max_buffer_backward_), + block_ceil(pos_ + max_buffer_forward_), -1); + max_buffer_backward_ = backward; + max_buffer_forward_ = forward; + tmp.IncrementInterval(block(pos_ - max_buffer_backward_), + block_ceil(pos_ + max_buffer_forward_), 1); + multibuffer_->PinRanges(tmp); + + multibuffer_->IncrementMaxSize( + block_ceil(max_buffer_forward_ + max_buffer_backward_)); +} + +int64_t MultiBufferReader::Available() const { + int64_t unavailable_byte_pos = + static_cast<int64_t>(multibuffer_->FindNextUnavailable(block(pos_))) + << multibuffer_->block_size_shift(); + return std::max<int64_t>(0, unavailable_byte_pos - pos_); +} + +int64_t MultiBufferReader::TryRead(uint8_t* data, int64_t len) { + DCHECK_GT(len, 0); + current_wait_size_ = 0; + cb_.Reset(); + DCHECK_LE(pos_ + len, end_); + const MultiBuffer::DataMap& data_map = multibuffer_->map(); + MultiBuffer::DataMap::const_iterator i = data_map.find(block(pos_)); + int64_t p = pos_; + int64_t bytes_read = 0; + while (bytes_read < len) { + if (i == data_map.end()) + break; + if (i->first != block(p)) + break; + if (i->second->end_of_stream()) + break; + size_t offset = p & ((1LL << multibuffer_->block_size_shift()) - 1); + size_t tocopy = + std::min<size_t>(len - bytes_read, i->second->data_size() - offset); + memcpy(data, i->second->data() + offset, tocopy); + data += tocopy; + bytes_read += tocopy; + p += tocopy; + ++i; + } + Seek(p); + return bytes_read; +} + +int MultiBufferReader::Wait(int64_t len, const base::Closure& cb) { + DCHECK_LE(pos_ + len, end_); + DCHECK_NE(Available(), -1); + DCHECK_LE(len, max_buffer_forward_); + current_wait_size_ = len; + + cb_.Reset(); + UpdateInternalState(); + + if (Available() >= current_wait_size_) { + return net::OK; + } else { + cb_ = cb; + return net::ERR_IO_PENDING; + } +} + +void MultiBufferReader::SetPreload(int64_t preload_high, int64_t preload_low) { + DCHECK_GE(preload_high, preload_low); + multibuffer_->RemoveReader(preload_pos_, this); + preload_pos_ = block(pos_); + preload_high_ = preload_high; + preload_low_ = preload_low; + UpdateInternalState(); +} + +bool MultiBufferReader::IsLoading() const { + return loading_; +} + +void MultiBufferReader::CheckWait() { + if (!cb_.is_null() && + (Available() >= current_wait_size_ || Available() == -1)) { + // We redirect the call through a weak pointer to ourselves to guarantee + // there are no callbacks from us after we've been destroyed. + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&MultiBufferReader::Call, weak_factory_.GetWeakPtr(), + base::ResetAndReturn(&cb_))); + } +} + +void MultiBufferReader::Call(const base::Closure& cb) const { + cb.Run(); +} + +void MultiBufferReader::NotifyAvailableRange( + const Interval<MultiBufferBlockId>& range) { + // Update end_ if we can. + if (range.end > range.begin) { + auto i = multibuffer_->map().find(range.end - 1); + DCHECK(i != multibuffer_->map().end()); + if (i->second->end_of_stream()) { + // This is an upper limit because the last-to-one block is allowed + // to be smaller than the rest of the blocks. + int64_t size_upper_limit = static_cast<int64_t>(range.end) + << multibuffer_->block_size_shift(); + end_ = std::min(end_, size_upper_limit); + } + } + UpdateInternalState(); + if (!progress_callback_.is_null()) { + // We redirect the call through a weak pointer to ourselves to guarantee + // there are no callbacks from us after we've been destroyed. + base::MessageLoop::current()->PostTask( + FROM_HERE, + base::Bind(&MultiBufferReader::Call, weak_factory_.GetWeakPtr(), + base::Bind(progress_callback_, + static_cast<int64_t>(range.begin) + << multibuffer_->block_size_shift(), + static_cast<int64_t>(range.end) + << multibuffer_->block_size_shift()))); + // We may be destroyed, do not touch |this|. + } +} + +void MultiBufferReader::UpdateInternalState() { + int64_t effective_preload = loading_ ? preload_high_ : preload_low_; + + loading_ = false; + if (preload_pos_ == -1) { + preload_pos_ = block(pos_); + DCHECK_GE(preload_pos_, 0); + } + MultiBuffer::BlockId max_preload = block_ceil( + std::min(end_, pos_ + std::max(effective_preload, current_wait_size_))); + + // Note that we might not have been added to the multibuffer, + // removing ourselves is a no-op in that case. + multibuffer_->RemoveReader(preload_pos_, this); + + // We explicitly allow preloading to go beyond the pinned region in the cache. + // It only happens when we want to preload something into the disk cache. + // Thus it is possible to have blocks between our current reading position + // and preload_pos_ be unavailable. When we get a Seek() call (possibly + // through TryRead()) we reset the preload_pos_ to the current reading + // position, and preload_pos_ will become the first unavailable block after + // our current reading position again. + preload_pos_ = multibuffer_->FindNextUnavailable(preload_pos_); + DCHECK_GE(preload_pos_, 0); + + DVLOG(3) << "UpdateInternalState" + << " pp = " << preload_pos_ + << " block_ceil(end_) = " << block_ceil(end_) << " end_ = " << end_ + << " max_preload " << max_preload; + + if (preload_pos_ < block_ceil(end_)) { + if (preload_pos_ < max_preload) { + loading_ = true; + multibuffer_->AddReader(preload_pos_, this); + } else if (multibuffer_->Contains(preload_pos_ - 1)) { + --preload_pos_; + multibuffer_->AddReader(preload_pos_, this); + } + } + CheckWait(); +} + +} // namespace media diff --git a/media/blink/multibuffer_reader.h b/media/blink/multibuffer_reader.h new file mode 100644 index 0000000..44f14f9 --- /dev/null +++ b/media/blink/multibuffer_reader.h @@ -0,0 +1,158 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_BLINK_MULTIBUFFER_READER_H_ +#define MEDIA_BLINK_MULTIBUFFER_READER_H_ + +#include <stdint.h> + +#include <limits> +#include <map> +#include <set> + +#include "base/callback.h" +#include "base/memory/weak_ptr.h" +#include "media/blink/media_blink_export.h" +#include "media/blink/multibuffer.h" + +namespace media { + +// Wrapper for MultiBuffer that offers a simple byte-reading +// interface with prefetch. +class MEDIA_BLINK_EXPORT MultiBufferReader + : NON_EXPORTED_BASE(public MultiBuffer::Reader) { + public: + // Note that |progress_callback| is guaranteed to be called if + // a redirect happens and the url_data is updated. Otherwise + // origin checks will become insecure. + // Users probably want to call SetMaxBuffer & SetPreload after + // creating the a MultiBufferReader. + // The progress callback will be called when the "available range" + // changes. (The number of bytes available for reading before and + // after the current position.) The arguments for the progress + // callback is the first byte available (from beginning of file) + // and the last byte available + 1. Note that there may be other + // regions of available data in the cache as well. + // If |end| is not known, use -1. + MultiBufferReader( + MultiBuffer* multibuffer, + int64_t start, + int64_t end, + const base::Callback<void(int64_t, int64_t)>& progress_callback); + + ~MultiBufferReader() override; + + // Returns number of bytes available for reading. When the rest of the file + // is available, the number returned will be greater than the number + // or readable bytes. If an error occurs, -1 is returned. + int64_t Available() const; + + // Seek to a different position. + // If there is a pending Wait(), it will be cancelled. + void Seek(int64_t pos); + + // Returns the current position. + int64_t Tell() const { return pos_; } + + // Tries to read |len| bytes and advance position. + // Returns number of bytes read. + // If there is a pending Wait(), it will be cancelled. + int64_t TryRead(uint8_t* data, int64_t len); + + // Wait until |len| bytes are available for reading. + // Returns net::OK if |len| bytes are already available, otherwise it will + // return net::ERR_IO_PENDING and call |cb| at some point later. + // |len| must be smaller or equal to max_buffer_forward. + int Wait(int64_t len, const base::Closure& cb); + + // Set how much data we try to preload into the cache ahead of our current + // location. Normally, we preload until we have preload_high bytes, then + // stop until we fall below preload_low bytes. Note that preload can be + // set higher than max_buffer_forward, but the result is usually that + // some blocks will be freed between the current position and the preload + // position. + void SetPreload(int64_t preload_high, int64_t preload_low); + + // Change how much data we pin to the cache. + // The range [current_position - backward ... current_position + forward) + // will be locked in the cache. Calling Wait() or TryRead() with values + // larger than |forward| is not supported. + void SetMaxBuffer(int64_t backward, int64_t forward); + + // Returns true if we are currently loading data. + bool IsLoading() const; + + // Reader implementation. + void NotifyAvailableRange(const Interval<MultiBufferBlockId>& range) override; + + // Getters + int64_t preload_high() const { return preload_high_; } + int64_t preload_low() const { return preload_low_; } + + private: + // Returns the block for a particular byte position. + MultiBufferBlockId block(int64_t byte_pos) const { + return byte_pos >> multibuffer_->block_size_shift(); + } + + // Returns the block for a particular byte position, rounding up. + MultiBufferBlockId block_ceil(int64_t byte_pos) const { + return block(byte_pos + (1LL << multibuffer_->block_size_shift()) - 1); + } + + // Check if wait operation can complete now. + void CheckWait(); + + // Recalculate preload_pos_ and update our entry in the multibuffer + // reader index. Also call CheckWait(). This function is basically + // called anything changes, like when we get more data or seek to + // a new position. + void UpdateInternalState(); + + // Indirection function used to call callbacks. When we post a callback + // we indirect it through a weak_ptr and this function to make sure we + // don't call any callbacks after this object has been destroyed. + void Call(const base::Closure& cb) const; + + // The multibuffer we're wrapping, not owned. + MultiBuffer* multibuffer_; + + // We're not interested in reading past this position. + int64_t end_; + + // Defer reading once we have this much data. + int64_t preload_high_; + // Stop deferring once we have this much data. + int64_t preload_low_; + + // Pin this much data in the cache from the current position. + int64_t max_buffer_forward_; + int64_t max_buffer_backward_; + + // Current position in bytes. + int64_t pos_; + + // [block(pos_)..preload_pos_) are known to be in the cache. + // preload_pos_ is only allowed to point to a filled + // cache position if it is equal to end_ or pos_+preload_. + // This is a pointer to a slot in the cache, so the unit is + // blocks. + MultiBufferBlockId preload_pos_; + + // True if we've requested data from the cache by calling WaitFor(). + bool loading_; + + // When Available() > current_wait_size_ we call cb_. + int64_t current_wait_size_; + base::Closure cb_; + + // Progress callback. + base::Callback<void(int64_t, int64_t)> progress_callback_; + + base::WeakPtrFactory<MultiBufferReader> weak_factory_; +}; + +} // namespace media + +#endif // MEDIA_BLINK_MULTIBUFFER_READER_H_ diff --git a/media/blink/multibuffer_unittest.cc b/media/blink/multibuffer_unittest.cc new file mode 100644 index 0000000..59693ef --- /dev/null +++ b/media/blink/multibuffer_unittest.cc @@ -0,0 +1,508 @@ +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <deque> +#include <string> +#include <vector> + +#include "base/bind.h" +#include "base/callback_helpers.h" +#include "base/message_loop/message_loop.h" +#include "media/blink/multibuffer.h" +#include "media/blink/multibuffer_reader.h" +#include "media/blink/test_random.h" +#include "testing/gtest/include/gtest/gtest.h" + +const int kBlockSizeShift = 8; +const size_t kBlockSize = 1UL << kBlockSizeShift; + +namespace media { +class TestMultiBufferDataProvider; + +std::vector<TestMultiBufferDataProvider*> writers; + +class TestMultiBufferDataProvider : public media::MultiBuffer::DataProvider { + public: + TestMultiBufferDataProvider(MultiBufferBlockId pos, + size_t file_size, + int max_blocks_after_defer, + bool must_read_whole_file, + media::TestRandom* rnd) + : pos_(pos), + blocks_until_deferred_(1 << 30), + max_blocks_after_defer_(max_blocks_after_defer), + file_size_(file_size), + must_read_whole_file_(must_read_whole_file), + rnd_(rnd) { + writers.push_back(this); + } + + ~TestMultiBufferDataProvider() override { + if (must_read_whole_file_) { + CHECK_GE(pos_ * kBlockSize, file_size_); + } + for (size_t i = 0; i < writers.size(); i++) { + if (writers[i] == this) { + writers[i] = writers.back(); + writers.pop_back(); + return; + } + } + LOG(FATAL) << "Couldn't find myself in writers!"; + } + + MultiBufferBlockId Tell() const override { return pos_; } + + bool Available() const override { return !fifo_.empty(); } + + scoped_refptr<DataBuffer> Read() override { + DCHECK(Available()); + scoped_refptr<DataBuffer> ret = fifo_.front(); + fifo_.pop_front(); + ++pos_; + return ret; + } + + void SetAvailableCallback(const base::Closure& cb) override { + DCHECK(!Available()); + cb_ = cb; + } + + void SetDeferred(bool deferred) override { + if (deferred) { + if (max_blocks_after_defer_ > 0) { + blocks_until_deferred_ = rnd_->Rand() % max_blocks_after_defer_; + } else if (max_blocks_after_defer_ < 0) { + blocks_until_deferred_ = -max_blocks_after_defer_; + } else { + blocks_until_deferred_ = 0; + } + } else { + blocks_until_deferred_ = 1 << 30; + } + } + + bool Advance() { + if (blocks_until_deferred_ == 0) + return false; + --blocks_until_deferred_; + + bool ret = true; + scoped_refptr<media::DataBuffer> block = new media::DataBuffer(kBlockSize); + size_t x = 0; + size_t byte_pos = (fifo_.size() + pos_) * kBlockSize; + for (x = 0; x < kBlockSize; x++, byte_pos++) { + if (byte_pos >= file_size_) + break; + block->writable_data()[x] = + static_cast<uint8_t>((byte_pos * 15485863) >> 16); + } + block->set_data_size(static_cast<int>(x)); + fifo_.push_back(block); + if (byte_pos == file_size_) { + fifo_.push_back(DataBuffer::CreateEOSBuffer()); + ret = false; + } + cb_.Run(); + return ret; + } + + private: + std::deque<scoped_refptr<media::DataBuffer>> fifo_; + MultiBufferBlockId pos_; + int32_t blocks_until_deferred_; + int32_t max_blocks_after_defer_; + size_t file_size_; + bool must_read_whole_file_; + base::Closure cb_; + media::TestRandom* rnd_; +}; + +class TestMultiBuffer : public media::MultiBuffer { + public: + explicit TestMultiBuffer( + int32_t shift, + const scoped_refptr<media::MultiBuffer::GlobalLRU>& lru, + media::TestRandom* rnd) + : media::MultiBuffer(shift, lru), + range_supported_(false), + create_ok_(true), + max_writers_(10000), + file_size_(1 << 30), + max_blocks_after_defer_(0), + must_read_whole_file_(false), + writers_created_(0), + rnd_(rnd) {} + + void SetMaxWriters(size_t max_writers) { max_writers_ = max_writers; } + + void CheckPresentState() { + IntervalMap<MultiBufferBlockId, int32_t> tmp; + for (DataMap::iterator i = data_.begin(); i != data_.end(); ++i) { + CHECK(i->second); // Null poineters are not allowed in data_ + CHECK_NE(!!pinned_[i->first], lru_->Contains(this, i->first)) + << " i->first = " << i->first; + tmp.IncrementInterval(i->first, i->first + 1, 1); + } + IntervalMap<MultiBufferBlockId, int32_t>::const_iterator tmp_iterator = + tmp.begin(); + IntervalMap<MultiBufferBlockId, int32_t>::const_iterator present_iterator = + present_.begin(); + while (tmp_iterator != tmp.end() && present_iterator != tmp.end()) { + EXPECT_EQ(tmp_iterator.interval_begin(), + present_iterator.interval_begin()); + EXPECT_EQ(tmp_iterator.interval_end(), present_iterator.interval_end()); + EXPECT_EQ(tmp_iterator.value(), present_iterator.value()); + ++tmp_iterator; + ++present_iterator; + } + EXPECT_TRUE(tmp_iterator == tmp.end()); + EXPECT_TRUE(present_iterator == present_.end()); + } + + void CheckLRUState() { + for (DataMap::iterator i = data_.begin(); i != data_.end(); ++i) { + CHECK(i->second); // Null poineters are not allowed in data_ + CHECK_NE(!!pinned_[i->first], lru_->Contains(this, i->first)) + << " i->first = " << i->first; + CHECK_EQ(1, present_[i->first]) << " i->first = " << i->first; + } + } + + void SetFileSize(size_t file_size) { file_size_ = file_size; } + + void SetMaxBlocksAfterDefer(int32_t max_blocks_after_defer) { + max_blocks_after_defer_ = max_blocks_after_defer; + } + + void SetMustReadWholeFile(bool must_read_whole_file) { + must_read_whole_file_ = must_read_whole_file; + } + + int32_t writers_created() const { return writers_created_; } + + void SetRangeSupported(bool supported) { range_supported_ = supported; } + + protected: + DataProvider* CreateWriter(const MultiBufferBlockId& pos) override { + DCHECK(create_ok_); + writers_created_++; + CHECK_LT(writers.size(), max_writers_); + return new TestMultiBufferDataProvider( + pos, file_size_, max_blocks_after_defer_, must_read_whole_file_, rnd_); + } + void Prune(size_t max_to_free) override { + // Prune should not cause additional writers to be spawned. + create_ok_ = false; + MultiBuffer::Prune(max_to_free); + create_ok_ = true; + } + + bool RangeSupported() const override { return range_supported_; } + + private: + bool range_supported_; + bool create_ok_; + size_t max_writers_; + size_t file_size_; + int32_t max_blocks_after_defer_; + bool must_read_whole_file_; + int32_t writers_created_; + media::TestRandom* rnd_; +}; +} + +class MultiBufferTest : public testing::Test { + public: + MultiBufferTest() + : rnd_(42), + lru_(new media::MultiBuffer::GlobalLRU()), + multibuffer_(kBlockSizeShift, lru_, &rnd_) {} + + void Advance() { + CHECK(media::writers.size()); + media::writers[rnd_.Rand() % media::writers.size()]->Advance(); + } + + bool AdvanceAll() { + bool advanced = false; + for (size_t i = 0; i < media::writers.size(); i++) { + advanced |= media::writers[i]->Advance(); + } + multibuffer_.CheckLRUState(); + return advanced; + } + + protected: + media::TestRandom rnd_; + scoped_refptr<media::MultiBuffer::GlobalLRU> lru_; + media::TestMultiBuffer multibuffer_; + base::MessageLoop message_loop_; +}; + +TEST_F(MultiBufferTest, ReadAll) { + multibuffer_.SetMaxWriters(1); + size_t pos = 0; + size_t end = 10000; + multibuffer_.SetFileSize(10000); + multibuffer_.SetMustReadWholeFile(true); + media::MultiBufferReader reader(&multibuffer_, pos, end, + base::Callback<void(int64_t, int64_t)>()); + reader.SetMaxBuffer(2000, 5000); + reader.SetPreload(1000, 1000); + while (pos < end) { + unsigned char buffer[27]; + buffer[17] = 17; + size_t to_read = std::min<size_t>(end - pos, 17); + int64_t bytes_read = reader.TryRead(buffer, to_read); + if (bytes_read) { + EXPECT_EQ(buffer[17], 17); + for (int64_t i = 0; i < bytes_read; i++) { + uint8_t expected = static_cast<uint8_t>((pos * 15485863) >> 16); + EXPECT_EQ(expected, buffer[i]) << " pos = " << pos; + pos++; + } + } else { + Advance(); + } + } +} + +TEST_F(MultiBufferTest, ReadAllAdvanceFirst) { + multibuffer_.SetMaxWriters(1); + size_t pos = 0; + size_t end = 10000; + multibuffer_.SetFileSize(10000); + multibuffer_.SetMustReadWholeFile(true); + media::MultiBufferReader reader(&multibuffer_, pos, end, + base::Callback<void(int64_t, int64_t)>()); + reader.SetMaxBuffer(2000, 5000); + reader.SetPreload(1000, 1000); + while (pos < end) { + unsigned char buffer[27]; + buffer[17] = 17; + size_t to_read = std::min<size_t>(end - pos, 17); + while (AdvanceAll()) + ; + int64_t bytes = reader.TryRead(buffer, to_read); + EXPECT_GT(bytes, 0); + EXPECT_EQ(buffer[17], 17); + for (int64_t i = 0; i < bytes; i++) { + uint8_t expected = static_cast<uint8_t>((pos * 15485863) >> 16); + EXPECT_EQ(expected, buffer[i]) << " pos = " << pos; + pos++; + } + } +} + +// Checks that if the data provider provides too much data after we told it +// to defer, we kill it. +TEST_F(MultiBufferTest, ReadAllAdvanceFirst_NeverDefer) { + multibuffer_.SetMaxWriters(1); + size_t pos = 0; + size_t end = 10000; + multibuffer_.SetFileSize(10000); + multibuffer_.SetMaxBlocksAfterDefer(-10000); + multibuffer_.SetRangeSupported(true); + media::MultiBufferReader reader(&multibuffer_, pos, end, + base::Callback<void(int64_t, int64_t)>()); + reader.SetMaxBuffer(2000, 5000); + reader.SetPreload(1000, 1000); + while (pos < end) { + unsigned char buffer[27]; + buffer[17] = 17; + size_t to_read = std::min<size_t>(end - pos, 17); + while (AdvanceAll()) + ; + int64_t bytes = reader.TryRead(buffer, to_read); + EXPECT_GT(bytes, 0); + EXPECT_EQ(buffer[17], 17); + for (int64_t i = 0; i < bytes; i++) { + uint8_t expected = static_cast<uint8_t>((pos * 15485863) >> 16); + EXPECT_EQ(expected, buffer[i]) << " pos = " << pos; + pos++; + } + } + EXPECT_GT(multibuffer_.writers_created(), 1); +} + +// Same as ReadAllAdvanceFirst_NeverDefer, but the url doesn't support +// ranges, so we don't destroy it no matter how much data it provides. +TEST_F(MultiBufferTest, ReadAllAdvanceFirst_NeverDefer2) { + multibuffer_.SetMaxWriters(1); + size_t pos = 0; + size_t end = 10000; + multibuffer_.SetFileSize(10000); + multibuffer_.SetMustReadWholeFile(true); + multibuffer_.SetMaxBlocksAfterDefer(-10000); + media::MultiBufferReader reader(&multibuffer_, pos, end, + base::Callback<void(int64_t, int64_t)>()); + reader.SetMaxBuffer(2000, 5000); + reader.SetPreload(1000, 1000); + while (pos < end) { + unsigned char buffer[27]; + buffer[17] = 17; + size_t to_read = std::min<size_t>(end - pos, 17); + while (AdvanceAll()) + ; + int64_t bytes = reader.TryRead(buffer, to_read); + EXPECT_GT(bytes, 0); + EXPECT_EQ(buffer[17], 17); + for (int64_t i = 0; i < bytes; i++) { + uint8_t expected = static_cast<uint8_t>((pos * 15485863) >> 16); + EXPECT_EQ(expected, buffer[i]) << " pos = " << pos; + pos++; + } + } +} + +TEST_F(MultiBufferTest, LRUTest) { + int64_t max_size = 17; + int64_t current_size = 0; + lru_->IncrementMaxSize(max_size); + + multibuffer_.SetMaxWriters(1); + size_t pos = 0; + size_t end = 10000; + multibuffer_.SetFileSize(10000); + media::MultiBufferReader reader(&multibuffer_, pos, end, + base::Callback<void(int64_t, int64_t)>()); + reader.SetPreload(10000, 10000); + // Note, no pinning, all data should end up in LRU. + EXPECT_EQ(current_size, lru_->Size()); + current_size += max_size; + while (AdvanceAll()) + ; + EXPECT_EQ(current_size, lru_->Size()); + lru_->IncrementMaxSize(-max_size); + lru_->Prune(3); + current_size -= 3; + EXPECT_EQ(current_size, lru_->Size()); + lru_->Prune(3); + current_size -= 3; + EXPECT_EQ(current_size, lru_->Size()); + lru_->Prune(1000); + EXPECT_EQ(0, lru_->Size()); +} + +class ReadHelper { + public: + ReadHelper(size_t end, + size_t max_read_size, + media::MultiBuffer* multibuffer, + media::TestRandom* rnd) + : pos_(0), + end_(end), + max_read_size_(max_read_size), + read_size_(0), + rnd_(rnd), + reader_(multibuffer, + pos_, + end_, + base::Callback<void(int64_t, int64_t)>()) { + reader_.SetMaxBuffer(2000, 5000); + reader_.SetPreload(1000, 1000); + } + + bool Read() { + if (read_size_ == 0) + return true; + unsigned char buffer[4096]; + CHECK_LE(read_size_, static_cast<int64_t>(sizeof(buffer))); + CHECK_EQ(pos_, reader_.Tell()); + int64_t bytes_read = reader_.TryRead(buffer, read_size_); + if (bytes_read) { + for (int64_t i = 0; i < bytes_read; i++) { + unsigned char expected = (pos_ * 15485863) >> 16; + EXPECT_EQ(expected, buffer[i]) << " pos = " << pos_; + pos_++; + } + CHECK_EQ(pos_, reader_.Tell()); + return true; + } + return false; + } + + void StartRead() { + CHECK_EQ(pos_, reader_.Tell()); + read_size_ = std::min(1 + rnd_->Rand() % (max_read_size_ - 1), end_ - pos_); + if (!Read()) { + reader_.Wait(read_size_, + base::Bind(&ReadHelper::WaitCB, base::Unretained(this))); + } + } + + void WaitCB() { CHECK(Read()); } + + void Seek() { + pos_ = rnd_->Rand() % end_; + reader_.Seek(pos_); + CHECK_EQ(pos_, reader_.Tell()); + } + + private: + int64_t pos_; + int64_t end_; + int64_t max_read_size_; + int64_t read_size_; + media::TestRandom* rnd_; + media::MultiBufferReader reader_; +}; + +TEST_F(MultiBufferTest, RandomTest) { + size_t file_size = 1000000; + multibuffer_.SetFileSize(file_size); + multibuffer_.SetMaxBlocksAfterDefer(10); + std::vector<ReadHelper*> read_helpers; + for (size_t i = 0; i < 20; i++) { + read_helpers.push_back( + new ReadHelper(file_size, 1000, &multibuffer_, &rnd_)); + } + for (int i = 0; i < 10000; i++) { + if (rnd_.Rand() & 1) { + if (!media::writers.empty()) + Advance(); + } else { + size_t j = rnd_.Rand() % read_helpers.size(); + if (rnd_.Rand() % 100 < 3) + read_helpers[j]->Seek(); + read_helpers[j]->StartRead(); + multibuffer_.CheckLRUState(); + } + } + multibuffer_.CheckPresentState(); + while (!read_helpers.empty()) { + delete read_helpers.back(); + read_helpers.pop_back(); + } +} + +TEST_F(MultiBufferTest, RandomTest_RangeSupported) { + size_t file_size = 1000000; + multibuffer_.SetFileSize(file_size); + multibuffer_.SetMaxBlocksAfterDefer(10); + std::vector<ReadHelper*> read_helpers; + multibuffer_.SetRangeSupported(true); + for (size_t i = 0; i < 20; i++) { + read_helpers.push_back( + new ReadHelper(file_size, 1000, &multibuffer_, &rnd_)); + } + for (int i = 0; i < 10000; i++) { + if (rnd_.Rand() & 1) { + if (!media::writers.empty()) + Advance(); + } else { + size_t j = rnd_.Rand() % read_helpers.size(); + if (rnd_.Rand() % 100 < 3) + read_helpers[j]->Seek(); + read_helpers[j]->StartRead(); + multibuffer_.CheckLRUState(); + } + } + multibuffer_.CheckPresentState(); + while (!read_helpers.empty()) { + delete read_helpers.back(); + read_helpers.pop_back(); + } +} |