summaryrefslogtreecommitdiffstats
path: root/media
diff options
context:
space:
mode:
authoracolwell@chromium.org <acolwell@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-12-29 18:58:23 +0000
committeracolwell@chromium.org <acolwell@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2010-12-29 18:58:23 +0000
commit253274b4e2ffeea590480f461c6a0bdbfa5e4369 (patch)
tree62fe6433ab4e1fd5801cb6db4cb783073aeaab52 /media
parent89d49f114cff9ecd0fabd6a108f3972fb0ab9027 (diff)
downloadchromium_src-253274b4e2ffeea590480f461c6a0bdbfa5e4369.zip
chromium_src-253274b4e2ffeea590480f461c6a0bdbfa5e4369.tar.gz
chromium_src-253274b4e2ffeea590480f461c6a0bdbfa5e4369.tar.bz2
Refactor PipelineImpl to use CompositeFilter to manage Filter state transitions.
BUG=54110 TEST=media_unittests CompositeFilterTest.* Review URL: http://codereview.chromium.org/5744002 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@70267 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r--media/base/composite_filter.cc603
-rw-r--r--media/base/composite_filter.h165
-rw-r--r--media/base/composite_filter_unittest.cc809
-rw-r--r--media/base/mock_filters.cc19
-rw-r--r--media/base/mock_filters.h46
-rw-r--r--media/base/pipeline.h2
-rw-r--r--media/base/pipeline_impl.cc159
-rw-r--r--media/base/pipeline_impl.h20
-rw-r--r--media/media.gyp3
9 files changed, 1706 insertions, 120 deletions
diff --git a/media/base/composite_filter.cc b/media/base/composite_filter.cc
new file mode 100644
index 0000000..87ff799
--- /dev/null
+++ b/media/base/composite_filter.cc
@@ -0,0 +1,603 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/base/composite_filter.h"
+
+#include "base/stl_util-inl.h"
+#include "media/base/callback.h"
+
+namespace media {
+
+class CompositeFilter::FilterHostImpl : public FilterHost {
+ public:
+ FilterHostImpl(CompositeFilter* parent, FilterHost* host);
+
+ FilterHost* host();
+
+ // media::FilterHost methods.
+ virtual void SetError(PipelineError error);
+ virtual base::TimeDelta GetTime() const;
+ virtual base::TimeDelta GetDuration() const;
+ virtual void SetTime(base::TimeDelta time);
+ virtual void SetDuration(base::TimeDelta duration);
+ virtual void SetBufferedTime(base::TimeDelta buffered_time);
+ virtual void SetTotalBytes(int64 total_bytes);
+ virtual void SetBufferedBytes(int64 buffered_bytes);
+ virtual void SetVideoSize(size_t width, size_t height);
+ virtual void SetStreaming(bool streaming);
+ virtual void NotifyEnded();
+ virtual void SetLoaded(bool loaded);
+ virtual void SetNetworkActivity(bool network_activity);
+ virtual void DisableAudioRenderer();
+ virtual void SetCurrentReadPosition(int64 offset);
+ virtual int64 GetCurrentReadPosition();
+
+ private:
+ CompositeFilter* parent_;
+ FilterHost* host_;
+
+ DISALLOW_COPY_AND_ASSIGN(FilterHostImpl);
+};
+
+CompositeFilter::CompositeFilter(MessageLoop* message_loop) {
+ Init(message_loop, NULL);
+}
+
+CompositeFilter::CompositeFilter(MessageLoop* message_loop,
+ ThreadFactoryFunction thread_factory) {
+ DCHECK(thread_factory);
+ Init(message_loop, thread_factory);
+}
+
+void CompositeFilter::Init(MessageLoop* message_loop,
+ ThreadFactoryFunction thread_factory) {
+ DCHECK(message_loop);
+ message_loop_ = message_loop;
+ thread_factory_ = thread_factory;
+
+ if (!thread_factory_) {
+ thread_factory_ = &CompositeFilter::DefaultThreadFactory;
+ }
+
+ state_ = kCreated;
+ sequence_index_ = 0;
+ error_ = PIPELINE_OK;
+}
+
+CompositeFilter::~CompositeFilter() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ DCHECK(state_ == kCreated || state_ == kStopped);
+
+ // Stop every running filter thread.
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
+ // Reset the pipeline, which will decrement a reference to this object.
+ // We will get destroyed as soon as the remaining tasks finish executing.
+ // To be safe, we'll set our pipeline reference to NULL.
+ filters_.clear();
+ STLDeleteElements(&filter_threads_);
+}
+
+bool CompositeFilter::AddFilter(scoped_refptr<Filter> filter) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ if (!filter.get() || state_ != kCreated || !host())
+ return false;
+
+ // Create a dedicated thread for this filter if applicable.
+ if (filter->requires_message_loop()) {
+ scoped_ptr<base::Thread> thread(
+ thread_factory_(filter->message_loop_name()));
+
+ if (!thread.get() || !thread->Start()) {
+ return false;
+ }
+
+ filter->set_message_loop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
+ }
+
+ // Register ourselves as the filter's host.
+ filter->set_host(host_impl_.get());
+ filters_.push_back(make_scoped_refptr(filter.get()));
+ return true;
+}
+
+const char* CompositeFilter::major_mime_type() const {
+ return "";
+}
+
+void CompositeFilter::set_host(FilterHost* host) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ DCHECK(host);
+ DCHECK(!host_impl_.get());
+ host_impl_.reset(new FilterHostImpl(this, host));
+}
+
+FilterHost* CompositeFilter::host() {
+ return host_impl_.get() ? host_impl_->host() : NULL;
+}
+
+bool CompositeFilter::requires_message_loop() const {
+ return false;
+}
+
+const char* CompositeFilter::message_loop_name() const {
+ return "CompositeFilter";
+}
+
+void CompositeFilter::set_message_loop(MessageLoop* message_loop) {
+ NOTREACHED() << "Message loop should not be set.";
+}
+
+MessageLoop* CompositeFilter::message_loop() {
+ return NULL;
+}
+
+void CompositeFilter::Play(FilterCallback* play_callback) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ scoped_ptr<FilterCallback> callback(play_callback);
+ if (callback_.get()) {
+ SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
+ callback->Run();
+ return;
+ } else if (state_ == kPlaying) {
+ callback->Run();
+ return;
+ } else if (!host() || (state_ != kPaused && state_ != kCreated)) {
+ SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
+ callback->Run();
+ return;
+ }
+
+ ChangeState(kPlayPending);
+ callback_.reset(callback.release());
+ StartSerialCallSequence();
+}
+
+void CompositeFilter::Pause(FilterCallback* pause_callback) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ scoped_ptr<FilterCallback> callback(pause_callback);
+ if (callback_.get()) {
+ SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
+ callback->Run();
+ return;
+ } else if (state_ == kPaused) {
+ callback->Run();
+ return;
+ } else if (!host() || state_ != kPlaying) {
+ SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
+ callback->Run();
+ return;
+ }
+
+ ChangeState(kPausePending);
+ callback_.reset(callback.release());
+ StartSerialCallSequence();
+}
+
+void CompositeFilter::Flush(FilterCallback* flush_callback) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ scoped_ptr<FilterCallback> callback(flush_callback);
+ if (callback_.get()) {
+ SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
+ callback->Run();
+ return;
+ } else if (!host() || (state_ != kCreated && state_ != kPaused)) {
+ SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
+ callback->Run();
+ return;
+ }
+
+ ChangeState(kFlushPending);
+ callback_.reset(callback.release());
+ StartParallelCallSequence();
+}
+
+void CompositeFilter::Stop(FilterCallback* stop_callback) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ scoped_ptr<FilterCallback> callback(stop_callback);
+ if (!host()) {
+ SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
+ callback->Run();
+ return;
+ } else if (state_ == kStopped) {
+ callback->Run();
+ return;
+ }
+
+ switch(state_) {
+ case kError:
+ case kCreated:
+ case kPaused:
+ case kPlaying:
+ ChangeState(kStopPending);
+ break;
+ case kPlayPending:
+ ChangeState(kStopWhilePlayPending);
+ break;
+ case kPausePending:
+ ChangeState(kStopWhilePausePending);
+ break;
+ case kFlushPending:
+ ChangeState(kStopWhileFlushPending);
+ break;
+ case kSeekPending:
+ ChangeState(kStopWhileSeekPending);
+ break;
+ default:
+ SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
+ callback->Run();
+ return;
+ }
+
+ callback_.reset(callback.release());
+ if (state_ == kStopPending) {
+ StartSerialCallSequence();
+ }
+}
+
+void CompositeFilter::SetPlaybackRate(float playback_rate) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ for (FilterVector::iterator iter = filters_.begin();
+ iter != filters_.end();
+ ++iter) {
+ (*iter)->SetPlaybackRate(playback_rate);
+ }
+}
+
+void CompositeFilter::Seek(base::TimeDelta time,
+ FilterCallback* seek_callback) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ scoped_ptr<FilterCallback> callback(seek_callback);
+ if (callback_.get()) {
+ SendErrorToHost(PIPELINE_ERROR_OPERATION_PENDING);
+ callback->Run();
+ return;
+ } else if (!host() || (state_ != kPaused && state_ != kCreated)) {
+ SendErrorToHost(PIPELINE_ERROR_INVALID_STATE);
+ callback->Run();
+ return;
+ }
+
+ ChangeState(kSeekPending);
+ callback_.reset(callback.release());
+ pending_seek_time_ = time;
+ StartSerialCallSequence();
+}
+
+void CompositeFilter::OnAudioRendererDisabled() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ for (FilterVector::iterator iter = filters_.begin();
+ iter != filters_.end();
+ ++iter) {
+ (*iter)->OnAudioRendererDisabled();
+ }
+}
+
+base::Thread* CompositeFilter::DefaultThreadFactory(
+ const char* thread_name) {
+ return new base::Thread(thread_name);
+}
+
+void CompositeFilter::ChangeState(State new_state) {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ state_ = new_state;
+}
+
+void CompositeFilter::StartSerialCallSequence() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ error_ = PIPELINE_OK;
+
+ if (filters_.size() > 0) {
+ sequence_index_ = 0;
+ CallFilter(filters_[sequence_index_],
+ NewThreadSafeCallback(&CompositeFilter::SerialCallback));
+ } else {
+ sequence_index_ = 0;
+ SerialCallback();
+ }
+}
+
+void CompositeFilter::StartParallelCallSequence() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ error_ = PIPELINE_OK;
+
+ if (filters_.size() > 0) {
+ sequence_index_ = 0;
+ for (size_t i = 0; i < filters_.size(); i++) {
+ CallFilter(filters_[i],
+ NewThreadSafeCallback(&CompositeFilter::ParallelCallback));
+ }
+ } else {
+ sequence_index_ = 0;
+ ParallelCallback();
+ }
+}
+
+void CompositeFilter::CallFilter(scoped_refptr<Filter>& filter,
+ FilterCallback* callback) {
+ switch(state_) {
+ case kPlayPending:
+ filter->Play(callback);
+ break;
+ case kPausePending:
+ filter->Pause(callback);
+ break;
+ case kFlushPending:
+ filter->Flush(callback);
+ break;
+ case kStopPending:
+ filter->Stop(callback);
+ break;
+ case kSeekPending:
+ filter->Seek(pending_seek_time_, callback);
+ break;
+ default:
+ delete callback;
+ ChangeState(kError);
+ HandleError(PIPELINE_ERROR_INVALID_STATE);
+ }
+}
+
+void CompositeFilter::DispatchPendingCallback() {
+ if (callback_.get()) {
+ scoped_ptr<FilterCallback> callback(callback_.release());
+ callback->Run();
+ }
+}
+
+CompositeFilter::State CompositeFilter::GetNextState(State state) const {
+ State ret = kInvalid;
+ switch (state) {
+ case kPlayPending:
+ ret = kPlaying;
+ break;
+ case kPausePending:
+ ret = kPaused;
+ case kFlushPending:
+ ret = kPaused;
+ break;
+ case kStopPending:
+ ret = kStopped;
+ break;
+ case kSeekPending:
+ ret = kPaused;
+ break;
+ case kStopWhilePlayPending:
+ case kStopWhilePausePending:
+ case kStopWhileFlushPending:
+ case kStopWhileSeekPending:
+ ret = kStopPending;
+ break;
+
+ case kInvalid:
+ case kCreated:
+ case kPlaying:
+ case kPaused:
+ case kStopped:
+ case kError:
+ ret = kInvalid;
+ break;
+
+ // default: intentionally left out to catch missing states.
+ }
+
+ return ret;
+}
+
+void CompositeFilter::SerialCallback() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+ if (error_ != PIPELINE_OK) {
+ // We encountered an error. Terminate the sequence now.
+ ChangeState(kError);
+ HandleError(error_);
+ return;
+ }
+
+ if (filters_.size() > 0)
+ sequence_index_++;
+
+ if (sequence_index_ == filters_.size()) {
+ // All filters have been successfully called without error.
+ OnCallSequenceDone();
+ } else if (GetNextState(state_) == kStopPending) {
+ // Abort sequence early and start issuing Stop() calls.
+ ChangeState(kStopPending);
+ StartSerialCallSequence();
+ } else {
+ // We aren't done with the sequence. Call the next filter.
+ CallFilter(filters_[sequence_index_],
+ NewThreadSafeCallback(&CompositeFilter::SerialCallback));
+ }
+}
+
+void CompositeFilter::ParallelCallback() {
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+
+ if (filters_.size() > 0)
+ sequence_index_++;
+
+ if (sequence_index_ == filters_.size()) {
+ if (error_ != PIPELINE_OK) {
+ // We encountered an error.
+ ChangeState(kError);
+ HandleError(error_);
+ return;
+ }
+
+ OnCallSequenceDone();
+ }
+}
+
+void CompositeFilter::OnCallSequenceDone() {
+ State next_state = GetNextState(state_);
+
+ if (next_state == kInvalid) {
+ // We somehow got into an unexpected state.
+ ChangeState(kError);
+ HandleError(PIPELINE_ERROR_INVALID_STATE);
+ }
+
+ ChangeState(next_state);
+
+ if (state_ == kStopPending) {
+ // Handle a deferred Stop().
+ StartSerialCallSequence();
+ } else {
+ // Call the callback to indicate that the operation has completed.
+ DispatchPendingCallback();
+ }
+}
+
+void CompositeFilter::SendErrorToHost(PipelineError error) {
+ if (host_impl_.get())
+ host_impl_.get()->host()->SetError(error);
+}
+
+void CompositeFilter::HandleError(PipelineError error) {
+ if (error != PIPELINE_OK) {
+ SendErrorToHost(error);
+ }
+
+ DispatchPendingCallback();
+}
+
+FilterCallback* CompositeFilter::NewThreadSafeCallback(
+ void (CompositeFilter::*method)()) {
+ return TaskToCallbackAdapter::NewCallback(
+ NewRunnableMethod(this,
+ &CompositeFilter::OnCallback,
+ message_loop_,
+ method));
+}
+
+void CompositeFilter::OnCallback(MessageLoop* message_loop,
+ void (CompositeFilter::*method)()) {
+ if (MessageLoop::current() != message_loop) {
+ // Posting callback to the proper thread.
+ message_loop->PostTask(FROM_HERE, NewRunnableMethod(this, method));
+ return;
+ }
+
+ (this->*method)();
+}
+
+bool CompositeFilter::CanForwardError() {
+ return (state_ == kCreated) || (state_ == kPlaying) || (state_ == kPaused);
+}
+
+void CompositeFilter::SetError(PipelineError error) {
+ // TODO(acolwell): Temporary hack to handle errors that occur
+ // during filter initialization. In this case we just forward
+ // the error to the host even if it is on the wrong thread. We
+ // have to do this because if we defer the call, we can't be
+ // sure the host will get the error before the "init done" callback
+ // is executed. This will be cleaned up when filter init is refactored.
+ if (state_ == kCreated) {
+ SendErrorToHost(error);
+ return;
+ }
+
+ if (message_loop_ != MessageLoop::current()) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &CompositeFilter::SetError, error));
+ return;
+ }
+
+ DCHECK_EQ(message_loop_, MessageLoop::current());
+
+ // Drop errors recieved while stopping or stopped.
+ // This shields the owner of this object from having
+ // to deal with errors it can't do anything about.
+ if (state_ == kStopPending || state_ == kStopped)
+ return;
+
+ error_ = error;
+ if (CanForwardError())
+ SendErrorToHost(error);
+}
+
+CompositeFilter::FilterHostImpl::FilterHostImpl(CompositeFilter* parent,
+ FilterHost* host) :
+ parent_(parent),
+ host_(host) {
+}
+
+FilterHost* CompositeFilter::FilterHostImpl::host() {
+ return host_;
+}
+
+// media::FilterHost methods.
+void CompositeFilter::FilterHostImpl::SetError(PipelineError error) {
+ parent_->SetError(error);
+}
+
+base::TimeDelta CompositeFilter::FilterHostImpl::GetTime() const {
+ return host_->GetTime();
+}
+
+base::TimeDelta CompositeFilter::FilterHostImpl::GetDuration() const {
+ return host_->GetDuration();
+}
+
+void CompositeFilter::FilterHostImpl::SetTime(base::TimeDelta time) {
+ host_->SetTime(time);
+}
+
+void CompositeFilter::FilterHostImpl::SetDuration(base::TimeDelta duration) {
+ host_->SetDuration(duration);
+}
+
+void CompositeFilter::FilterHostImpl::SetBufferedTime(
+ base::TimeDelta buffered_time) {
+ host_->SetBufferedTime(buffered_time);
+}
+
+void CompositeFilter::FilterHostImpl::SetTotalBytes(int64 total_bytes) {
+ host_->SetTotalBytes(total_bytes);
+}
+
+void CompositeFilter::FilterHostImpl::SetBufferedBytes(int64 buffered_bytes) {
+ host_->SetBufferedBytes(buffered_bytes);
+}
+
+void CompositeFilter::FilterHostImpl::SetVideoSize(size_t width,
+ size_t height) {
+ host_->SetVideoSize(width, height);
+}
+
+void CompositeFilter::FilterHostImpl::SetStreaming(bool streaming) {
+ host_->SetStreaming(streaming);
+}
+
+void CompositeFilter::FilterHostImpl::NotifyEnded() {
+ host_->NotifyEnded();
+}
+
+void CompositeFilter::FilterHostImpl::SetLoaded(bool loaded) {
+ host_->SetLoaded(loaded);
+}
+
+void CompositeFilter::FilterHostImpl::SetNetworkActivity(
+ bool network_activity) {
+ host_->SetNetworkActivity(network_activity);
+}
+
+void CompositeFilter::FilterHostImpl::DisableAudioRenderer() {
+ host_->DisableAudioRenderer();
+}
+
+void CompositeFilter::FilterHostImpl::SetCurrentReadPosition(int64 offset) {
+ host_->SetCurrentReadPosition(offset);
+}
+
+int64 CompositeFilter::FilterHostImpl::GetCurrentReadPosition() {
+ return host_->GetCurrentReadPosition();
+}
+
+} // namespace media
diff --git a/media/base/composite_filter.h b/media/base/composite_filter.h
new file mode 100644
index 0000000..f8bcb12
--- /dev/null
+++ b/media/base/composite_filter.h
@@ -0,0 +1,165 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_BASE_COMPOSITE_FILTER_H_
+#define MEDIA_BASE_COMPOSITE_FILTER_H_
+
+#include "base/thread.h"
+#include "media/base/filter_host.h"
+#include "media/base/filters.h"
+
+namespace media {
+
+class CompositeFilter : public Filter {
+ public:
+ typedef base::Thread* (*ThreadFactoryFunction)(const char* thread_name);
+
+ CompositeFilter(MessageLoop* message_loop);
+
+ // Constructor that allows the default thread creation strategy to be
+ // overridden.
+ CompositeFilter(MessageLoop* message_loop,
+ ThreadFactoryFunction thread_factory);
+
+ // Adds a filter to the composite. This is only allowed after set_host()
+ // is called and before the first state changing operation such as Play(),
+ // Flush(), Stop(), or Seek(). True is returned if the filter was successfully
+ // added to the composite. False is returned if the filter couldn't be added
+ // because the composite is in the wrong state or the filter needed a thread
+ // and the composite was unable to create one.
+ bool AddFilter(scoped_refptr<Filter> filter);
+
+ // media::Filter methods.
+ virtual const char* major_mime_type() const;
+ virtual void set_host(FilterHost* host);
+ virtual FilterHost* host();
+ virtual bool requires_message_loop() const;
+ virtual const char* message_loop_name() const;
+ virtual void set_message_loop(MessageLoop* message_loop);
+ virtual MessageLoop* message_loop();
+ virtual void Play(FilterCallback* play_callback);
+ virtual void Pause(FilterCallback* pause_callback);
+ virtual void Flush(FilterCallback* flush_callback);
+ virtual void Stop(FilterCallback* stop_callback);
+ virtual void SetPlaybackRate(float playback_rate);
+ virtual void Seek(base::TimeDelta time, FilterCallback* seek_callback);
+ virtual void OnAudioRendererDisabled();
+
+ protected:
+ virtual ~CompositeFilter();
+
+ /// Default thread factory strategy.
+ static base::Thread* DefaultThreadFactory(const char* thread_name);
+
+ void SetError(PipelineError error);
+
+ private:
+ class FilterHostImpl;
+
+ enum State {
+ kInvalid,
+ kCreated,
+ kPaused,
+ kPlayPending,
+ kStopWhilePlayPending,
+ kPlaying,
+ kPausePending,
+ kStopWhilePausePending,
+ kFlushPending,
+ kStopWhileFlushPending,
+ kSeekPending,
+ kStopWhileSeekPending,
+ kStopPending,
+ kStopped,
+ kError
+ };
+
+ // Initialization method called by constructors.
+ void Init(MessageLoop* message_loop, ThreadFactoryFunction thread_factory);
+
+ // Transition to a new state.
+ void ChangeState(State new_state);
+
+ // Start calling filters in a sequence.
+ void StartSerialCallSequence();
+
+ // Call filters in parallel.
+ void StartParallelCallSequence();
+
+ // Call the filter based on the current value of state_.
+ void CallFilter(scoped_refptr<Filter>& filter, FilterCallback* callback);
+
+ // Calls |callback_| and then clears the reference.
+ void DispatchPendingCallback();
+
+ // Gets the state to transition to given |state|.
+ State GetNextState(State state) const;
+
+ // Filter callback for a serial sequence.
+ void SerialCallback();
+
+ // Filter callback for a parallel sequence.
+ void ParallelCallback();
+
+ // Called when a parallel or serial call sequence completes.
+ void OnCallSequenceDone();
+
+ // Helper function for sending an error to the FilterHost.
+ void SendErrorToHost(PipelineError error);
+
+ // Helper function for handling errors during call sequences.
+ void HandleError(PipelineError error);
+
+ // Creates a callback that can be called from any thread, but is guaranteed
+ // to call the specified method on the thread associated with this filter.
+ FilterCallback* NewThreadSafeCallback(void (CompositeFilter::*method)());
+
+ // Helper function used by NewThreadSafeCallback() to make sure the
+ // method gets called on the right thread.
+ void OnCallback(MessageLoop* message_loop,
+ void (CompositeFilter::*method)());
+
+ // Helper function that indicates whether SetError() calls can be forwarded
+ // to the host of this filter.
+ bool CanForwardError();
+
+ // Vector of threads owned by the composite and used by filters in |filters_|.
+ typedef std::vector<base::Thread*> FilterThreadVector;
+ FilterThreadVector filter_threads_;
+
+ // Vector of the filters added to the composite.
+ typedef std::vector<scoped_refptr<Filter> > FilterVector;
+ FilterVector filters_;
+
+ // Factory function used to create filter threads.
+ ThreadFactoryFunction thread_factory_;
+
+ // Callback for the pending request.
+ scoped_ptr<FilterCallback> callback_;
+
+ // Time parameter for the pending Seek() request.
+ base::TimeDelta pending_seek_time_;
+
+ // Current state of this filter.
+ State state_;
+
+ // The index of the filter currently processing a request.
+ unsigned int sequence_index_;
+
+ // Message loop passed into the constructor.
+ MessageLoop* message_loop_;
+
+ // FilterHost implementation passed to Filters owned by this
+ // object.
+ scoped_ptr<FilterHostImpl> host_impl_;
+
+ // Error passed in the last SetError() call.
+ PipelineError error_;
+
+ DISALLOW_COPY_AND_ASSIGN(CompositeFilter);
+};
+
+} // namespace media
+
+#endif // MEDIA_BASE_COMPOSITE_FILTER_H_
diff --git a/media/base/composite_filter_unittest.cc b/media/base/composite_filter_unittest.cc
new file mode 100644
index 0000000..5a529f5
--- /dev/null
+++ b/media/base/composite_filter_unittest.cc
@@ -0,0 +1,809 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/base/composite_filter.h"
+#include "media/base/mock_filter_host.h"
+#include "media/base/mock_filters.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using ::testing::_;
+using ::testing::InSequence;
+using ::testing::Return;
+using ::testing::SaveArg;
+using ::testing::StrictMock;
+
+namespace media {
+
+class CompositeFilterTest : public testing::Test {
+ public:
+ CompositeFilterTest();
+ virtual ~CompositeFilterTest();
+
+ // Sets up a new CompositeFilter in |composite_|, creates |filter_1_| and
+ // |filter_2_|, and adds these filters to |composite_|.
+ void SetupAndAdd2Filters();
+
+ // Helper enum that indicates what filter method to call.
+ enum MethodToCall {
+ PLAY,
+ PAUSE,
+ FLUSH,
+ STOP,
+ SEEK,
+ };
+
+ // Helper method that adds a filter method call expectation based on the value
+ // of |method_to_call|.
+ //
+ // |method_to_call| - Indicates which method we expect a call for.
+ // |filter| - The MockFilter to add the expectation to.
+ // |seek_time| - The time to pass to the Seek() call if |method_to_call|
+ // equals SEEK.
+ void ExpectFilterCall(MethodToCall method_to_call, MockFilter* filter,
+ base::TimeDelta seek_time);
+
+ // Helper method that calls a filter method based on the value of
+ // |method_to_call|.
+ //
+ // |method_to_call| - Indicates which method to call.
+ // |filter| - The Filter to make the method call on.
+ // |seek_time| - The time to pass to the Seek() call if |method_to_call|
+ // equals SEEK.
+ // |callback| - The callback object to pass to the method.
+ void DoFilterCall(MethodToCall method_to_call, Filter* filter,
+ base::TimeDelta seek_time,
+ FilterCallback* callback);
+
+ // Creates an expectation sequence based on the value of method_to_call.
+ //
+ // |method_to_call| - Indicates which method we want a success sequence for.
+ // |seek_time| - The time to pass in the Seek() call if |method_to_call|
+ // equals SEEK.
+ void ExpectSuccess(MethodToCall method_to_call,
+ base::TimeDelta seek_time = base::TimeDelta());
+
+ // Issue a Play(), Pause(), Flush(), Stop(), or Seek() on the composite and
+ // verify all the expected calls on the filters.
+ void DoPlay();
+ void DoPause();
+ void DoFlush();
+ void DoStop();
+ void DoSeek(base::TimeDelta time);
+
+ // Issue a Play(), Pause(), Flush(), or Seek() and expect the calls to fail
+ // with a PIPELINE_ERROR_INVALID_STATE error.
+ //
+ // |method_to_call| - Indicates whick method to call.
+ // |seek_time| - The time to pass to the Seek() call if |method_to_call|
+ // equals SEEK.
+ void ExpectInvalidStateFail(MethodToCall method_to_call,
+ base::TimeDelta seek_time = base::TimeDelta());
+
+ // Run the callback stored in |filter_1_callback_|.
+ void RunFilter1Callback();
+
+ // Run the callback stored in |filter_2_callback_|.
+ void RunFilter2Callback();
+
+ protected:
+ MessageLoop message_loop_;
+
+ // The composite object being tested.
+ scoped_refptr<CompositeFilter> composite_;
+
+ // First filter added to the composite.
+ scoped_refptr<StrictMock<MockFilter> > filter_1_;
+
+ // Callback passed to |filter_1_| during last Play(), Pause(), Flush(),
+ // Stop(), or Seek() call.
+ FilterCallback* filter_1_callback_;
+
+ // Second filter added to the composite.
+ scoped_refptr<StrictMock<MockFilter> > filter_2_;
+
+ // Callback passed to |filter_2_| during last Play(), Pause(), Flush(),
+ // Stop(), or Seek() call.
+ FilterCallback* filter_2_callback_;
+
+ // FilterHost implementation passed to |composite_| via set_host().
+ scoped_ptr<StrictMock<MockFilterHost> > mock_filter_host_;
+
+ DISALLOW_COPY_AND_ASSIGN(CompositeFilterTest);
+};
+
+CompositeFilterTest::CompositeFilterTest() :
+ composite_(new CompositeFilter(&message_loop_)),
+ filter_1_callback_(NULL),
+ filter_2_callback_(NULL),
+ mock_filter_host_(new StrictMock<MockFilterHost>()) {
+}
+
+CompositeFilterTest::~CompositeFilterTest() {}
+
+void CompositeFilterTest::SetupAndAdd2Filters() {
+ mock_filter_host_.reset(new StrictMock<MockFilterHost>());
+ composite_ = new CompositeFilter(&message_loop_);
+ composite_->set_host(mock_filter_host_.get());
+
+ // Setup |filter_1_| and arrange for methods to set
+ // |filter_1_callback_| when they are called.
+ filter_1_ = new StrictMock<MockFilter>();
+ filter_1_callback_ = NULL;
+ ON_CALL(*filter_1_, Play(_))
+ .WillByDefault(SaveArg<0>(&filter_1_callback_));
+ ON_CALL(*filter_1_, Pause(_))
+ .WillByDefault(SaveArg<0>(&filter_1_callback_));
+ ON_CALL(*filter_1_, Flush(_))
+ .WillByDefault(SaveArg<0>(&filter_1_callback_));
+ ON_CALL(*filter_1_, Stop(_))
+ .WillByDefault(SaveArg<0>(&filter_1_callback_));
+ ON_CALL(*filter_1_, Seek(_,_))
+ .WillByDefault(SaveArg<1>(&filter_1_callback_));
+
+ // Setup |filter_2_| and arrange for methods to set
+ // |filter_2_callback_| when they are called.
+ filter_2_ = new StrictMock<MockFilter>();
+ filter_2_callback_ = NULL;
+ ON_CALL(*filter_2_, Play(_))
+ .WillByDefault(SaveArg<0>(&filter_2_callback_));
+ ON_CALL(*filter_2_, Pause(_))
+ .WillByDefault(SaveArg<0>(&filter_2_callback_));
+ ON_CALL(*filter_2_, Flush(_))
+ .WillByDefault(SaveArg<0>(&filter_2_callback_));
+ ON_CALL(*filter_2_, Stop(_))
+ .WillByDefault(SaveArg<0>(&filter_2_callback_));
+ ON_CALL(*filter_2_, Seek(_,_))
+ .WillByDefault(SaveArg<1>(&filter_2_callback_));
+
+ composite_->AddFilter(filter_1_);
+ composite_->AddFilter(filter_2_);
+}
+
+void CompositeFilterTest::ExpectFilterCall(MethodToCall method_to_call,
+ MockFilter* filter,
+ base::TimeDelta seek_time) {
+ switch(method_to_call) {
+ case PLAY:
+ EXPECT_CALL(*filter, Play(_));
+ break;
+ case PAUSE:
+ EXPECT_CALL(*filter, Pause(_));
+ break;
+ case FLUSH:
+ EXPECT_CALL(*filter, Flush(_));
+ break;
+ case STOP:
+ EXPECT_CALL(*filter, Stop(_));
+ break;
+ case SEEK:
+ EXPECT_CALL(*filter, Seek(seek_time, _));
+ break;
+ };
+}
+
+void CompositeFilterTest::DoFilterCall(MethodToCall method_to_call,
+ Filter* filter,
+ base::TimeDelta seek_time,
+ FilterCallback* callback) {
+ switch(method_to_call) {
+ case PLAY:
+ filter->Play(callback);
+ break;
+ case PAUSE:
+ filter->Pause(callback);
+ break;
+ case FLUSH:
+ filter->Flush(callback);
+ break;
+ case STOP:
+ filter->Stop(callback);
+ break;
+ case SEEK:
+ filter->Seek(seek_time, callback);
+ break;
+ };
+}
+
+void CompositeFilterTest::ExpectSuccess(MethodToCall method_to_call,
+ base::TimeDelta seek_time) {
+ InSequence seq;
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ bool is_parallel_call = (method_to_call == FLUSH);
+
+ ExpectFilterCall(method_to_call, filter_1_.get(), seek_time);
+
+ if (is_parallel_call) {
+ ExpectFilterCall(method_to_call, filter_2_.get(), seek_time);
+ }
+
+ // Make method call on the composite.
+ DoFilterCall(method_to_call, composite_.get(), seek_time,
+ mock_callback->NewCallback());
+
+ if (is_parallel_call) {
+ // Make sure both filters have their callbacks set.
+ EXPECT_TRUE(filter_1_callback_ != NULL);
+ EXPECT_TRUE(filter_2_callback_ != NULL);
+
+ RunFilter1Callback();
+ } else {
+ // Make sure that only |filter_1_| has its callback set.
+ EXPECT_TRUE(filter_1_callback_ != NULL);
+ EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_);
+
+ ExpectFilterCall(method_to_call, filter_2_.get(), seek_time);
+
+ RunFilter1Callback();
+
+ // Verify that |filter_2_| was called by checking the callback pointer.
+ EXPECT_TRUE(filter_2_callback_ != NULL);
+ }
+
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ RunFilter2Callback();
+}
+
+void CompositeFilterTest::DoPlay() {
+ ExpectSuccess(PLAY);
+}
+
+void CompositeFilterTest::DoPause() {
+ ExpectSuccess(PAUSE);
+}
+
+void CompositeFilterTest::DoFlush() {
+ ExpectSuccess(FLUSH);
+}
+
+void CompositeFilterTest::DoStop() {
+ ExpectSuccess(STOP);
+}
+
+void CompositeFilterTest::DoSeek(base::TimeDelta time) {
+ ExpectSuccess(SEEK, time);
+}
+
+void CompositeFilterTest::ExpectInvalidStateFail(MethodToCall method_to_call,
+ base::TimeDelta seek_time) {
+ InSequence seq;
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_INVALID_STATE))
+ .WillOnce(Return());
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ DoFilterCall(method_to_call, composite_, seek_time,
+ mock_callback->NewCallback());
+
+ // Make sure that neither of the filters were called by
+ // verifying that the callback pointers weren't set.
+ EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_);
+ EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_);
+}
+
+void CompositeFilterTest::RunFilter1Callback() {
+ EXPECT_TRUE(filter_1_callback_ != NULL);
+ FilterCallback* callback = filter_1_callback_;
+ filter_1_callback_ = NULL;
+ callback->Run();
+ delete callback;
+}
+
+void CompositeFilterTest::RunFilter2Callback() {
+ EXPECT_TRUE(filter_2_callback_ != NULL);
+ FilterCallback* callback = filter_2_callback_;
+ filter_2_callback_ = NULL;
+ callback->Run();
+ delete callback;
+}
+
+static base::Thread* NullThreadFactory(const char* thread_name) {
+ return NULL;
+}
+
+// Test AddFilter() failure cases.
+TEST_F(CompositeFilterTest, TestAddFilterFailCases) {
+ // Test adding a null pointer.
+ EXPECT_FALSE(composite_->AddFilter(NULL));
+
+ scoped_refptr<StrictMock<MockFilter> > filter =
+ new StrictMock<MockFilter>(true);
+ EXPECT_EQ(NULL, filter->host());
+ EXPECT_EQ(NULL, filter->message_loop());
+
+ // Test failing because set_host() hasn't been called yet.
+ EXPECT_FALSE(composite_->AddFilter(filter));
+
+ // Test thread creation failure.
+ composite_ = new CompositeFilter(&message_loop_, &NullThreadFactory);
+ composite_->set_host(mock_filter_host_.get());
+ EXPECT_FALSE(composite_->AddFilter(filter));
+ EXPECT_EQ(NULL, filter->host());
+ EXPECT_EQ(NULL, filter->message_loop());
+}
+
+// Test successful AddFilter() cases.
+TEST_F(CompositeFilterTest, TestAddFilter) {
+ composite_->set_host(mock_filter_host_.get());
+
+ // Add a filter that doesn't require a message loop.
+ scoped_refptr<StrictMock<MockFilter> > filter = new StrictMock<MockFilter>();
+ EXPECT_EQ(NULL, filter->host());
+ EXPECT_EQ(NULL, filter->message_loop());
+
+ EXPECT_TRUE(composite_->AddFilter(filter));
+
+ EXPECT_TRUE(filter->host() != NULL);
+ EXPECT_EQ(NULL, filter->message_loop());
+
+ // Add a filter that requires a message loop.
+ scoped_refptr<StrictMock<MockFilter> > filter_2 =
+ new StrictMock<MockFilter>(true);
+
+ EXPECT_EQ(NULL, filter_2->host());
+ EXPECT_EQ(NULL, filter_2->message_loop());
+
+ EXPECT_TRUE(composite_->AddFilter(filter_2));
+
+ EXPECT_TRUE(filter_2->host() != NULL);
+ EXPECT_TRUE(filter_2->message_loop() != NULL);
+}
+
+static bool g_thread_cleanup_called_ = false;
+class CompositeFilterThread : public base::Thread {
+ public:
+ CompositeFilterThread(const char* name) : base::Thread(name) {}
+ virtual void CleanUp() {
+ g_thread_cleanup_called_ = true;
+ base::Thread::CleanUp();
+ }
+};
+
+TEST_F(CompositeFilterTest, TestPlay) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ // Verify successful call to Play().
+ DoPlay();
+
+ // At this point we are now in the kPlaying state.
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ // Try calling Play() again to make sure that we simply get a callback.
+ // We are already in the Play() state so there is no point calling the
+ // filters.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ composite_->Play(mock_callback->NewCallback());
+
+ // Verify that neither of the filter callbacks were set.
+ EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_);
+ EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_);
+
+ // Stop playback.
+ DoStop();
+
+ // At this point we should be in the kStopped state.
+
+ // Try calling Stop() again to make sure neither filter is called.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ composite_->Stop(mock_callback->NewCallback());
+
+ // Verify that neither of the filter callbacks were set.
+ EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_);
+ EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_);
+
+ // Try calling Play() again to make sure we get an error.
+ ExpectInvalidStateFail(PLAY);
+}
+
+// Test errors in the middle of a serial call sequence like Play().
+TEST_F(CompositeFilterTest, TestPlayErrors) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*filter_1_, Play(_));
+
+ // Call Play() on the composite.
+ composite_->Play(mock_callback->NewCallback());
+
+ EXPECT_CALL(*filter_2_, Play(_));
+
+ // Run callback to indicate that |filter_1_|'s Play() has completed.
+ RunFilter1Callback();
+
+ // At this point Play() has been called on |filter_2_|. Simulate an
+ // error by calling SetError() on its FilterHost interface.
+ filter_2_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY);
+
+ // Expect error to be reported and "play done" callback to be called.
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY));
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ // Run callback to indicate that |filter_2_|'s Play() has completed.
+ RunFilter2Callback();
+
+ // Verify that Play/Pause/Flush/Seek fail now that an error occured.
+ ExpectInvalidStateFail(PLAY);
+ ExpectInvalidStateFail(PAUSE);
+ ExpectInvalidStateFail(FLUSH);
+ ExpectInvalidStateFail(SEEK);
+
+ // Make sure you can still Stop().
+ DoStop();
+}
+
+TEST_F(CompositeFilterTest, TestPause) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ // Try calling Pause() to make sure we get an error because we aren't in
+ // the playing state.
+ ExpectInvalidStateFail(PAUSE);
+
+ // Transition to playing state.
+ DoPlay();
+
+ // Issue a successful Pause().
+ DoPause();
+
+ // At this point we are paused.
+
+ // Try calling Pause() again to make sure that the filters aren't called
+ // because we are already in the paused state.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ composite_->Pause(mock_callback->NewCallback());
+
+ // Verify that neither of the filter callbacks were set.
+ EXPECT_EQ((FilterCallback*)NULL, filter_1_callback_);
+ EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_);
+
+ // Verify that we can transition pack to the play state.
+ DoPlay();
+
+ // Go back to the pause state.
+ DoPause();
+
+ // Transition to the stop state.
+ DoStop();
+
+ // Try calling Pause() to make sure we get an error because we aren't in
+ // the playing state.
+ ExpectInvalidStateFail(PAUSE);
+}
+
+// Test errors in the middle of a serial call sequence like Pause().
+TEST_F(CompositeFilterTest, TestPauseErrors) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ DoPlay();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*filter_1_, Pause(_));
+
+ // Call Pause() on the composite.
+ composite_->Pause(mock_callback->NewCallback());
+
+ // Simulate an error by calling SetError() on |filter_1_|'s FilterHost
+ // interface.
+ filter_1_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY);
+
+ // Expect error to be reported and "pause done" callback to be called.
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY));
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ RunFilter1Callback();
+
+ // Make sure |filter_2_callback_| was not set.
+ EXPECT_EQ((FilterCallback*)NULL, filter_2_callback_);
+
+ // Verify that Play/Pause/Flush/Seek fail now that an error occured.
+ ExpectInvalidStateFail(PLAY);
+ ExpectInvalidStateFail(PAUSE);
+ ExpectInvalidStateFail(FLUSH);
+ ExpectInvalidStateFail(SEEK);
+
+ // Make sure you can still Stop().
+ DoStop();
+}
+
+TEST_F(CompositeFilterTest, TestFlush) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ // Make sure Flush() works before calling Play().
+ DoFlush();
+
+ // Transition to playing state.
+ DoPlay();
+
+ // Call Flush() to make sure we get an error because we are in
+ // the playing state.
+ ExpectInvalidStateFail(FLUSH);
+
+ // Issue a successful Pause().
+ DoPause();
+
+ // Make sure Flush() works after pausing.
+ DoFlush();
+
+ // Verify that we can transition back to the play state.
+ DoPlay();
+
+ // Transition to the stop state.
+ DoStop();
+
+ // Try calling Flush() to make sure we get an error because we are stopped.
+ ExpectInvalidStateFail(FLUSH);
+}
+
+// Test errors in the middle of a parallel call sequence like Flush().
+TEST_F(CompositeFilterTest, TestFlushErrors) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*filter_1_, Flush(_));
+ EXPECT_CALL(*filter_2_, Flush(_));
+
+ // Call Flush() on the composite.
+ composite_->Flush(mock_callback->NewCallback());
+
+ // Simulate an error by calling SetError() on |filter_1_|'s FilterHost
+ // interface.
+ filter_1_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY);
+
+ RunFilter1Callback();
+
+ // Expect error to be reported and "pause done" callback to be called.
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY));
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ RunFilter2Callback();
+
+ // Verify that Play/Pause/Flush/Seek fail now that an error occured.
+ ExpectInvalidStateFail(PLAY);
+ ExpectInvalidStateFail(PAUSE);
+ ExpectInvalidStateFail(FLUSH);
+ ExpectInvalidStateFail(SEEK);
+
+ // Make sure you can still Stop().
+ DoStop();
+}
+
+TEST_F(CompositeFilterTest, TestSeek) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ // Verify that seek is allowed to be called before a Play() call.
+ DoSeek(base::TimeDelta::FromSeconds(5));
+
+ // Verify we can issue a Play() after the Seek().
+ DoPlay();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ // Try calling Seek() while playing to make sure we get an error.
+ ExpectInvalidStateFail(SEEK);
+
+ // Transition to paused state.
+ DoPause();
+
+ // Verify that seek is allowed after pausing.
+ DoSeek(base::TimeDelta::FromSeconds(5));
+
+ // Verify we can still play again.
+ DoPlay();
+
+ // Stop playback.
+ DoStop();
+
+ // Try calling Seek() to make sure we get an error.
+ ExpectInvalidStateFail(SEEK);
+}
+
+TEST_F(CompositeFilterTest, TestStop) {
+ InSequence sequence;
+
+ // Test Stop() before any other call.
+ SetupAndAdd2Filters();
+ DoStop();
+
+ // Test error during Stop() sequence.
+ SetupAndAdd2Filters();
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*filter_1_, Stop(_));
+
+ composite_->Stop(mock_callback->NewCallback());
+
+ // Have |filter_1_| signal an error.
+ filter_1_->host()->SetError(PIPELINE_ERROR_READ);
+
+ EXPECT_CALL(*filter_2_, Stop(_));
+
+ RunFilter1Callback();
+
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+
+ RunFilter2Callback();
+}
+
+// Test stopping in the middle of a serial call sequence.
+TEST_F(CompositeFilterTest, TestStopWhilePlayPending) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>());
+
+ EXPECT_CALL(*filter_1_, Play(_));
+
+ composite_->Play(mock_callback->NewCallback());
+
+ // Note: Play() is pending on |filter_1_| right now.
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback_2(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*mock_callback, OnCallbackDestroyed());
+
+ composite_->Stop(mock_callback_2->NewCallback());
+
+ EXPECT_CALL(*filter_1_, Stop(_));
+
+ // Run |filter_1_|'s callback again to indicate Play() has completed.
+ RunFilter1Callback();
+
+ EXPECT_CALL(*filter_2_, Stop(_));
+
+ // Run |filter_1_|'s callback again to indicate Stop() has completed.
+ RunFilter1Callback();
+
+ EXPECT_CALL(*mock_callback_2, OnFilterCallback());
+
+ // Run |filter_2_|'s callback to indicate Stop() has completed.
+ RunFilter2Callback();
+}
+
+// Test stopping in the middle of a parallel call sequence.
+TEST_F(CompositeFilterTest, TestStopWhileFlushPending) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>());
+
+ EXPECT_CALL(*filter_1_, Flush(_));
+ EXPECT_CALL(*filter_2_, Flush(_));
+
+ composite_->Flush(mock_callback->NewCallback());
+
+ // Note: |filter_1_| and |filter_2_| have pending Flush() calls at this point.
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback_2(
+ new StrictMock<MockFilterCallback>(false));
+
+ EXPECT_CALL(*mock_callback, OnCallbackDestroyed());
+
+ composite_->Stop(mock_callback_2->NewCallback());
+
+ // Run callback to indicate that |filter_1_|'s Flush() has completed.
+ RunFilter1Callback();
+
+ EXPECT_CALL(*filter_1_, Stop(_));
+
+ // Run callback to indicate that |filter_2_|'s Flush() has completed.
+ RunFilter2Callback();
+
+ EXPECT_CALL(*filter_2_, Stop(_));
+
+ // Run callback to indicate that |filter_1_|'s Stop() has completed.
+ RunFilter1Callback();
+
+ EXPECT_CALL(*mock_callback_2, OnFilterCallback());
+
+ // Run callback to indicate that |filter_2_|'s Stop() has completed.
+ RunFilter2Callback();
+}
+
+TEST_F(CompositeFilterTest, TestErrorWhilePlaying) {
+ InSequence sequence;
+
+ SetupAndAdd2Filters();
+
+ // Simulate an error on |filter_2_| while in kCreated state. This
+ // can happen if an error occurs during filter initialization.
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY));
+ filter_2_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY);
+
+ DoPlay();
+
+ // Simulate an error on |filter_2_| while playing.
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_OUT_OF_MEMORY));
+ filter_2_->host()->SetError(PIPELINE_ERROR_OUT_OF_MEMORY);
+
+ DoPause();
+
+ // Simulate an error on |filter_2_| while paused.
+ EXPECT_CALL(*mock_filter_host_, SetError(PIPELINE_ERROR_NETWORK));
+ filter_2_->host()->SetError(PIPELINE_ERROR_NETWORK);
+
+ DoStop();
+
+ // Verify that errors are not passed to |mock_filter_host_|
+ // after Stop() has been called.
+ filter_2_->host()->SetError(PIPELINE_ERROR_NETWORK);
+}
+
+// Make sure that state transitions act as expected even
+// if the composite doesn't contain any filters.
+TEST_F(CompositeFilterTest, TestEmptyComposite) {
+ InSequence sequence;
+
+ composite_->set_host(mock_filter_host_.get());
+
+ scoped_ptr<StrictMock<MockFilterCallback> > mock_callback(
+ new StrictMock<MockFilterCallback>(false));
+
+ // Issue a Play() and expect no errors.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+ composite_->Play(mock_callback->NewCallback());
+
+ // Issue a Pause() and expect no errors.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+ composite_->Pause(mock_callback->NewCallback());
+
+ // Issue a Flush() and expect no errors.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+ composite_->Flush(mock_callback->NewCallback());
+
+ // Issue a Seek() and expect no errors.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+ composite_->Seek(base::TimeDelta::FromSeconds(5),
+ mock_callback->NewCallback());
+
+ // Issue a Play() and expect no errors.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+ composite_->Play(mock_callback->NewCallback());
+
+ // Issue a Stop() and expect no errors.
+ EXPECT_CALL(*mock_callback, OnFilterCallback());
+ composite_->Stop(mock_callback->NewCallback());
+}
+
+} // namespace media
diff --git a/media/base/mock_filters.cc b/media/base/mock_filters.cc
index f2110ff..e2d6ad9 100644
--- a/media/base/mock_filters.cc
+++ b/media/base/mock_filters.cc
@@ -20,4 +20,23 @@ void RunStopFilterCallback(FilterCallback* callback) {
delete callback;
}
+MockFilter::MockFilter() :
+ requires_message_loop_(false) {
+}
+
+MockFilter::MockFilter(bool requires_message_loop) :
+ requires_message_loop_(requires_message_loop) {
+}
+
+MockFilter::~MockFilter() {}
+
+bool MockFilter::requires_message_loop() const {
+ return requires_message_loop_;
+}
+
+const char* MockFilter::message_loop_name() const {
+ return "MockFilter";
+}
+
+
} // namespace media
diff --git a/media/base/mock_filters.h b/media/base/mock_filters.h
index 3b1deb3..36b676f 100644
--- a/media/base/mock_filters.h
+++ b/media/base/mock_filters.h
@@ -50,7 +50,12 @@ class Destroyable : public MockClass {
// gmock will track the number of times the methods are executed.
class MockFilterCallback {
public:
- MockFilterCallback() {}
+ MockFilterCallback() : run_destroy_callback_(true) {
+ }
+
+ MockFilterCallback(bool run_destroy_callback) :
+ run_destroy_callback_(run_destroy_callback) {
+ }
virtual ~MockFilterCallback() {}
MOCK_METHOD0(OnCallbackDestroyed, void());
@@ -61,7 +66,7 @@ class MockFilterCallback {
// destroyed. Clients should use NiceMock<> or StrictMock<> depending on the
// test.
FilterCallback* NewCallback() {
- return new CallbackImpl(this);
+ return new CallbackImpl(this, run_destroy_callback_);
}
private:
@@ -69,12 +74,15 @@ class MockFilterCallback {
// MockFilterCallback.
class CallbackImpl : public CallbackRunner<Tuple0> {
public:
- explicit CallbackImpl(MockFilterCallback* mock_callback)
- : mock_callback_(mock_callback) {
+ explicit CallbackImpl(MockFilterCallback* mock_callback,
+ bool run_destroy_callback)
+ : mock_callback_(mock_callback),
+ run_destroy_callback_(run_destroy_callback) {
}
virtual ~CallbackImpl() {
- mock_callback_->OnCallbackDestroyed();
+ if (run_destroy_callback_)
+ mock_callback_->OnCallbackDestroyed();
}
virtual void RunWithParams(const Tuple0& params) {
@@ -83,13 +91,41 @@ class MockFilterCallback {
private:
MockFilterCallback* mock_callback_;
+ bool run_destroy_callback_;
DISALLOW_COPY_AND_ASSIGN(CallbackImpl);
};
+ bool run_destroy_callback_;
DISALLOW_COPY_AND_ASSIGN(MockFilterCallback);
};
+class MockFilter : public Filter {
+ public:
+ MockFilter();
+ MockFilter(bool requires_message_loop);
+
+ // Filter implementation.
+ virtual bool requires_message_loop() const;
+ virtual const char* message_loop_name() const;
+
+ MOCK_METHOD1(Play, void(FilterCallback* callback));
+ MOCK_METHOD1(Pause, void(FilterCallback* callback));
+ MOCK_METHOD1(Flush, void(FilterCallback* callback));
+ MOCK_METHOD1(Stop, void(FilterCallback* callback));
+ MOCK_METHOD1(SetPlaybackRate, void(float playback_rate));
+ MOCK_METHOD2(Seek, void(base::TimeDelta time, FilterCallback* callback));
+ MOCK_METHOD0(OnAudioRendererDisabled, void());
+
+ protected:
+ virtual ~MockFilter();
+
+ private:
+ bool requires_message_loop_;
+
+ DISALLOW_COPY_AND_ASSIGN(MockFilter);
+};
+
class MockDataSource : public DataSource {
public:
MockDataSource() {}
diff --git a/media/base/pipeline.h b/media/base/pipeline.h
index 50158d2..61a1110 100644
--- a/media/base/pipeline.h
+++ b/media/base/pipeline.h
@@ -33,6 +33,8 @@ enum PipelineError {
PIPELINE_ERROR_COULD_NOT_RENDER,
PIPELINE_ERROR_READ,
PIPELINE_ERROR_AUDIO_HARDWARE,
+ PIPELINE_ERROR_OPERATION_PENDING,
+ PIPELINE_ERROR_INVALID_STATE,
// Demuxer related errors.
DEMUXER_ERROR_COULD_NOT_OPEN,
DEMUXER_ERROR_COULD_NOT_PARSE,
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 3b27943..3a68374 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -22,6 +22,7 @@ class PipelineImpl::PipelineInitState {
scoped_refptr<Demuxer> demuxer_;
scoped_refptr<AudioDecoder> audio_decoder_;
scoped_refptr<VideoDecoder> video_decoder_;
+ scoped_refptr<CompositeFilter> composite_;
};
PipelineImpl::PipelineImpl(MessageLoop* message_loop)
@@ -29,7 +30,6 @@ PipelineImpl::PipelineImpl(MessageLoop* message_loop)
clock_(new ClockImpl(&base::Time::Now)),
waiting_for_clock_update_(false),
state_(kCreated),
- remaining_transitions_(0),
current_bytes_(0) {
ResetState();
}
@@ -317,6 +317,10 @@ void PipelineImpl::ResetState() {
rendered_mime_types_.clear();
}
+void PipelineImpl::set_state(State next_state) {
+ state_ = next_state;
+}
+
bool PipelineImpl::IsPipelineOk() {
return PIPELINE_OK == GetError();
}
@@ -572,22 +576,25 @@ void PipelineImpl::InitializeTask() {
// Just created, create data source.
if (state_ == kCreated) {
- state_ = kInitDataSource;
+ set_state(kInitDataSource);
pipeline_init_state_.reset(new PipelineInitState());
+ pipeline_init_state_->composite_ = new CompositeFilter(message_loop_);
+ pipeline_init_state_->composite_->set_host(this);
+
InitializeDataSource();
return;
}
// Data source created, create demuxer.
if (state_ == kInitDataSource) {
- state_ = kInitDemuxer;
+ set_state(kInitDemuxer);
InitializeDemuxer(pipeline_init_state_->data_source_);
return;
}
// Demuxer created, create audio decoder.
if (state_ == kInitDemuxer) {
- state_ = kInitAudioDecoder;
+ set_state(kInitAudioDecoder);
// If this method returns false, then there's no audio stream.
if (InitializeAudioDecoder(pipeline_init_state_->demuxer_))
return;
@@ -595,7 +602,7 @@ void PipelineImpl::InitializeTask() {
// Assuming audio decoder was created, create audio renderer.
if (state_ == kInitAudioDecoder) {
- state_ = kInitAudioRenderer;
+ set_state(kInitAudioRenderer);
// Returns false if there's no audio stream.
if (InitializeAudioRenderer(pipeline_init_state_->audio_decoder_)) {
InsertRenderedMimeType(mime_type::kMajorTypeAudio);
@@ -606,14 +613,14 @@ void PipelineImpl::InitializeTask() {
// Assuming audio renderer was created, create video decoder.
if (state_ == kInitAudioRenderer) {
// Then perform the stage of initialization, i.e. initialize video decoder.
- state_ = kInitVideoDecoder;
+ set_state(kInitVideoDecoder);
if (InitializeVideoDecoder(pipeline_init_state_->demuxer_))
return;
}
// Assuming video decoder was created, create video renderer.
if (state_ == kInitVideoDecoder) {
- state_ = kInitVideoRenderer;
+ set_state(kInitVideoRenderer);
if (InitializeVideoRenderer(pipeline_init_state_->video_decoder_)) {
InsertRenderedMimeType(mime_type::kMajorTypeVideo);
return;
@@ -629,6 +636,8 @@ void PipelineImpl::InitializeTask() {
// Clear the collection of filters.
filter_collection_->Clear();
+ pipeline_filter_ = pipeline_init_state_->composite_;
+
// Clear init state since we're done initializing.
pipeline_init_state_.reset();
@@ -637,12 +646,11 @@ void PipelineImpl::InitializeTask() {
PlaybackRateChangedTask(GetPlaybackRate());
VolumeChangedTask(GetVolume());
- // Fire the initial seek request to get the filters to preroll.
+ // Fire the seek request to get the filters to preroll.
seek_pending_ = true;
- state_ = kSeeking;
- remaining_transitions_ = filters_.size();
+ set_state(kSeeking);
seek_timestamp_ = base::TimeDelta();
- filters_.front()->Seek(seek_timestamp_,
+ pipeline_filter_->Seek(seek_timestamp_,
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
}
@@ -705,11 +713,7 @@ void PipelineImpl::PlaybackRateChangedTask(float playback_rate) {
AutoLock auto_lock(lock_);
clock_->SetPlaybackRate(playback_rate);
}
- for (FilterVector::iterator iter = filters_.begin();
- iter != filters_.end();
- ++iter) {
- (*iter)->SetPlaybackRate(playback_rate);
- }
+ pipeline_filter_->SetPlaybackRate(playback_rate);
}
void PipelineImpl::VolumeChangedTask(float volume) {
@@ -745,10 +749,9 @@ void PipelineImpl::SeekTask(base::TimeDelta time,
// kSeeking (for each filter)
// kStarting (for each filter)
// kStarted
- state_ = kPausing;
+ set_state(kPausing);
seek_timestamp_ = time;
seek_callback_.reset(seek_callback);
- remaining_transitions_ = filters_.size();
// Kick off seeking!
{
@@ -757,7 +760,7 @@ void PipelineImpl::SeekTask(base::TimeDelta time,
if (!waiting_for_clock_update_)
clock_->Pause();
}
- filters_.front()->Pause(
+ pipeline_filter_->Pause(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
@@ -790,7 +793,7 @@ void PipelineImpl::NotifyEndedTask() {
}
// Transition to ended, executing the callback if present.
- state_ = kEnded;
+ set_state(kEnded);
if (ended_callback_.get()) {
ended_callback_->Run();
}
@@ -814,11 +817,7 @@ void PipelineImpl::DisableAudioRendererTask() {
audio_disabled_ = true;
// Notify all filters of disabled audio renderer.
- for (FilterVector::iterator iter = filters_.begin();
- iter != filters_.end();
- ++iter) {
- (*iter)->OnAudioRendererDisabled();
- }
+ pipeline_filter_->OnAudioRendererDisabled();
}
void PipelineImpl::FilterStateTransitionTask() {
@@ -837,42 +836,31 @@ void PipelineImpl::FilterStateTransitionTask() {
// Decrement the number of remaining transitions, making sure to transition
// to the next state if needed.
- DCHECK(remaining_transitions_ <= filters_.size());
- DCHECK(remaining_transitions_ > 0u);
- if (--remaining_transitions_ == 0) {
- state_ = FindNextState(state_);
- if (state_ == kSeeking) {
- AutoLock auto_lock(lock_);
- clock_->SetTime(seek_timestamp_);
- }
-
- if (TransientState(state_)) {
- remaining_transitions_ = filters_.size();
- }
+ set_state(FindNextState(state_));
+ if (state_ == kSeeking) {
+ AutoLock auto_lock(lock_);
+ clock_->SetTime(seek_timestamp_);
}
// Carry out the action for the current state.
if (TransientState(state_)) {
- Filter* filter = filters_[filters_.size() - remaining_transitions_];
if (state_ == kPausing) {
- filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ pipeline_filter_->Pause(
+ NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kFlushing) {
- // We had to use parallel flushing all filters.
- if (remaining_transitions_ == filters_.size()) {
- for (size_t i = 0; i < filters_.size(); i++) {
- filters_[i]->Flush(
- NewCallback(this, &PipelineImpl::OnFilterStateTransition));
- }
- }
+ pipeline_filter_->Flush(
+ NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kSeeking) {
- filter->Seek(seek_timestamp_,
+ pipeline_filter_->Seek(seek_timestamp_,
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStarting) {
- filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ pipeline_filter_->Play(
+ NewCallback(this,&PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStopping) {
- filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ pipeline_filter_->Stop(
+ NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else {
- NOTREACHED();
+ NOTREACHED() << "Unexpected state: " << state_;
}
} else if (state_ == kStarted) {
FinishInitialization();
@@ -897,7 +885,7 @@ void PipelineImpl::FilterStateTransitionTask() {
} else if (IsPipelineStopped()) {
FinishDestroyingFiltersTask();
} else {
- NOTREACHED();
+ NOTREACHED() << "Unexpected state: " << state_;
}
}
@@ -905,24 +893,11 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineStopped());
- // Stop every running filter thread.
- //
- // TODO(scherkus): can we watchdog this section to detect wedged threads?
- for (FilterThreadVector::iterator iter = filter_threads_.begin();
- iter != filter_threads_.end();
- ++iter) {
- (*iter)->Stop();
- }
-
// Clear renderer references.
audio_renderer_ = NULL;
video_renderer_ = NULL;
- // Reset the pipeline, which will decrement a reference to this object.
- // We will get destroyed as soon as the remaining tasks finish executing.
- // To be safe, we'll set our pipeline reference to NULL.
- filters_.clear();
- STLDeleteElements(&filter_threads_);
+ pipeline_filter_ = NULL;
stop_pending_ = false;
tearing_down_ = false;
@@ -938,7 +913,7 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
}
} else {
// Destroying filters due to SetError().
- state_ = kError;
+ set_state(kError);
// If our owner has requested to be notified of an error.
if (error_callback_.get()) {
error_callback_->Run();
@@ -947,28 +922,12 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
}
bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) {
- DCHECK_EQ(MessageLoop::current(), message_loop_);
- DCHECK(IsPipelineOk());
-
- // Create a dedicated thread for this filter if applicable.
- if (filter->requires_message_loop()) {
- scoped_ptr<base::Thread> thread(
- new base::Thread(filter->message_loop_name()));
- if (!thread.get() || !thread->Start()) {
- NOTREACHED() << "Could not start filter thread";
- SetError(PIPELINE_ERROR_INITIALIZATION_FAILED);
- return false;
- }
+ bool ret = pipeline_init_state_->composite_->AddFilter(filter.get());
- filter->set_message_loop(thread->message_loop());
- filter_threads_.push_back(thread.release());
+ if (!ret) {
+ SetError(PIPELINE_ERROR_INITIALIZATION_FAILED);
}
-
- // Register ourselves as the filter's host.
- DCHECK(IsPipelineOk());
- filter->set_host(this);
- filters_.push_back(make_scoped_refptr(filter.get()));
- return true;
+ return ret;
}
void PipelineImpl::InitializeDataSource() {
@@ -1145,23 +1104,21 @@ void PipelineImpl::TearDownPipeline() {
tearing_down_ = true;
if (IsPipelineInitializing()) {
- // Notify the client that starting did not complete, if necessary.
- FinishInitialization();
- }
+ // Make it look like initialization was successful.
+ pipeline_filter_ = pipeline_init_state_->composite_;
+ pipeline_init_state_.reset();
- remaining_transitions_ = filters_.size();
- if (remaining_transitions_ > 0) {
- if (IsPipelineInitializing()) {
- state_ = kStopping;
- filters_.front()->Stop(NewCallback(
- this, &PipelineImpl::OnFilterStateTransition));
- } else {
- state_ = kPausing;
- filters_.front()->Pause(NewCallback(
- this, &PipelineImpl::OnFilterStateTransition));
- }
+ set_state(kStopping);
+ pipeline_filter_->Stop(NewCallback(
+ this, &PipelineImpl::OnFilterStateTransition));
+
+ FinishInitialization();
+ } else if (pipeline_filter_.get()) {
+ set_state(kPausing);
+ pipeline_filter_->Pause(NewCallback(
+ this, &PipelineImpl::OnFilterStateTransition));
} else {
- state_ = kStopped;
+ set_state(kStopped);
message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask));
}
diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h
index aa99f73..fbc2b3d 100644
--- a/media/base/pipeline_impl.h
+++ b/media/base/pipeline_impl.h
@@ -18,6 +18,7 @@
#include "base/thread.h"
#include "base/time.h"
#include "media/base/clock.h"
+#include "media/base/composite_filter.h"
#include "media/base/filter_host.h"
#include "media/base/pipeline.h"
@@ -119,6 +120,9 @@ class PipelineImpl : public Pipeline, public FilterHost {
// is used by the constructor, and the Stop() method.
void ResetState();
+ // Updates |state_|. All state transitions should use this call.
+ void set_state(State next_state);
+
// Simple method used to make sure the pipeline is running normally.
bool IsPipelineOk();
@@ -353,12 +357,6 @@ class PipelineImpl : public Pipeline, public FilterHost {
// Member that tracks the current state.
State state_;
- // For kPausing, kSeeking and kStarting, we need to track how many filters
- // have completed transitioning to the destination state. When
- // |remaining_transitions_| reaches 0 the pipeline can transition out
- // of the current state.
- size_t remaining_transitions_;
-
// For kSeeking we need to remember where we're seeking between filter
// replies.
base::TimeDelta seek_timestamp_;
@@ -388,20 +386,14 @@ class PipelineImpl : public Pipeline, public FilterHost {
scoped_ptr<PipelineCallback> error_callback_;
scoped_ptr<PipelineCallback> network_callback_;
- // Vector of our filters and map maintaining the relationship between the
- // FilterType and the filter itself.
- typedef std::vector<scoped_refptr<Filter> > FilterVector;
- FilterVector filters_;
+ // Reference to the filter(s) that constitute the pipeline.
+ scoped_refptr<Filter> pipeline_filter_;
// Renderer references used for setting the volume and determining
// when playback has finished.
scoped_refptr<AudioRenderer> audio_renderer_;
scoped_refptr<VideoRenderer> video_renderer_;
- // Vector of threads owned by the pipeline and being used by filters.
- typedef std::vector<base::Thread*> FilterThreadVector;
- FilterThreadVector filter_threads_;
-
// Helper class that stores filter references during pipeline
// initialization.
class PipelineInitState;
diff --git a/media/media.gyp b/media/media.gyp
index 6fe8e83..0331a77 100644
--- a/media/media.gyp
+++ b/media/media.gyp
@@ -76,6 +76,8 @@
'base/clock.h',
'base/clock_impl.cc',
'base/clock_impl.h',
+ 'base/composite_filter.cc',
+ 'base/composite_filter.h',
'base/data_buffer.cc',
'base/data_buffer.h',
'base/djb2.cc',
@@ -269,6 +271,7 @@
'audio/mac/audio_output_mac_unittest.cc',
'audio/simple_sources_unittest.cc',
'audio/win/audio_output_win_unittest.cc',
+ 'base/composite_filter_unittest.cc',
'base/clock_impl_unittest.cc',
'base/data_buffer_unittest.cc',
'base/djb2_unittest.cc',