From 641c9abcde75980f45771240db3d8131c6656fc7 Mon Sep 17 00:00:00 2001 From: "acolwell@chromium.org" Date: Wed, 5 Jan 2011 18:44:10 +0000 Subject: Refactor PipelineImpl to use CompositeFilter to manage Filter state transitions. BUG=54110 TEST=media_unittests CompositeFilterTest.* Committed: http://src.chromium.org/viewvc/chrome?view=rev&revision=70267 Review URL: http://codereview.chromium.org/5744002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70527 0039d316-1c4b-4281-b951-d872f2087c98 --- media/base/composite_filter.cc | 603 ++++++++++++++++++++++++ media/base/composite_filter.h | 165 +++++++ media/base/composite_filter_unittest.cc | 799 ++++++++++++++++++++++++++++++++ media/base/mock_filters.cc | 27 +- media/base/mock_filters.h | 39 +- media/base/pipeline.h | 2 + media/base/pipeline_impl.cc | 173 +++---- media/base/pipeline_impl.h | 20 +- media/base/pipeline_impl_unittest.cc | 56 ++- 9 files changed, 1761 insertions(+), 123 deletions(-) create mode 100644 media/base/composite_filter.cc create mode 100644 media/base/composite_filter.h create mode 100644 media/base/composite_filter_unittest.cc (limited to 'media/base') diff --git a/media/base/composite_filter.cc b/media/base/composite_filter.cc new file mode 100644 index 0000000..87ff799 --- /dev/null +++ b/media/base/composite_filter.cc @@ -0,0 +1,603 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/base/composite_filter.h" + +#include "base/stl_util-inl.h" +#include "media/base/callback.h" + +namespace media { + +class CompositeFilter::FilterHostImpl : public FilterHost { + public: + FilterHostImpl(CompositeFilter* parent, FilterHost* host); + + FilterHost* host(); + + // media::FilterHost methods. + virtual void SetError(PipelineError error); + virtual base::TimeDelta GetTime() const; + virtual base::TimeDelta GetDuration() const; + virtual void SetTime(base::TimeDelta time); + virtual void SetDuration(base::TimeDelta duration); + virtual void SetBufferedTime(base::TimeDelta buffered_time); + virtual void SetTotalBytes(int64 total_bytes); + virtual void SetBufferedBytes(int64 buffered_bytes); + virtual void SetVideoSize(size_t width, size_t height); + virtual void SetStreaming(bool streaming); + virtual void NotifyEnded(); + virtual void SetLoaded(bool loaded); + virtual void SetNetworkActivity(bool network_activity); + virtual void DisableAudioRenderer(); + virtual void SetCurrentReadPosition(int64 offset); + virtual int64 GetCurrentReadPosition(); + + private: + CompositeFilter* parent_; + FilterHost* host_; + + DISALLOW_COPY_AND_ASSIGN(FilterHostImpl); +}; + +CompositeFilter::CompositeFilter(MessageLoop* message_loop) { + Init(message_loop, NULL); +} + +CompositeFilter::CompositeFilter(MessageLoop* message_loop, + ThreadFactoryFunction thread_factory) { + DCHECK(thread_factory); + Init(message_loop, thread_factory); +} + +void CompositeFilter::Init(MessageLoop* message_loop, + ThreadFactoryFunction thread_factory) { + DCHECK(message_loop); + message_loop_ = message_loop; + thread_factory_ = thread_factory; + + if (!thread_factory_) { + thread_factory_ = &CompositeFilter::DefaultThreadFactory; + } + + state_ = kCreated; + sequence_index_ = 0; + error_ = PIPELINE_OK; +} + +CompositeFilter::~CompositeFilter() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(state_ == kCreated || state_ == kStopped); + + // Stop every running filter thread. + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + (*iter)->Stop(); + } + + // Reset the pipeline, which will decrement a reference to this object. + // We will get destroyed as soon as the remaining tasks finish executing. + // To be safe, we'll set our pipeline reference to NULL. + filters_.clear(); + STLDeleteElements(&filter_threads_); +} + +bool CompositeFilter::AddFilter(scoped_refptr filter) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + if (!filter.get() || state_ != kCreated || !host()) + return false; + + // Create a dedicated thread for this filter if applicable. + if (filter->requires_message_loop()) { + scoped_ptr thread( + thread_factory_(filter->message_loop_name())); + + if (!thread.get() || !thread->Start()) { + return false; + } + + filter->set_message_loop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } + + // Register ourselves as the filter's host. + filter->set_host(host_impl_.get()); + filters_.push_back(make_scoped_refptr(filter.get())); + return true; +} + +const char* CompositeFilter::major_mime_type() const { + return ""; +} + +void CompositeFilter::set_host(FilterHost* host) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + DCHECK(host); + DCHECK(!host_impl_.get()); + host_impl_.reset(new FilterHostImpl(this, host)); +} + +FilterHost* CompositeFilter::host() { + return host_impl_.get() ? host_impl_->host() : NULL; +} + +bool CompositeFilter::requires_message_loop() const { + return false; +} + +const char* CompositeFilter::message_loop_name() const { + return "CompositeFilter"; +} + +void CompositeFilter::set_message_loop(MessageLoop* message_loop) { + NOTREACHED() << "Message loop should not be set."; +} + +MessageLoop* CompositeFilter::message_loop() { + return NULL; +} + +void CompositeFilter::Play(FilterCallback* play_callback) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + scoped_ptr callback(play_callback); + if (callback_.get()) { + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); + callback->Run(); + return; + } else if (state_ == kPlaying) { + callback->Run(); + return; + } else if (!host() || (state_ != kPaused && state_ != kCreated)) { + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); + callback->Run(); + return; + } + + ChangeState(kPlayPending); + callback_.reset(callback.release()); + StartSerialCallSequence(); +} + +void CompositeFilter::Pause(FilterCallback* pause_callback) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + scoped_ptr callback(pause_callback); + if (callback_.get()) { + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); + callback->Run(); + return; + } else if (state_ == kPaused) { + callback->Run(); + return; + } else if (!host() || state_ != kPlaying) { + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); + callback->Run(); + return; + } + + ChangeState(kPausePending); + callback_.reset(callback.release()); + StartSerialCallSequence(); +} + +void CompositeFilter::Flush(FilterCallback* flush_callback) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + scoped_ptr callback(flush_callback); + if (callback_.get()) { + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); + callback->Run(); + return; + } else if (!host() || (state_ != kCreated && state_ != kPaused)) { + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); + callback->Run(); + return; + } + + ChangeState(kFlushPending); + callback_.reset(callback.release()); + StartParallelCallSequence(); +} + +void CompositeFilter::Stop(FilterCallback* stop_callback) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + scoped_ptr callback(stop_callback); + if (!host()) { + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); + callback->Run(); + return; + } else if (state_ == kStopped) { + callback->Run(); + return; + } + + switch(state_) { + case kError: + case kCreated: + case kPaused: + case kPlaying: + ChangeState(kStopPending); + break; + case kPlayPending: + ChangeState(kStopWhilePlayPending); + break; + case kPausePending: + ChangeState(kStopWhilePausePending); + break; + case kFlushPending: + ChangeState(kStopWhileFlushPending); + break; + case kSeekPending: + ChangeState(kStopWhileSeekPending); + break; + default: + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); + callback->Run(); + return; + } + + callback_.reset(callback.release()); + if (state_ == kStopPending) { + StartSerialCallSequence(); + } +} + +void CompositeFilter::SetPlaybackRate(float playback_rate) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + for (FilterVector::iterator iter = filters_.begin(); + iter != filters_.end(); + ++iter) { + (*iter)->SetPlaybackRate(playback_rate); + } +} + +void CompositeFilter::Seek(base::TimeDelta time, + FilterCallback* seek_callback) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + scoped_ptr callback(seek_callback); + if (callback_.get()) { + SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING); + callback->Run(); + return; + } else if (!host() || (state_ != kPaused && state_ != kCreated)) { + SendErrorToHost(PIPELINE_ERROR_INVALID_STATE); + callback->Run(); + return; + } + + ChangeState(kSeekPending); + callback_.reset(callback.release()); + pending_seek_time_ = time; + StartSerialCallSequence(); +} + +void CompositeFilter::OnAudioRendererDisabled() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + for (FilterVector::iterator iter = filters_.begin(); + iter != filters_.end(); + ++iter) { + (*iter)->OnAudioRendererDisabled(); + } +} + +base::Thread* CompositeFilter::DefaultThreadFactory( + const char* thread_name) { + return new base::Thread(thread_name); +} + +void CompositeFilter::ChangeState(State new_state) { + DCHECK_EQ(message_loop_, MessageLoop::current()); + state_ = new_state; +} + +void CompositeFilter::StartSerialCallSequence() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + error_ = PIPELINE_OK; + + if (filters_.size() > 0) { + sequence_index_ = 0; + CallFilter(filters_[sequence_index_], + NewThreadSafeCallback(&CompositeFilter::SerialCallback)); + } else { + sequence_index_ = 0; + SerialCallback(); + } +} + +void CompositeFilter::StartParallelCallSequence() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + error_ = PIPELINE_OK; + + if (filters_.size() > 0) { + sequence_index_ = 0; + for (size_t i = 0; i < filters_.size(); i++) { + CallFilter(filters_[i], + NewThreadSafeCallback(&CompositeFilter::ParallelCallback)); + } + } else { + sequence_index_ = 0; + ParallelCallback(); + } +} + +void CompositeFilter::CallFilter(scoped_refptr& filter, + FilterCallback* callback) { + switch(state_) { + case kPlayPending: + filter->Play(callback); + break; + case kPausePending: + filter->Pause(callback); + break; + case kFlushPending: + filter->Flush(callback); + break; + case kStopPending: + filter->Stop(callback); + break; + case kSeekPending: + filter->Seek(pending_seek_time_, callback); + break; + default: + delete callback; + ChangeState(kError); + HandleError(PIPELINE_ERROR_INVALID_STATE); + } +} + +void CompositeFilter::DispatchPendingCallback() { + if (callback_.get()) { + scoped_ptr callback(callback_.release()); + callback->Run(); + } +} + +CompositeFilter::State CompositeFilter::GetNextState(State state) const { + State ret = kInvalid; + switch (state) { + case kPlayPending: + ret = kPlaying; + break; + case kPausePending: + ret = kPaused; + case kFlushPending: + ret = kPaused; + break; + case kStopPending: + ret = kStopped; + break; + case kSeekPending: + ret = kPaused; + break; + case kStopWhilePlayPending: + case kStopWhilePausePending: + case kStopWhileFlushPending: + case kStopWhileSeekPending: + ret = kStopPending; + break; + + case kInvalid: + case kCreated: + case kPlaying: + case kPaused: + case kStopped: + case kError: + ret = kInvalid; + break; + + // default: intentionally left out to catch missing states. + } + + return ret; +} + +void CompositeFilter::SerialCallback() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + if (error_ != PIPELINE_OK) { + // We encountered an error. Terminate the sequence now. + ChangeState(kError); + HandleError(error_); + return; + } + + if (filters_.size() > 0) + sequence_index_++; + + if (sequence_index_ == filters_.size()) { + // All filters have been successfully called without error. + OnCallSequenceDone(); + } else if (GetNextState(state_) == kStopPending) { + // Abort sequence early and start issuing Stop() calls. + ChangeState(kStopPending); + StartSerialCallSequence(); + } else { + // We aren't done with the sequence. Call the next filter. + CallFilter(filters_[sequence_index_], + NewThreadSafeCallback(&CompositeFilter::SerialCallback)); + } +} + +void CompositeFilter::ParallelCallback() { + DCHECK_EQ(message_loop_, MessageLoop::current()); + + if (filters_.size() > 0) + sequence_index_++; + + if (sequence_index_ == filters_.size()) { + if (error_ != PIPELINE_OK) { + // We encountered an error. + ChangeState(kError); + HandleError(error_); + return; + } + + OnCallSequenceDone(); + } +} + +void CompositeFilter::OnCallSequenceDone() { + State next_state = GetNextState(state_); + + if (next_state == kInvalid) { + // We somehow got into an unexpected state. + ChangeState(kError); + HandleError(PIPELINE_ERROR_INVALID_STATE); + } + + ChangeState(next_state); + + if (state_ == kStopPending) { + // Handle a deferred Stop(). + StartSerialCallSequence(); + } else { + // Call the callback to indicate that the operation has completed. + DispatchPendingCallback(); + } +} + +void CompositeFilter::SendErrorToHost(PipelineError error) { + if (host_impl_.get()) + host_impl_.get()->host()->SetError(error); +} + +void CompositeFilter::HandleError(PipelineError error) { + if (error != PIPELINE_OK) { + SendErrorToHost(error); + } + + DispatchPendingCallback(); +} + +FilterCallback* CompositeFilter::NewThreadSafeCallback( + void (CompositeFilter::*method)()) { + return TaskToCallbackAdapter::NewCallback( + NewRunnableMethod(this, + &CompositeFilter::OnCallback, + message_loop_, + method)); +} + +void CompositeFilter::OnCallback(MessageLoop* message_loop, + void (CompositeFilter::*method)()) { + if (MessageLoop::current() != message_loop) { + // Posting callback to the proper thread. + message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method)); + return; + } + + (this->*method)(); +} + +bool CompositeFilter::CanForwardError() { + return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused); +} + +void CompositeFilter::SetError(PipelineError error) { + // TODO(acolwell): Temporary hack to handle errors that occur + // during filter initialization. In this case we just forward + // the error to the host even if it is on the wrong thread. We + // have to do this because if we defer the call, we can't be + // sure the host will get the error before the "init done" callback + // is executed. This will be cleaned up when filter init is refactored. + if (state_ == kCreated) { + SendErrorToHost(error); + return; + } + + if (message_loop_ != MessageLoop::current()) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &CompositeFilter::SetError, error)); + return; + } + + DCHECK_EQ(message_loop_, MessageLoop::current()); + + // Drop errors recieved while stopping or stopped. + // This shields the owner of this object from having + // to deal with errors it can't do anything about. + if (state_ == kStopPending || state_ == kStopped) + return; + + error_ = error; + if (CanForwardError()) + SendErrorToHost(error); +} + +CompositeFilter::FilterHostImpl::FilterHostImpl(CompositeFilter* parent, + FilterHost* host) : + parent_(parent), + host_(host) { +} + +FilterHost* CompositeFilter::FilterHostImpl::host() { + return host_; +} + +// media::FilterHost methods. +void CompositeFilter::FilterHostImpl::SetError(PipelineError error) { + parent_->SetError(error); +} + +base::TimeDelta CompositeFilter::FilterHostImpl::GetTime() const { + return host_->GetTime(); +} + +base::TimeDelta CompositeFilter::FilterHostImpl::GetDuration() const { + return host_->GetDuration(); +} + +void CompositeFilter::FilterHostImpl::SetTime(base::TimeDelta time) { + host_->SetTime(time); +} + +void CompositeFilter::FilterHostImpl::SetDuration(base::TimeDelta duration) { + host_->SetDuration(duration); +} + +void CompositeFilter::FilterHostImpl::SetBufferedTime( + base::TimeDelta buffered_time) { + host_->SetBufferedTime(buffered_time); +} + +void CompositeFilter::FilterHostImpl::SetTotalBytes(int64 total_bytes) { + host_->SetTotalBytes(total_bytes); +} + +void CompositeFilter::FilterHostImpl::SetBufferedBytes(int64 buffered_bytes) { + host_->SetBufferedBytes(buffered_bytes); +} + +void CompositeFilter::FilterHostImpl::SetVideoSize(size_t width, + size_t height) { + host_->SetVideoSize(width, height); +} + +void CompositeFilter::FilterHostImpl::SetStreaming(bool streaming) { + host_->SetStreaming(streaming); +} + +void CompositeFilter::FilterHostImpl::NotifyEnded() { + host_->NotifyEnded(); +} + +void CompositeFilter::FilterHostImpl::SetLoaded(bool loaded) { + host_->SetLoaded(loaded); +} + +void CompositeFilter::FilterHostImpl::SetNetworkActivity( + bool network_activity) { + host_->SetNetworkActivity(network_activity); +} + +void CompositeFilter::FilterHostImpl::DisableAudioRenderer() { + host_->DisableAudioRenderer(); +} + +void CompositeFilter::FilterHostImpl::SetCurrentReadPosition(int64 offset) { + host_->SetCurrentReadPosition(offset); +} + +int64 CompositeFilter::FilterHostImpl::GetCurrentReadPosition() { + return host_->GetCurrentReadPosition(); +} + +} // namespace media diff --git a/media/base/composite_filter.h b/media/base/composite_filter.h new file mode 100644 index 0000000..d39b79d --- /dev/null +++ b/media/base/composite_filter.h @@ -0,0 +1,165 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_BASE_COMPOSITE_FILTER_H_ +#define MEDIA_BASE_COMPOSITE_FILTER_H_ + +#include "base/threading/thread.h" +#include "media/base/filter_host.h" +#include "media/base/filters.h" + +namespace media { + +class CompositeFilter : public Filter { + public: + typedef base::Thread* (*ThreadFactoryFunction)(const char* thread_name); + + CompositeFilter(MessageLoop* message_loop); + + // Constructor that allows the default thread creation strategy to be + // overridden. + CompositeFilter(MessageLoop* message_loop, + ThreadFactoryFunction thread_factory); + + // Adds a filter to the composite. This is only allowed after set_host() + // is called and before the first state changing operation such as Play(), + // Flush(), Stop(), or Seek(). True is returned if the filter was successfully + // added to the composite. False is returned if the filter couldn't be added + // because the composite is in the wrong state or the filter needed a thread + // and the composite was unable to create one. + bool AddFilter(scoped_refptr filter); + + // media::Filter methods. + virtual const char* major_mime_type() const; + virtual void set_host(FilterHost* host); + virtual FilterHost* host(); + virtual bool requires_message_loop() const; + virtual const char* message_loop_name() const; + virtual void set_message_loop(MessageLoop* message_loop); + virtual MessageLoop* message_loop(); + virtual void Play(FilterCallback* play_callback); + virtual void Pause(FilterCallback* pause_callback); + virtual void Flush(FilterCallback* flush_callback); + virtual void Stop(FilterCallback* stop_callback); + virtual void SetPlaybackRate(float playback_rate); + virtual void Seek(base::TimeDelta time, FilterCallback* seek_callback); + virtual void OnAudioRendererDisabled(); + + protected: + virtual ~CompositeFilter(); + + /// Default thread factory strategy. + static base::Thread* DefaultThreadFactory(const char* thread_name); + + void SetError(PipelineError error); + + private: + class FilterHostImpl; + + enum State { + kInvalid, + kCreated, + kPaused, + kPlayPending, + kStopWhilePlayPending, + kPlaying, + kPausePending, + kStopWhilePausePending, + kFlushPending, + kStopWhileFlushPending, + kSeekPending, + kStopWhileSeekPending, + kStopPending, + kStopped, + kError + }; + + // Initialization method called by constructors. + void Init(MessageLoop* message_loop, ThreadFactoryFunction thread_factory); + + // Transition to a new state. + void ChangeState(State new_state); + + // Start calling filters in a sequence. + void StartSerialCallSequence(); + + // Call filters in parallel. + void StartParallelCallSequence(); + + // Call the filter based on the current value of state_. + void CallFilter(scoped_refptr& filter, FilterCallback* callback); + + // Calls |callback_| and then clears the reference. + void DispatchPendingCallback(); + + // Gets the state to transition to given |state|. + State GetNextState(State state) const; + + // Filter callback for a serial sequence. + void SerialCallback(); + + // Filter callback for a parallel sequence. + void ParallelCallback(); + + // Called when a parallel or serial call sequence completes. + void OnCallSequenceDone(); + + // Helper function for sending an error to the FilterHost. + void SendErrorToHost(PipelineError error); + + // Helper function for handling errors during call sequences. + void HandleError(PipelineError error); + + // Creates a callback that can be called from any thread, but is guaranteed + // to call the specified method on the thread associated with this filter. + FilterCallback* NewThreadSafeCallback(void (CompositeFilter::*method)()); + + // Helper function used by NewThreadSafeCallback() to make sure the + // method gets called on the right thread. + void OnCallback(MessageLoop* message_loop, + void (CompositeFilter::*method)()); + + // Helper function that indicates whether SetError() calls can be forwarded + // to the host of this filter. + bool CanForwardError(); + + // Vector of threads owned by the composite and used by filters in |filters_|. + typedef std::vector FilterThreadVector; + FilterThreadVector filter_threads_; + + // Vector of the filters added to the composite. + typedef std::vector > FilterVector; + FilterVector filters_; + + // Factory function used to create filter threads. + ThreadFactoryFunction thread_factory_; + + // Callback for the pending request. + scoped_ptr callback_; + + // Time parameter for the pending Seek() request. + base::TimeDelta pending_seek_time_; + + // Current state of this filter. + State state_; + + // The index of the filter currently processing a request. + unsigned int sequence_index_; + + // Message loop passed into the constructor. + MessageLoop* message_loop_; + + // FilterHost implementation passed to Filters owned by this + // object. + scoped_ptr host_impl_; + + // Error passed in the last SetError() call. + PipelineError error_; + + DISALLOW_COPY_AND_ASSIGN(CompositeFilter); +}; + +} // namespace media + +#endif // MEDIA_BASE_COMPOSITE_FILTER_H_ diff --git a/media/base/composite_filter_unittest.cc b/media/base/composite_filter_unittest.cc new file mode 100644 index 0000000..fe757c3 --- /dev/null +++ b/media/base/composite_filter_unittest.cc @@ -0,0 +1,799 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/base/composite_filter.h" +#include "media/base/mock_filter_host.h" +#include "media/base/mock_filters.h" +#include "testing/gtest/include/gtest/gtest.h" + +using ::testing::_; +using ::testing::InSequence; +using ::testing::Return; +using ::testing::SaveArg; +using ::testing::StrictMock; + +namespace media { + +class CompositeFilterTest : public testing::Test { + public: + CompositeFilterTest(); + virtual ~CompositeFilterTest(); + + // Sets up a new CompositeFilter in |composite_|, creates |filter_1_| and + // |filter_2_|, and adds these filters to |composite_|. + void SetupAndAdd2Filters(); + + // Helper enum that indicates what filter method to call. + enum MethodToCall { + PLAY, + PAUSE, + FLUSH, + STOP, + SEEK, + }; + + // Helper method that adds a filter method call expectation based on the value + // of |method_to_call|. + // + // |method_to_call| - Indicates which method we expect a call for. + // |filter| - The MockFilter to add the expectation to. + // |seek_time| - The time to pass to the Seek() call if |method_to_call| + // equals SEEK. + void ExpectFilterCall(MethodToCall method_to_call, MockFilter* filter, + base::TimeDelta seek_time); + + // Helper method that calls a filter method based on the value of + // |method_to_call|. + // + // |method_to_call| - Indicates which method to call. + // |filter| - The Filter to make the method call on. + // |seek_time| - The time to pass to the Seek() call if |method_to_call| + // equals SEEK. + // |callback| - The callback object to pass to the method. + void DoFilterCall(MethodToCall method_to_call, Filter* filter, + base::TimeDelta seek_time, + FilterCallback* callback); + + // Creates an expectation sequence based on the value of method_to_call. + // + // |method_to_call| - Indicates which method we want a success sequence for. + // |seek_time| - The time to pass in the Seek() call if |method_to_call| + // equals SEEK. + void ExpectSuccess(MethodToCall method_to_call, + base::TimeDelta seek_time = base::TimeDelta()); + + // Issue a Play(), Pause(), Flush(), Stop(), or Seek() on the composite and + // verify all the expected calls on the filters. + void DoPlay(); + void DoPause(); + void DoFlush(); + void DoStop(); + void DoSeek(base::TimeDelta time); + + // Issue a Play(), Pause(), Flush(), or Seek() and expect the calls to fail + // with a PIPELINE_ERROR_INVALID_STATE error. + // + // |method_to_call| - Indicates whick method to call. + // |seek_time| - The time to pass to the Seek() call if |method_to_call| + // equals SEEK. + void ExpectInvalidStateFail(MethodToCall method_to_call, + base::TimeDelta seek_time = base::TimeDelta()); + + // Run the callback stored in |filter_1_callback_|. + void RunFilter1Callback(); + + // Run the callback stored in |filter_2_callback_|. + void RunFilter2Callback(); + + protected: + MessageLoop message_loop_; + + // The composite object being tested. + scoped_refptr composite_; + + // First filter added to the composite. + scoped_refptr > filter_1_; + + // Callback passed to |filter_1_| during last Play(), Pause(), Flush(), + // Stop(), or Seek() call. + FilterCallback* filter_1_callback_; + + // Second filter added to the composite. + scoped_refptr > filter_2_; + + // Callback passed to |filter_2_| during last Play(), Pause(), Flush(), + // Stop(), or Seek() call. + FilterCallback* filter_2_callback_; + + // FilterHost implementation passed to |composite_| via set_host(). + scoped_ptr > mock_filter_host_; + + DISALLOW_COPY_AND_ASSIGN(CompositeFilterTest); +}; + +CompositeFilterTest::CompositeFilterTest() : + composite_(new CompositeFilter(&message_loop_)), + filter_1_callback_(NULL), + filter_2_callback_(NULL), + mock_filter_host_(new StrictMock()) { +} + +CompositeFilterTest::~CompositeFilterTest() {} + +void CompositeFilterTest::SetupAndAdd2Filters() { + mock_filter_host_.reset(new StrictMock()); + composite_ = new CompositeFilter(&message_loop_); + composite_->set_host(mock_filter_host_.get()); + + // Setup |filter_1_| and arrange for methods to set + // |filter_1_callback_| when they are called. + filter_1_ = new StrictMock(); + filter_1_callback_ = NULL; + ON_CALL(*filter_1_, Play(_)) + .WillByDefault(SaveArg<0>(&filter_1_callback_)); + ON_CALL(*filter_1_, Pause(_)) + .WillByDefault(SaveArg<0>(&filter_1_callback_)); + ON_CALL(*filter_1_, Flush(_)) + .WillByDefault(SaveArg<0>(&filter_1_callback_)); + ON_CALL(*filter_1_, Stop(_)) + .WillByDefault(SaveArg<0>(&filter_1_callback_)); + ON_CALL(*filter_1_, Seek(_,_)) + .WillByDefault(SaveArg<1>(&filter_1_callback_)); + + // Setup |filter_2_| and arrange for methods to set + // |filter_2_callback_| when they are called. + filter_2_ = new StrictMock(); + filter_2_callback_ = NULL; + ON_CALL(*filter_2_, Play(_)) + .WillByDefault(SaveArg<0>(&filter_2_callback_)); + ON_CALL(*filter_2_, Pause(_)) + .WillByDefault(SaveArg<0>(&filter_2_callback_)); + ON_CALL(*filter_2_, Flush(_)) + .WillByDefault(SaveArg<0>(&filter_2_callback_)); + ON_CALL(*filter_2_, Stop(_)) + .WillByDefault(SaveArg<0>(&filter_2_callback_)); + ON_CALL(*filter_2_, Seek(_,_)) + .WillByDefault(SaveArg<1>(&filter_2_callback_)); + + composite_->AddFilter(filter_1_); + composite_->AddFilter(filter_2_); +} + +void CompositeFilterTest::ExpectFilterCall(MethodToCall method_to_call, + MockFilter* filter, + base::TimeDelta seek_time) { + switch(method_to_call) { + case PLAY: + EXPECT_CALL(*filter, Play(_)); + break; + case PAUSE: + EXPECT_CALL(*filter, Pause(_)); + break; + case FLUSH: + EXPECT_CALL(*filter, Flush(_)); + break; + case STOP: + EXPECT_CALL(*filter, Stop(_)); + break; + case SEEK: + EXPECT_CALL(*filter, Seek(seek_time, _)); + break; + }; +} + +void CompositeFilterTest::DoFilterCall(MethodToCall method_to_call, + Filter* filter, + base::TimeDelta seek_time, + FilterCallback* callback) { + switch(method_to_call) { + case PLAY: + filter->Play(callback); + break; + case PAUSE: + filter->Pause(callback); + break; + case FLUSH: + filter->Flush(callback); + break; + case STOP: + filter->Stop(callback); + break; + case SEEK: + filter->Seek(seek_time, callback); + break; + }; +} + +void CompositeFilterTest::ExpectSuccess(MethodToCall method_to_call, + base::TimeDelta seek_time) { + InSequence seq; + + scoped_ptr > mock_callback( + new StrictMock(false)); + + bool is_parallel_call = (method_to_call == FLUSH); + + ExpectFilterCall(method_to_call, filter_1_.get(), seek_time); + + if (is_parallel_call) { + ExpectFilterCall(method_to_call, filter_2_.get(), seek_time); + } + + // Make method call on the composite. + DoFilterCall(method_to_call, composite_.get(), seek_time, + mock_callback->NewCallback()); + + if (is_parallel_call) { + // Make sure both filters have their callbacks set. + EXPECT_TRUE(filter_1_callback_ != NULL); + EXPECT_TRUE(filter_2_callback_ != NULL); + + RunFilter1Callback(); + } else { + // Make sure that only |filter_1_| has its callback set. + EXPECT_TRUE(filter_1_callback_ != NULL); + EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_); + + ExpectFilterCall(method_to_call, filter_2_.get(), seek_time); + + RunFilter1Callback(); + + // Verify that |filter_2_| was called by checking the callback pointer. + EXPECT_TRUE(filter_2_callback_ != NULL); + } + + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + RunFilter2Callback(); +} + +void CompositeFilterTest::DoPlay() { + ExpectSuccess(PLAY); +} + +void CompositeFilterTest::DoPause() { + ExpectSuccess(PAUSE); +} + +void CompositeFilterTest::DoFlush() { + ExpectSuccess(FLUSH); +} + +void CompositeFilterTest::DoStop() { + ExpectSuccess(STOP); +} + +void CompositeFilterTest::DoSeek(base::TimeDelta time) { + ExpectSuccess(SEEK, time); +} + +void CompositeFilterTest::ExpectInvalidStateFail(MethodToCall method_to_call, + base::TimeDelta seek_time) { + InSequence seq; + scoped_ptr > mock_callback( + new StrictMock(false)); + + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_INVALID_STATE)) + .WillOnce(Return()); + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + DoFilterCall(method_to_call, composite_, seek_time, + mock_callback->NewCallback()); + + // Make sure that neither of the filters were called by + // verifying that the callback pointers weren't set. + EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_); + EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_); +} + +void CompositeFilterTest::RunFilter1Callback() { + EXPECT_TRUE(filter_1_callback_ != NULL); + FilterCallback* callback = filter_1_callback_; + filter_1_callback_ = NULL; + callback->Run(); + delete callback; +} + +void CompositeFilterTest::RunFilter2Callback() { + EXPECT_TRUE(filter_2_callback_ != NULL); + FilterCallback* callback = filter_2_callback_; + filter_2_callback_ = NULL; + callback->Run(); + delete callback; +} + +static base::Thread* NullThreadFactory(const char* thread_name) { + return NULL; +} + +// Test AddFilter() failure cases. +TEST_F(CompositeFilterTest, TestAddFilterFailCases) { + // Test adding a null pointer. + EXPECT_FALSE(composite_->AddFilter(NULL)); + + scoped_refptr > filter = + new StrictMock(true); + EXPECT_EQ(NULL, filter->host()); + EXPECT_EQ(NULL, filter->message_loop()); + + // Test failing because set_host() hasn't been called yet. + EXPECT_FALSE(composite_->AddFilter(filter)); + + // Test thread creation failure. + composite_ = new CompositeFilter(&message_loop_, &NullThreadFactory); + composite_->set_host(mock_filter_host_.get()); + EXPECT_FALSE(composite_->AddFilter(filter)); + EXPECT_EQ(NULL, filter->host()); + EXPECT_EQ(NULL, filter->message_loop()); +} + +// Test successful AddFilter() cases. +TEST_F(CompositeFilterTest, TestAddFilter) { + composite_->set_host(mock_filter_host_.get()); + + // Add a filter that doesn't require a message loop. + scoped_refptr > filter = new StrictMock(); + EXPECT_EQ(NULL, filter->host()); + EXPECT_EQ(NULL, filter->message_loop()); + + EXPECT_TRUE(composite_->AddFilter(filter)); + + EXPECT_TRUE(filter->host() != NULL); + EXPECT_EQ(NULL, filter->message_loop()); + + // Add a filter that requires a message loop. + scoped_refptr > filter_2 = + new StrictMock(true); + + EXPECT_EQ(NULL, filter_2->host()); + EXPECT_EQ(NULL, filter_2->message_loop()); + + EXPECT_TRUE(composite_->AddFilter(filter_2)); + + EXPECT_TRUE(filter_2->host() != NULL); + EXPECT_TRUE(filter_2->message_loop() != NULL); +} + +TEST_F(CompositeFilterTest, TestPlay) { + InSequence sequence; + + SetupAndAdd2Filters(); + + // Verify successful call to Play(). + DoPlay(); + + // At this point we are now in the kPlaying state. + scoped_ptr > mock_callback( + new StrictMock(false)); + + // Try calling Play() again to make sure that we simply get a callback. + // We are already in the Play() state so there is no point calling the + // filters. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + composite_->Play(mock_callback->NewCallback()); + + // Verify that neither of the filter callbacks were set. + EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_); + EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_); + + // Stop playback. + DoStop(); + + // At this point we should be in the kStopped state. + + // Try calling Stop() again to make sure neither filter is called. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + composite_->Stop(mock_callback->NewCallback()); + + // Verify that neither of the filter callbacks were set. + EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_); + EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_); + + // Try calling Play() again to make sure we get an error. + ExpectInvalidStateFail(PLAY); +} + +// Test errors in the middle of a serial call sequence like Play(). +TEST_F(CompositeFilterTest, TestPlayErrors) { + InSequence sequence; + + SetupAndAdd2Filters(); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + EXPECT_CALL(*filter_1_, Play(_)); + + // Call Play() on the composite. + composite_->Play(mock_callback->NewCallback()); + + EXPECT_CALL(*filter_2_, Play(_)); + + // Run callback to indicate that |filter_1_|'s Play() has completed. + RunFilter1Callback(); + + // At this point Play() has been called on |filter_2_|. Simulate an + // error by calling SetError() on its FilterHost interface. + filter_2_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY); + + // Expect error to be reported and "play done" callback to be called. + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY)); + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + // Run callback to indicate that |filter_2_|'s Play() has completed. + RunFilter2Callback(); + + // Verify that Play/Pause/Flush/Seek fail now that an error occured. + ExpectInvalidStateFail(PLAY); + ExpectInvalidStateFail(PAUSE); + ExpectInvalidStateFail(FLUSH); + ExpectInvalidStateFail(SEEK); + + // Make sure you can still Stop(). + DoStop(); +} + +TEST_F(CompositeFilterTest, TestPause) { + InSequence sequence; + + SetupAndAdd2Filters(); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + // Try calling Pause() to make sure we get an error because we aren't in + // the playing state. + ExpectInvalidStateFail(PAUSE); + + // Transition to playing state. + DoPlay(); + + // Issue a successful Pause(). + DoPause(); + + // At this point we are paused. + + // Try calling Pause() again to make sure that the filters aren't called + // because we are already in the paused state. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + composite_->Pause(mock_callback->NewCallback()); + + // Verify that neither of the filter callbacks were set. + EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_); + EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_); + + // Verify that we can transition pack to the play state. + DoPlay(); + + // Go back to the pause state. + DoPause(); + + // Transition to the stop state. + DoStop(); + + // Try calling Pause() to make sure we get an error because we aren't in + // the playing state. + ExpectInvalidStateFail(PAUSE); +} + +// Test errors in the middle of a serial call sequence like Pause(). +TEST_F(CompositeFilterTest, TestPauseErrors) { + InSequence sequence; + + SetupAndAdd2Filters(); + + DoPlay(); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + EXPECT_CALL(*filter_1_, Pause(_)); + + // Call Pause() on the composite. + composite_->Pause(mock_callback->NewCallback()); + + // Simulate an error by calling SetError() on |filter_1_|'s FilterHost + // interface. + filter_1_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY); + + // Expect error to be reported and "pause done" callback to be called. + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY)); + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + RunFilter1Callback(); + + // Make sure |filter_2_callback_| was not set. + EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_); + + // Verify that Play/Pause/Flush/Seek fail now that an error occured. + ExpectInvalidStateFail(PLAY); + ExpectInvalidStateFail(PAUSE); + ExpectInvalidStateFail(FLUSH); + ExpectInvalidStateFail(SEEK); + + // Make sure you can still Stop(). + DoStop(); +} + +TEST_F(CompositeFilterTest, TestFlush) { + InSequence sequence; + + SetupAndAdd2Filters(); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + // Make sure Flush() works before calling Play(). + DoFlush(); + + // Transition to playing state. + DoPlay(); + + // Call Flush() to make sure we get an error because we are in + // the playing state. + ExpectInvalidStateFail(FLUSH); + + // Issue a successful Pause(). + DoPause(); + + // Make sure Flush() works after pausing. + DoFlush(); + + // Verify that we can transition back to the play state. + DoPlay(); + + // Transition to the stop state. + DoStop(); + + // Try calling Flush() to make sure we get an error because we are stopped. + ExpectInvalidStateFail(FLUSH); +} + +// Test errors in the middle of a parallel call sequence like Flush(). +TEST_F(CompositeFilterTest, TestFlushErrors) { + InSequence sequence; + + SetupAndAdd2Filters(); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + EXPECT_CALL(*filter_1_, Flush(_)); + EXPECT_CALL(*filter_2_, Flush(_)); + + // Call Flush() on the composite. + composite_->Flush(mock_callback->NewCallback()); + + // Simulate an error by calling SetError() on |filter_1_|'s FilterHost + // interface. + filter_1_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY); + + RunFilter1Callback(); + + // Expect error to be reported and "pause done" callback to be called. + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY)); + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + RunFilter2Callback(); + + // Verify that Play/Pause/Flush/Seek fail now that an error occured. + ExpectInvalidStateFail(PLAY); + ExpectInvalidStateFail(PAUSE); + ExpectInvalidStateFail(FLUSH); + ExpectInvalidStateFail(SEEK); + + // Make sure you can still Stop(). + DoStop(); +} + +TEST_F(CompositeFilterTest, TestSeek) { + InSequence sequence; + + SetupAndAdd2Filters(); + + // Verify that seek is allowed to be called before a Play() call. + DoSeek(base::TimeDelta::FromSeconds(5)); + + // Verify we can issue a Play() after the Seek(). + DoPlay(); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + // Try calling Seek() while playing to make sure we get an error. + ExpectInvalidStateFail(SEEK); + + // Transition to paused state. + DoPause(); + + // Verify that seek is allowed after pausing. + DoSeek(base::TimeDelta::FromSeconds(5)); + + // Verify we can still play again. + DoPlay(); + + // Stop playback. + DoStop(); + + // Try calling Seek() to make sure we get an error. + ExpectInvalidStateFail(SEEK); +} + +TEST_F(CompositeFilterTest, TestStop) { + InSequence sequence; + + // Test Stop() before any other call. + SetupAndAdd2Filters(); + DoStop(); + + // Test error during Stop() sequence. + SetupAndAdd2Filters(); + scoped_ptr > mock_callback( + new StrictMock(false)); + + EXPECT_CALL(*filter_1_, Stop(_)); + + composite_->Stop(mock_callback->NewCallback()); + + // Have |filter_1_| signal an error. + filter_1_->host()->SetError(PIPELINE_ERROR_READ); + + EXPECT_CALL(*filter_2_, Stop(_)); + + RunFilter1Callback(); + + EXPECT_CALL(*mock_callback, OnFilterCallback()); + + RunFilter2Callback(); +} + +// Test stopping in the middle of a serial call sequence. +TEST_F(CompositeFilterTest, TestStopWhilePlayPending) { + InSequence sequence; + + SetupAndAdd2Filters(); + + scoped_ptr > mock_callback( + new StrictMock()); + + EXPECT_CALL(*filter_1_, Play(_)); + + composite_->Play(mock_callback->NewCallback()); + + // Note: Play() is pending on |filter_1_| right now. + + scoped_ptr > mock_callback_2( + new StrictMock(false)); + + EXPECT_CALL(*mock_callback, OnCallbackDestroyed()); + + composite_->Stop(mock_callback_2->NewCallback()); + + EXPECT_CALL(*filter_1_, Stop(_)); + + // Run |filter_1_|'s callback again to indicate Play() has completed. + RunFilter1Callback(); + + EXPECT_CALL(*filter_2_, Stop(_)); + + // Run |filter_1_|'s callback again to indicate Stop() has completed. + RunFilter1Callback(); + + EXPECT_CALL(*mock_callback_2, OnFilterCallback()); + + // Run |filter_2_|'s callback to indicate Stop() has completed. + RunFilter2Callback(); +} + +// Test stopping in the middle of a parallel call sequence. +TEST_F(CompositeFilterTest, TestStopWhileFlushPending) { + InSequence sequence; + + SetupAndAdd2Filters(); + + scoped_ptr > mock_callback( + new StrictMock()); + + EXPECT_CALL(*filter_1_, Flush(_)); + EXPECT_CALL(*filter_2_, Flush(_)); + + composite_->Flush(mock_callback->NewCallback()); + + // Note: |filter_1_| and |filter_2_| have pending Flush() calls at this point. + + scoped_ptr > mock_callback_2( + new StrictMock(false)); + + EXPECT_CALL(*mock_callback, OnCallbackDestroyed()); + + composite_->Stop(mock_callback_2->NewCallback()); + + // Run callback to indicate that |filter_1_|'s Flush() has completed. + RunFilter1Callback(); + + EXPECT_CALL(*filter_1_, Stop(_)); + + // Run callback to indicate that |filter_2_|'s Flush() has completed. + RunFilter2Callback(); + + EXPECT_CALL(*filter_2_, Stop(_)); + + // Run callback to indicate that |filter_1_|'s Stop() has completed. + RunFilter1Callback(); + + EXPECT_CALL(*mock_callback_2, OnFilterCallback()); + + // Run callback to indicate that |filter_2_|'s Stop() has completed. + RunFilter2Callback(); +} + +TEST_F(CompositeFilterTest, TestErrorWhilePlaying) { + InSequence sequence; + + SetupAndAdd2Filters(); + + // Simulate an error on |filter_2_| while in kCreated state. This + // can happen if an error occurs during filter initialization. + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY)); + filter_2_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY); + + DoPlay(); + + // Simulate an error on |filter_2_| while playing. + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY)); + filter_2_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY); + + DoPause(); + + // Simulate an error on |filter_2_| while paused. + EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_NETWORK)); + filter_2_->host()->SetError(PIPELINE_ERROR_NETWORK); + + DoStop(); + + // Verify that errors are not passed to |mock_filter_host_| + // after Stop() has been called. + filter_2_->host()->SetError(PIPELINE_ERROR_NETWORK); +} + +// Make sure that state transitions act as expected even +// if the composite doesn't contain any filters. +TEST_F(CompositeFilterTest, TestEmptyComposite) { + InSequence sequence; + + composite_->set_host(mock_filter_host_.get()); + + scoped_ptr > mock_callback( + new StrictMock(false)); + + // Issue a Play() and expect no errors. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + composite_->Play(mock_callback->NewCallback()); + + // Issue a Pause() and expect no errors. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + composite_->Pause(mock_callback->NewCallback()); + + // Issue a Flush() and expect no errors. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + composite_->Flush(mock_callback->NewCallback()); + + // Issue a Seek() and expect no errors. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + composite_->Seek(base::TimeDelta::FromSeconds(5), + mock_callback->NewCallback()); + + // Issue a Play() and expect no errors. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + composite_->Play(mock_callback->NewCallback()); + + // Issue a Stop() and expect no errors. + EXPECT_CALL(*mock_callback, OnFilterCallback()); + composite_->Stop(mock_callback->NewCallback()); +} + +} // namespace media diff --git a/media/base/mock_filters.cc b/media/base/mock_filters.cc index 3347016..39ed730 100644 --- a/media/base/mock_filters.cc +++ b/media/base/mock_filters.cc @@ -6,12 +6,16 @@ namespace media { -MockFilterCallback::MockFilterCallback() {} +MockFilterCallback::MockFilterCallback() : run_destroy_callback_(true) {} + +MockFilterCallback::MockFilterCallback(bool run_destroy_callback) : + run_destroy_callback_(run_destroy_callback) { +} MockFilterCallback::~MockFilterCallback() {} FilterCallback* MockFilterCallback::NewCallback() { - return new CallbackImpl(this); + return new CallbackImpl(this, run_destroy_callback_); } MockDataSource::MockDataSource() {} @@ -82,4 +86,23 @@ void RunStopFilterCallback(FilterCallback* callback) { delete callback; } +MockFilter::MockFilter() : + requires_message_loop_(false) { +} + +MockFilter::MockFilter(bool requires_message_loop) : + requires_message_loop_(requires_message_loop) { +} + +MockFilter::~MockFilter() {} + +bool MockFilter::requires_message_loop() const { + return requires_message_loop_; +} + +const char* MockFilter::message_loop_name() const { + return "MockFilter"; +} + + } // namespace media diff --git a/media/base/mock_filters.h b/media/base/mock_filters.h index 5b5677b..5f43356 100644 --- a/media/base/mock_filters.h +++ b/media/base/mock_filters.h @@ -51,6 +51,7 @@ class Destroyable : public MockClass { class MockFilterCallback { public: MockFilterCallback(); + MockFilterCallback(bool run_destroy_callback); virtual ~MockFilterCallback(); MOCK_METHOD0(OnCallbackDestroyed, void()); @@ -67,12 +68,15 @@ class MockFilterCallback { // MockFilterCallback. class CallbackImpl : public CallbackRunner { public: - explicit CallbackImpl(MockFilterCallback* mock_callback) - : mock_callback_(mock_callback) { + explicit CallbackImpl(MockFilterCallback* mock_callback, + bool run_destroy_callback) + : mock_callback_(mock_callback), + run_destroy_callback_(run_destroy_callback) { } virtual ~CallbackImpl() { - mock_callback_->OnCallbackDestroyed(); + if (run_destroy_callback_) + mock_callback_->OnCallbackDestroyed(); } virtual void RunWithParams(const Tuple0& params) { @@ -81,13 +85,42 @@ class MockFilterCallback { private: MockFilterCallback* mock_callback_; + bool run_destroy_callback_; DISALLOW_COPY_AND_ASSIGN(CallbackImpl); }; + bool run_destroy_callback_; DISALLOW_COPY_AND_ASSIGN(MockFilterCallback); }; +class MockFilter : public Filter { + public: + MockFilter(); + MockFilter(bool requires_message_loop); + + // Filter implementation. + virtual bool requires_message_loop() const; + virtual const char* message_loop_name() const; + + MOCK_METHOD1(Play, void(FilterCallback* callback)); + MOCK_METHOD1(Pause, void(FilterCallback* callback)); + MOCK_METHOD1(Flush, void(FilterCallback* callback)); + MOCK_METHOD1(Stop, void(FilterCallback* callback)); + MOCK_METHOD1(SetPlaybackRate, void(float playback_rate)); + MOCK_METHOD2(Seek, void(base::TimeDelta time, FilterCallback* callback)); + MOCK_METHOD0(OnAudioRendererDisabled, void()); + + protected: + virtual ~MockFilter(); + + private: + + bool requires_message_loop_; + + DISALLOW_COPY_AND_ASSIGN(MockFilter); +}; + class MockDataSource : public DataSource { public: MockDataSource(); diff --git a/media/base/pipeline.h b/media/base/pipeline.h index 50158d2..61a1110 100644 --- a/media/base/pipeline.h +++ b/media/base/pipeline.h @@ -33,6 +33,8 @@ enum PipelineError { PIPELINE_ERROR_COULD_NOT_RENDER, PIPELINE_ERROR_READ, PIPELINE_ERROR_AUDIO_HARDWARE, + PIPELINE_ERROR_OPERATION_PENDING, + PIPELINE_ERROR_INVALID_STATE, // Demuxer related errors. DEMUXER_ERROR_COULD_NOT_OPEN, DEMUXER_ERROR_COULD_NOT_PARSE, diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index d295d6b..d071d58 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -22,6 +22,7 @@ class PipelineImpl::PipelineInitState { scoped_refptr demuxer_; scoped_refptr audio_decoder_; scoped_refptr video_decoder_; + scoped_refptr composite_; }; PipelineImpl::PipelineImpl(MessageLoop* message_loop) @@ -29,7 +30,6 @@ PipelineImpl::PipelineImpl(MessageLoop* message_loop) clock_(new ClockImpl(&base::Time::Now)), waiting_for_clock_update_(false), state_(kCreated), - remaining_transitions_(0), current_bytes_(0) { ResetState(); } @@ -317,6 +317,10 @@ void PipelineImpl::ResetState() { rendered_mime_types_.clear(); } +void PipelineImpl::set_state(State next_state) { + state_ = next_state; +} + bool PipelineImpl::IsPipelineOk() { return PIPELINE_OK == GetError(); } @@ -572,22 +576,25 @@ void PipelineImpl::InitializeTask() { // Just created, create data source. if (state_ == kCreated) { - state_ = kInitDataSource; + set_state(kInitDataSource); pipeline_init_state_.reset(new PipelineInitState()); + pipeline_init_state_->composite_ = new CompositeFilter(message_loop_); + pipeline_init_state_->composite_->set_host(this); + InitializeDataSource(); return; } // Data source created, create demuxer. if (state_ == kInitDataSource) { - state_ = kInitDemuxer; + set_state(kInitDemuxer); InitializeDemuxer(pipeline_init_state_->data_source_); return; } // Demuxer created, create audio decoder. if (state_ == kInitDemuxer) { - state_ = kInitAudioDecoder; + set_state(kInitAudioDecoder); // If this method returns false, then there's no audio stream. if (InitializeAudioDecoder(pipeline_init_state_->demuxer_)) return; @@ -595,7 +602,7 @@ void PipelineImpl::InitializeTask() { // Assuming audio decoder was created, create audio renderer. if (state_ == kInitAudioDecoder) { - state_ = kInitAudioRenderer; + set_state(kInitAudioRenderer); // Returns false if there's no audio stream. if (InitializeAudioRenderer(pipeline_init_state_->audio_decoder_)) { InsertRenderedMimeType(mime_type::kMajorTypeAudio); @@ -606,14 +613,14 @@ void PipelineImpl::InitializeTask() { // Assuming audio renderer was created, create video decoder. if (state_ == kInitAudioRenderer) { // Then perform the stage of initialization, i.e. initialize video decoder. - state_ = kInitVideoDecoder; + set_state(kInitVideoDecoder); if (InitializeVideoDecoder(pipeline_init_state_->demuxer_)) return; } // Assuming video decoder was created, create video renderer. if (state_ == kInitVideoDecoder) { - state_ = kInitVideoRenderer; + set_state(kInitVideoRenderer); if (InitializeVideoRenderer(pipeline_init_state_->video_decoder_)) { InsertRenderedMimeType(mime_type::kMajorTypeVideo); return; @@ -629,20 +636,27 @@ void PipelineImpl::InitializeTask() { // Clear the collection of filters. filter_collection_->Clear(); + pipeline_filter_ = pipeline_init_state_->composite_; + // Clear init state since we're done initializing. pipeline_init_state_.reset(); + if (audio_disabled_) { + // Audio was disabled at some point during initialization. Notify + // the pipeline filter now that it has been initialized. + pipeline_filter_->OnAudioRendererDisabled(); + } + // Initialization was successful, we are now considered paused, so it's safe // to set the initial playback rate and volume. PlaybackRateChangedTask(GetPlaybackRate()); VolumeChangedTask(GetVolume()); - // Fire the initial seek request to get the filters to preroll. + // Fire the seek request to get the filters to preroll. seek_pending_ = true; - state_ = kSeeking; - remaining_transitions_ = filters_.size(); + set_state(kSeeking); seek_timestamp_ = base::TimeDelta(); - filters_.front()->Seek(seek_timestamp_, + pipeline_filter_->Seek(seek_timestamp_, NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } } @@ -705,10 +719,12 @@ void PipelineImpl::PlaybackRateChangedTask(float playback_rate) { AutoLock auto_lock(lock_); clock_->SetPlaybackRate(playback_rate); } - for (FilterVector::iterator iter = filters_.begin(); - iter != filters_.end(); - ++iter) { - (*iter)->SetPlaybackRate(playback_rate); + + // Notify |pipeline_filter_| if it has been initialized. If initialization + // hasn't completed yet, the playback rate will be set when initialization + // completes. + if (pipeline_filter_) { + pipeline_filter_->SetPlaybackRate(playback_rate); } } @@ -745,10 +761,9 @@ void PipelineImpl::SeekTask(base::TimeDelta time, // kSeeking (for each filter) // kStarting (for each filter) // kStarted - state_ = kPausing; + set_state(kPausing); seek_timestamp_ = time; seek_callback_.reset(seek_callback); - remaining_transitions_ = filters_.size(); // Kick off seeking! { @@ -757,7 +772,7 @@ void PipelineImpl::SeekTask(base::TimeDelta time, if (!waiting_for_clock_update_) clock_->Pause(); } - filters_.front()->Pause( + pipeline_filter_->Pause( NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } @@ -790,7 +805,7 @@ void PipelineImpl::NotifyEndedTask() { } // Transition to ended, executing the callback if present. - state_ = kEnded; + set_state(kEnded); if (ended_callback_.get()) { ended_callback_->Run(); } @@ -813,11 +828,11 @@ void PipelineImpl::DisableAudioRendererTask() { audio_disabled_ = true; - // Notify all filters of disabled audio renderer. - for (FilterVector::iterator iter = filters_.begin(); - iter != filters_.end(); - ++iter) { - (*iter)->OnAudioRendererDisabled(); + // Notify all filters of disabled audio renderer. If the filter isn't + // initialized yet, OnAudioRendererDisabled() will be called when + // initialization is complete. + if (pipeline_filter_) { + pipeline_filter_->OnAudioRendererDisabled(); } } @@ -837,42 +852,31 @@ void PipelineImpl::FilterStateTransitionTask() { // Decrement the number of remaining transitions, making sure to transition // to the next state if needed. - DCHECK(remaining_transitions_ <= filters_.size()); - DCHECK(remaining_transitions_ > 0u); - if (--remaining_transitions_ == 0) { - state_ = FindNextState(state_); - if (state_ == kSeeking) { - AutoLock auto_lock(lock_); - clock_->SetTime(seek_timestamp_); - } - - if (TransientState(state_)) { - remaining_transitions_ = filters_.size(); - } + set_state(FindNextState(state_)); + if (state_ == kSeeking) { + AutoLock auto_lock(lock_); + clock_->SetTime(seek_timestamp_); } // Carry out the action for the current state. if (TransientState(state_)) { - Filter* filter = filters_[filters_.size() - remaining_transitions_]; if (state_ == kPausing) { - filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + pipeline_filter_->Pause( + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else if (state_ == kFlushing) { - // We had to use parallel flushing all filters. - if (remaining_transitions_ == filters_.size()) { - for (size_t i = 0; i < filters_.size(); i++) { - filters_[i]->Flush( - NewCallback(this, &PipelineImpl::OnFilterStateTransition)); - } - } + pipeline_filter_->Flush( + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else if (state_ == kSeeking) { - filter->Seek(seek_timestamp_, + pipeline_filter_->Seek(seek_timestamp_, NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else if (state_ == kStarting) { - filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + pipeline_filter_->Play( + NewCallback(this,&PipelineImpl::OnFilterStateTransition)); } else if (state_ == kStopping) { - filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + pipeline_filter_->Stop( + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else { - NOTREACHED(); + NOTREACHED() << "Unexpected state: " << state_; } } else if (state_ == kStarted) { FinishInitialization(); @@ -897,7 +901,7 @@ void PipelineImpl::FilterStateTransitionTask() { } else if (IsPipelineStopped()) { FinishDestroyingFiltersTask(); } else { - NOTREACHED(); + NOTREACHED() << "Unexpected state: " << state_; } } @@ -905,24 +909,11 @@ void PipelineImpl::FinishDestroyingFiltersTask() { DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineStopped()); - // Stop every running filter thread. - // - // TODO(scherkus): can we watchdog this section to detect wedged threads? - for (FilterThreadVector::iterator iter = filter_threads_.begin(); - iter != filter_threads_.end(); - ++iter) { - (*iter)->Stop(); - } - // Clear renderer references. audio_renderer_ = NULL; video_renderer_ = NULL; - // Reset the pipeline, which will decrement a reference to this object. - // We will get destroyed as soon as the remaining tasks finish executing. - // To be safe, we'll set our pipeline reference to NULL. - filters_.clear(); - STLDeleteElements(&filter_threads_); + pipeline_filter_ = NULL; stop_pending_ = false; tearing_down_ = false; @@ -938,7 +929,7 @@ void PipelineImpl::FinishDestroyingFiltersTask() { } } else { // Destroying filters due to SetError(). - state_ = kError; + set_state(kError); // If our owner has requested to be notified of an error. if (error_callback_.get()) { error_callback_->Run(); @@ -947,28 +938,12 @@ void PipelineImpl::FinishDestroyingFiltersTask() { } bool PipelineImpl::PrepareFilter(scoped_refptr filter) { - DCHECK_EQ(MessageLoop::current(), message_loop_); - DCHECK(IsPipelineOk()); - - // Create a dedicated thread for this filter if applicable. - if (filter->requires_message_loop()) { - scoped_ptr thread( - new base::Thread(filter->message_loop_name())); - if (!thread.get() || !thread->Start()) { - NOTREACHED() << "Could not start filter thread"; - SetError(PIPELINE_ERROR_INITIALIZATION_FAILED); - return false; - } + bool ret = pipeline_init_state_->composite_->AddFilter(filter.get()); - filter->set_message_loop(thread->message_loop()); - filter_threads_.push_back(thread.release()); + if (!ret) { + SetError(PIPELINE_ERROR_INITIALIZATION_FAILED); } - - // Register ourselves as the filter's host. - DCHECK(IsPipelineOk()); - filter->set_host(this); - filters_.push_back(make_scoped_refptr(filter.get())); - return true; + return ret; } void PipelineImpl::InitializeDataSource() { @@ -1145,23 +1120,21 @@ void PipelineImpl::TearDownPipeline() { tearing_down_ = true; if (IsPipelineInitializing()) { - // Notify the client that starting did not complete, if necessary. - FinishInitialization(); - } + // Make it look like initialization was successful. + pipeline_filter_ = pipeline_init_state_->composite_; + pipeline_init_state_.reset(); - remaining_transitions_ = filters_.size(); - if (remaining_transitions_ > 0) { - if (IsPipelineInitializing()) { - state_ = kStopping; - filters_.front()->Stop(NewCallback( - this, &PipelineImpl::OnFilterStateTransition)); - } else { - state_ = kPausing; - filters_.front()->Pause(NewCallback( - this, &PipelineImpl::OnFilterStateTransition)); - } + set_state(kStopping); + pipeline_filter_->Stop(NewCallback( + this, &PipelineImpl::OnFilterStateTransition)); + + FinishInitialization(); + } else if (pipeline_filter_.get()) { + set_state(kPausing); + pipeline_filter_->Pause(NewCallback( + this, &PipelineImpl::OnFilterStateTransition)); } else { - state_ = kStopped; + set_state(kStopped); message_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask)); } diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index ee1f882..9a217fd 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -18,6 +18,7 @@ #include "base/threading/thread.h" #include "base/time.h" #include "media/base/clock.h" +#include "media/base/composite_filter.h" #include "media/base/filter_host.h" #include "media/base/pipeline.h" @@ -119,6 +120,9 @@ class PipelineImpl : public Pipeline, public FilterHost { // is used by the constructor, and the Stop() method. void ResetState(); + // Updates |state_|. All state transitions should use this call. + void set_state(State next_state); + // Simple method used to make sure the pipeline is running normally. bool IsPipelineOk(); @@ -353,12 +357,6 @@ class PipelineImpl : public Pipeline, public FilterHost { // Member that tracks the current state. State state_; - // For kPausing, kSeeking and kStarting, we need to track how many filters - // have completed transitioning to the destination state. When - // |remaining_transitions_| reaches 0 the pipeline can transition out - // of the current state. - size_t remaining_transitions_; - // For kSeeking we need to remember where we're seeking between filter // replies. base::TimeDelta seek_timestamp_; @@ -388,20 +386,14 @@ class PipelineImpl : public Pipeline, public FilterHost { scoped_ptr error_callback_; scoped_ptr network_callback_; - // Vector of our filters and map maintaining the relationship between the - // FilterType and the filter itself. - typedef std::vector > FilterVector; - FilterVector filters_; + // Reference to the filter(s) that constitute the pipeline. + scoped_refptr pipeline_filter_; // Renderer references used for setting the volume and determining // when playback has finished. scoped_refptr audio_renderer_; scoped_refptr video_renderer_; - // Vector of threads owned by the pipeline and being used by filters. - typedef std::vector FilterThreadVector; - FilterThreadVector filter_threads_; - // Helper class that stores filter references during pipeline // initialization. class PipelineInitState; diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc index 1116582..eff9498 100644 --- a/media/base/pipeline_impl_unittest.cc +++ b/media/base/pipeline_impl_unittest.cc @@ -180,10 +180,17 @@ class PipelineImplTest : public ::testing::Test { } // Sets up expectations to allow the audio renderer to initialize. - void InitializeAudioRenderer() { - EXPECT_CALL(*mocks_->audio_renderer(), - Initialize(mocks_->audio_decoder(), NotNull())) - .WillOnce(Invoke(&RunFilterCallback)); + void InitializeAudioRenderer(bool disable_after_init_callback = false) { + if (disable_after_init_callback) { + EXPECT_CALL(*mocks_->audio_renderer(), + Initialize(mocks_->audio_decoder(), NotNull())) + .WillOnce(DoAll(Invoke(&RunFilterCallback), + DisableAudioRenderer(mocks_->audio_renderer()))); + } else { + EXPECT_CALL(*mocks_->audio_renderer(), + Initialize(mocks_->audio_decoder(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); + } EXPECT_CALL(*mocks_->audio_renderer(), SetPlaybackRate(0.0f)); EXPECT_CALL(*mocks_->audio_renderer(), SetVolume(1.0f)); EXPECT_CALL(*mocks_->audio_renderer(), Seek(base::TimeDelta(), NotNull())) @@ -641,6 +648,47 @@ TEST_F(PipelineImplTest, DisableAudioRenderer) { host->NotifyEnded(); } +TEST_F(PipelineImplTest, DisableAudioRendererDuringInit) { + CreateAudioStream(); + CreateVideoStream(); + MockDemuxerStreamVector streams; + streams.push_back(audio_stream()); + streams.push_back(video_stream()); + + InitializeDataSource(); + InitializeDemuxer(&streams, base::TimeDelta()); + InitializeAudioDecoder(audio_stream()); + InitializeAudioRenderer(true); + InitializeVideoDecoder(video_stream()); + InitializeVideoRenderer(); + + EXPECT_CALL(*mocks_->data_source(), + OnAudioRendererDisabled()); + EXPECT_CALL(*mocks_->demuxer(), + OnAudioRendererDisabled()); + EXPECT_CALL(*mocks_->audio_decoder(), + OnAudioRendererDisabled()); + EXPECT_CALL(*mocks_->audio_renderer(), + OnAudioRendererDisabled()); + EXPECT_CALL(*mocks_->video_decoder(), + OnAudioRendererDisabled()); + EXPECT_CALL(*mocks_->video_renderer(), + OnAudioRendererDisabled()); + + InitializePipeline(); + EXPECT_TRUE(pipeline_->IsInitialized()); + EXPECT_EQ(PIPELINE_OK, pipeline_->GetError()); + EXPECT_FALSE(pipeline_->IsRendered(mime_type::kMajorTypeAudio)); + EXPECT_TRUE(pipeline_->IsRendered(mime_type::kMajorTypeVideo)); + + // Verify that ended event is fired when video ends. + EXPECT_CALL(*mocks_->video_renderer(), HasEnded()) + .WillOnce(Return(true)); + EXPECT_CALL(callbacks_, OnEnded()); + FilterHost* host = pipeline_; + host->NotifyEnded(); +} + TEST_F(PipelineImplTest, EndedCallback) { CreateAudioStream(); CreateVideoStream(); -- cgit v1.1