summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--media/base/filter_host_impl.cc6
-rw-r--r--media/base/filter_host_impl.h15
-rw-r--r--media/base/pipeline.h49
-rw-r--r--media/base/pipeline_impl.cc475
-rw-r--r--media/base/pipeline_impl.h139
-rw-r--r--media/base/pipeline_impl_unittest.cc150
-rw-r--r--media/player/movie.cc8
-rw-r--r--media/player/movie.h2
-rw-r--r--webkit/glue/webmediaplayer_impl.cc54
-rw-r--r--webkit/glue/webmediaplayer_impl.h6
10 files changed, 477 insertions, 427 deletions
diff --git a/media/base/filter_host_impl.cc b/media/base/filter_host_impl.cc
index 96e4e93..09f1791 100644
--- a/media/base/filter_host_impl.cc
+++ b/media/base/filter_host_impl.cc
@@ -7,11 +7,11 @@
namespace media {
void FilterHostImpl::InitializationComplete() {
- pipeline_thread_->InitializationComplete(this);
+ pipeline_internal_->InitializationComplete(this);
}
void FilterHostImpl::Error(PipelineError error) {
- pipeline_thread_->Error(error);
+ pipeline_internal_->Error(error);
}
base::TimeDelta FilterHostImpl::GetTime() const {
@@ -19,7 +19,7 @@ base::TimeDelta FilterHostImpl::GetTime() const {
}
void FilterHostImpl::SetTime(base::TimeDelta time) {
- pipeline_thread_->SetTime(time);
+ pipeline_internal_->SetTime(time);
}
void FilterHostImpl::SetDuration(base::TimeDelta duration) {
diff --git a/media/base/filter_host_impl.h b/media/base/filter_host_impl.h
index 1fcb0c4..71e2417 100644
--- a/media/base/filter_host_impl.h
+++ b/media/base/filter_host_impl.h
@@ -27,15 +27,15 @@ class FilterHostImpl : public FilterHost {
virtual void SetVideoSize(size_t width, size_t height);
// These methods are public, but are intended for use by the
- // PipelineThread class only.
+ // PipelineInternal class only.
// Creates a FilterHostImpl object and populates the |filter_type_| member
// by calling the Filter class's static filter_type() method. This ensures
// that the GetFilter method can safely cast the filter interface from the
// MediaFilter base class interface to the specific Filter interface.
template <class Filter>
- FilterHostImpl(PipelineThread* pipeline_thread, Filter* filter)
- : pipeline_thread_(pipeline_thread),
+ FilterHostImpl(PipelineInternal* pipeline_internal, Filter* filter)
+ : pipeline_internal_(pipeline_internal),
filter_type_(Filter::filter_type()),
filter_(filter),
stopped_(false) {
@@ -54,15 +54,16 @@ class FilterHostImpl : public FilterHost {
// Stops the filter.
void Stop();
- // Used by the PipelineThread to call Seek and SetRate methods on filters.
+ // Used by the PipelineInternal to call Seek() and SetRate() methods on
+ // filters.
MediaFilter* media_filter() const { return filter_; }
private:
// Useful method for getting the pipeline.
- PipelineImpl* pipeline() const { return pipeline_thread_->pipeline(); }
+ PipelineImpl* pipeline() const { return pipeline_internal_->pipeline(); }
- // PipelineThread that owns this FilterHostImpl.
- PipelineThread* const pipeline_thread_;
+ // PipelineInternal that owns this FilterHostImpl.
+ PipelineInternal* const pipeline_internal_;
// The FilterType of the filter this host contains.
FilterType const filter_type_;
diff --git a/media/base/pipeline.h b/media/base/pipeline.h
index d418e4b..d1f5d90 100644
--- a/media/base/pipeline.h
+++ b/media/base/pipeline.h
@@ -56,41 +56,31 @@ class Pipeline {
// construct a filter chain. Returns true if successful, false otherwise
// (i.e., pipeline already started). Note that a return value of true
// only indicates that the initialization process has started successfully.
- // Pipeline initialization is an inherently asynchronous process. Clients
- // should not call SetPlaybackRate(), Seek(), or SetVolume() until
- // initialization is complete. Clients can either poll the IsInitialized()
- // method (which is discouraged) or use the |start_callback| as described
- // below.
+ // Pipeline initialization is an inherently asynchronous process. Clients can
+ // either poll the IsInitialized() method (discouraged) or use the
+ // |start_callback| as described below.
//
// This method is asynchronous and can execute a callback when completed.
// If the caller provides a |start_callback|, it will be called when the
// pipeline initialization completes. If successful, the callback's bool
// parameter will be true. If the callback is called with false, then the
- // client can use the GetError() method to obtain more information about the
- // reason initialization failed. The prototype for the client callback is:
- // void Client::PipelineInitComplete(bool init_was_successful);
- //
- // Note that clients must not call the Stop method from within the
- // |start_callback|. Other methods, including SetPlaybackRate(), Seek(), and
- // SetVolume() may be called. The client will be called on a thread owned by
- // the pipeline class, not on the thread that originally called the Start()
- // method.
+ // client can use GetError() to obtain more information about the reason
+ // initialization failed.
virtual bool Start(FilterFactory* filter_factory,
const std::string& url,
PipelineCallback* start_callback) = 0;
- // Stops the pipeline and resets to an uninitialized state. This method
- // will block the calling thread until the pipeline has been completely
- // torn down and reset to an uninitialized state. After calling Stop(), it
- // is acceptable to call Start() again since Stop() leaves the pipeline
- // in a state identical to a newly created pipeline.
+ // Asynchronously stops the pipeline and resets it to an uninitialized state.
+ // If provided, |stop_callback| will be executed when the pipeline has been
+ // completely torn down and reset to an uninitialized state. It is acceptable
+ // to call Start() again once the callback has finished executing.
//
- // Stop() must be called before destroying the pipeline.
+ // Stop() must be called before destroying the pipeline. Clients can
+ // determine whether Stop() must be called by checking IsRunning().
//
- // TODO(scherkus): it shouldn't be acceptable to call Start() again after you
- // Stop() a pipeline -- it should be destroyed and replaced with a new
- // instance.
- virtual void Stop() = 0;
+ // TODO(scherkus): ideally clients would destroy the pipeline after calling
+ // Stop() and create a new pipeline as needed.
+ virtual void Stop(PipelineCallback* stop_callback) = 0;
// Attempt to seek to the position specified by time. |seek_callback| will be
// executed when the all filters in the pipeline have processed the seek.
@@ -98,9 +88,14 @@ class Pipeline {
// (i.e., streaming media).
virtual void Seek(base::TimeDelta time, PipelineCallback* seek_callback) = 0;
- // Returns the current initialization state of the pipeline. Note that this
- // will be set to true prior to a executing |init_complete_callback| if
- // initialization is successful.
+ // Returns true if the pipeline has been started via Start(). If IsRunning()
+ // returns true, it is expected that Stop() will be called before destroying
+ // the pipeline.
+ virtual bool IsRunning() const = 0;
+
+ // Returns true if the pipeline has been started and fully initialized to a
+ // point where playback controls will be respected. Note that it is possible
+ // for a pipeline to be started but not initialized (i.e., an error occurred).
virtual bool IsInitialized() const = 0;
// If the |major_mime_type| exists in the pipeline and is being rendered, this
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 56a71a6..68d509c 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -31,6 +31,24 @@ bool SupportsSetMessageLoop() {
}
}
+// Small helper function to help us name filter threads for debugging.
+//
+// TODO(scherkus): figure out a cleaner way to derive the filter thread name.
+template <class Filter>
+const char* GetThreadName() {
+ DCHECK(SupportsSetMessageLoop<Filter>());
+ switch (Filter::filter_type()) {
+ case FILTER_DEMUXER:
+ return "DemuxerThread";
+ case FILTER_AUDIO_DECODER:
+ return "AudioDecoderThread";
+ case FILTER_VIDEO_DECODER:
+ return "VideoDecoderThread";
+ default:
+ return "FilterThread";
+ }
+}
+
// Helper function used with NewRunnableMethod to implement a (very) crude
// blocking counter.
//
@@ -46,62 +64,61 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
} // namespace
-PipelineImpl::PipelineImpl() {
+PipelineImpl::PipelineImpl(MessageLoop* message_loop)
+ : message_loop_(message_loop) {
ResetState();
}
PipelineImpl::~PipelineImpl() {
- Stop();
+ DCHECK(!pipeline_internal_)
+ << "Stop() must complete before destroying object";
}
-// Creates the PipelineThread and calls it's start method.
+// Creates the PipelineInternal and calls it's start method.
bool PipelineImpl::Start(FilterFactory* factory,
const std::string& url,
- PipelineCallback* init_complete_callback) {
- DCHECK(!pipeline_thread_);
+ PipelineCallback* start_callback) {
+ DCHECK(!pipeline_internal_);
DCHECK(factory);
- DCHECK(!initialized_);
- DCHECK(!IsPipelineThread());
- if (!pipeline_thread_ && factory) {
- pipeline_thread_ = new PipelineThread(this);
- if (pipeline_thread_) {
- // TODO(ralphl): Does the callback get copied by these fancy templates?
- // if so, then do I want to always delete it here???
- if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
- return true;
- }
- pipeline_thread_ = NULL; // Releases reference to destroy thread
- }
+ if (pipeline_internal_ || !factory) {
+ return false;
}
- delete init_complete_callback;
- return false;
-}
-// Stop the PipelineThread and return to a state identical to that of a newly
-// created PipelineImpl object.
-void PipelineImpl::Stop() {
- DCHECK(!IsPipelineThread());
+ // Create and start the PipelineInternal.
+ pipeline_internal_ = new PipelineInternal(this, message_loop_);
+ if (!pipeline_internal_) {
+ NOTREACHED() << "Could not create PipelineInternal";
+ return false;
+ }
+ pipeline_internal_->Start(factory, url, start_callback);
+ return true;
+}
- if (pipeline_thread_) {
- pipeline_thread_->Stop();
+// Stop the PipelineInternal who will NULL our reference to it and reset our
+// state to a newly created PipelineImpl object.
+void PipelineImpl::Stop(PipelineCallback* stop_callback) {
+ if (pipeline_internal_) {
+ pipeline_internal_->Stop(stop_callback);
}
- ResetState();
}
void PipelineImpl::Seek(base::TimeDelta time,
PipelineCallback* seek_callback) {
- DCHECK(!IsPipelineThread());
-
if (IsPipelineOk()) {
- pipeline_thread_->Seek(time, seek_callback);
+ pipeline_internal_->Seek(time, seek_callback);
} else {
NOTREACHED();
}
}
+bool PipelineImpl::IsRunning() const {
+ AutoLock auto_lock(const_cast<Lock&>(lock_));
+ return pipeline_internal_ != NULL;
+}
+
bool PipelineImpl::IsInitialized() const {
AutoLock auto_lock(lock_);
- return initialized_;
+ return pipeline_internal_ && pipeline_internal_->IsInitialized();
}
bool PipelineImpl::IsRendered(const std::string& major_mime_type) const {
@@ -117,10 +134,8 @@ float PipelineImpl::GetPlaybackRate() const {
}
void PipelineImpl::SetPlaybackRate(float rate) {
- DCHECK(!IsPipelineThread());
-
if (IsPipelineOk() && rate >= 0.0f) {
- pipeline_thread_->SetPlaybackRate(rate);
+ pipeline_internal_->SetPlaybackRate(rate);
} else {
// It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped.
DCHECK(rate == 0.0f && playback_rate_ == 0.0f);
@@ -133,10 +148,8 @@ float PipelineImpl::GetVolume() const {
}
void PipelineImpl::SetVolume(float volume) {
- DCHECK(!IsPipelineThread());
-
if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) {
- pipeline_thread_->SetVolume(volume);
+ pipeline_internal_->SetVolume(volume);
} else {
NOTREACHED();
}
@@ -182,8 +195,7 @@ PipelineError PipelineImpl::GetError() const {
void PipelineImpl::ResetState() {
AutoLock auto_lock(lock_);
- pipeline_thread_ = NULL;
- initialized_ = false;
+ pipeline_internal_ = NULL;
duration_ = base::TimeDelta();
buffered_time_ = base::TimeDelta();
buffered_bytes_ = 0;
@@ -198,12 +210,7 @@ void PipelineImpl::ResetState() {
}
bool PipelineImpl::IsPipelineOk() const {
- return pipeline_thread_ && initialized_ && PIPELINE_OK == error_;
-}
-
-bool PipelineImpl::IsPipelineThread() const {
- return pipeline_thread_ &&
- PlatformThread::CurrentId() == pipeline_thread_->thread_id();
+ return pipeline_internal_ && PIPELINE_OK == error_;
}
void PipelineImpl::SetDuration(base::TimeDelta duration) {
@@ -263,100 +270,86 @@ void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) {
//-----------------------------------------------------------------------------
-PipelineThread::PipelineThread(PipelineImpl* pipeline)
+PipelineInternal::PipelineInternal(PipelineImpl* pipeline,
+ MessageLoop* message_loop)
: pipeline_(pipeline),
- thread_("PipelineThread"),
+ message_loop_(message_loop),
state_(kCreated) {
}
-PipelineThread::~PipelineThread() {
- Stop();
- DCHECK(state_ == kStopped || state_ == kError);
+PipelineInternal::~PipelineInternal() {
+ DCHECK(state_ == kCreated || state_ == kStopped);
}
-// This method is called on the client's thread. It starts the pipeline's
-// dedicated thread and posts a task to call the StartTask() method on that
-// thread.
-bool PipelineThread::Start(FilterFactory* filter_factory,
- const std::string& url,
- PipelineCallback* init_complete_callback) {
- DCHECK_EQ(kCreated, state_);
- if (thread_.Start()) {
- filter_factory_ = filter_factory;
- url_ = url;
- init_callback_.reset(init_complete_callback);
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StartTask));
- return true;
- }
- return false;
+// Called on client's thread.
+void PipelineInternal::Start(FilterFactory* filter_factory,
+ const std::string& url,
+ PipelineCallback* start_callback) {
+ DCHECK(filter_factory);
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url,
+ start_callback));
}
-// Called on the client's thread. If the thread has been started, then posts
-// a task to call the StopTask() method, then waits until the thread has
-// stopped.
-void PipelineThread::Stop() {
- if (thread_.IsRunning()) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StopTask));
- thread_.Stop();
- }
- DCHECK(filter_hosts_.empty());
- DCHECK(filter_threads_.empty());
+// Called on client's thread.
+void PipelineInternal::Stop(PipelineCallback* stop_callback) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback));
}
// Called on client's thread.
-void PipelineThread::Seek(base::TimeDelta time,
+void PipelineInternal::Seek(base::TimeDelta time,
PipelineCallback* seek_callback) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback));
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::SeekTask, time,
+ seek_callback));
}
// Called on client's thread.
-void PipelineThread::SetPlaybackRate(float rate) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
+void PipelineInternal::SetPlaybackRate(float rate) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate));
}
// Called on client's thread.
-void PipelineThread::SetVolume(float volume) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
+void PipelineInternal::SetVolume(float volume) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume));
}
// Called from any thread.
-void PipelineThread::InitializationComplete(FilterHostImpl* host) {
+void PipelineInternal::InitializationComplete(FilterHostImpl* host) {
if (IsPipelineOk()) {
- // Continue the start task by proceeding to the next stage.
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StartTask));
+ // Continue the initialize task by proceeding to the next stage.
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::InitializeTask));
}
}
// Called from any thread. Updates the pipeline time.
-void PipelineThread::SetTime(base::TimeDelta time) {
- pipeline()->SetTime(time);
+void PipelineInternal::SetTime(base::TimeDelta time) {
+ // TODO(scherkus): why not post a task?
+ pipeline_->SetTime(time);
}
-// Called from any thread. Sets the pipeline |error_| member and schedules a
-// task to stop all the filters in the pipeline. Note that the thread will
-// continue to run until the client calls Pipeline::Stop(), but nothing will
-// be processed since filters will not be able to post tasks.
-void PipelineThread::Error(PipelineError error) {
- // If this method returns false, then an error has already happened, so no
- // reason to run the StopTask again. It's going to happen.
- if (pipeline()->InternalSetError(error)) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StopTask));
- }
+// Called from any thread. Sets the pipeline |error_| member and destroys all
+// filters.
+void PipelineInternal::Error(PipelineError error) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::ErrorTask, error));
}
-// Called as a result of destruction of the thread.
-//
-// TODO(scherkus): this can block the client due to synchronous Stop() API call.
-void PipelineThread::WillDestroyCurrentMessageLoop() {
- STLDeleteElements(&filter_hosts_);
- STLDeleteElements(&filter_threads_);
+void PipelineInternal::StartTask(FilterFactory* filter_factory,
+ const std::string& url,
+ PipelineCallback* start_callback) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK_EQ(kCreated, state_);
+ filter_factory_ = filter_factory;
+ url_ = url;
+ start_callback_.reset(start_callback);
+
+ // Kick off initialization.
+ InitializeTask();
}
// Main initialization method called on the pipeline thread. This code attempts
@@ -370,18 +363,17 @@ void PipelineThread::WillDestroyCurrentMessageLoop() {
// then connects the VideoDecoder to a VideoRenderer.
//
// When all required filters have been created and have called their
-// FilterHost's InitializationComplete method, the pipeline's |initialized_|
-// member is set to true, and, if the client provided an
-// |init_complete_callback_|, it is called with "true".
+// FilterHost's InitializationComplete() method, the pipeline will update its
+// state to kStarted and |init_callback_|, will be executed.
//
// If initialization fails, the client's callback will still be called, but
// the bool parameter passed to it will be false.
//
-// TODO(hclam): StartTask is now starting the pipeline asynchronously. It
+// TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It
// works like a big state change table. If we no longer need to start filters
// in order, we need to get rid of all the state change.
-void PipelineThread::StartTask() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::InitializeTask() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
// If we have received the stop signal, return immediately.
if (state_ == kStopped)
@@ -391,7 +383,6 @@ void PipelineThread::StartTask() {
// Just created, create data source.
if (state_ == kCreated) {
- message_loop()->AddDestructionObserver(this);
state_ = kInitDataSource;
CreateDataSource();
return;
@@ -446,99 +437,77 @@ void PipelineThread::StartTask() {
}
state_ = kStarted;
- pipeline_->initialized_ = true;
filter_factory_ = NULL;
- if (init_callback_.get()) {
- init_callback_->Run(true);
- init_callback_.reset();
+ if (start_callback_.get()) {
+ start_callback_->Run(true);
+ start_callback_.reset();
}
}
}
// This method is called as a result of the client calling Pipeline::Stop() or
// as the result of an error condition. If there is no error, then set the
-// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
+// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the
// reverse order.
//
// TODO(scherkus): beware! this can get posted multiple times since we post
// Stop() tasks even if we've already stopped. Perhaps this should no-op for
// additional calls, however most of this logic will be changing.
-void PipelineThread::StopTask() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::StopTask(PipelineCallback* stop_callback) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ stop_callback_.reset(stop_callback);
- if (IsPipelineInitializing()) {
- // If IsPipelineOk() is true, the pipeline was simply stopped during
- // initialization. Otherwise it is a failure.
- state_ = IsPipelineOk() ? kStopped : kError;
- filter_factory_ = NULL;
- if (init_callback_.get()) {
- init_callback_->Run(false);
- init_callback_.reset();
- }
- } else {
- state_ = kStopped;
+ // If we've already stopped, return immediately.
+ if (state_ == kStopped) {
+ return;
}
- if (IsPipelineOk()) {
- pipeline_->error_ = PIPELINE_STOPPING;
- }
+ // Carry out setting the error, notifying the client and destroying filters.
+ ErrorTask(PIPELINE_STOPPING);
- // Stop every filter.
- for (FilterHostVector::iterator iter = filter_hosts_.begin();
- iter != filter_hosts_.end();
- ++iter) {
- (*iter)->Stop();
- }
+ // We no longer need to examine our previous state, set it to stopped.
+ state_ = kStopped;
- // Figure out how many threads we have to stop.
- //
- // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
- FilterThreadVector running_threads;
- for (FilterThreadVector::iterator iter = filter_threads_.begin();
- iter != filter_threads_.end();
- ++iter) {
- if ((*iter)->IsRunning()) {
- running_threads.push_back(*iter);
- }
+ // Reset the pipeline and set our reference to NULL so we don't accidentally
+ // modify the pipeline. Once remaining tasks execute we will be destroyed.
+ pipeline_->ResetState();
+ pipeline_ = NULL;
+
+ // Notify the client that stopping has finished.
+ if (stop_callback_.get()) {
+ stop_callback_->Run(true);
+ stop_callback_.reset();
}
+}
- // Crude blocking counter implementation.
- Lock lock;
- ConditionVariable wait_for_zero(&lock);
- int count = running_threads.size();
+void PipelineInternal::ErrorTask(PipelineError error) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
- // Post a task to every filter's thread to ensure that they've completed their
- // stopping logic before stopping the threads themselves.
- //
- // TODO(scherkus): again, Stop() should either be synchronous or we should
- // receive a signal from filters that they have indeed stopped.
- for (FilterThreadVector::iterator iter = running_threads.begin();
- iter != running_threads.end();
- ++iter) {
- (*iter)->message_loop()->PostTask(FROM_HERE,
- NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ // Suppress executing additional error logic.
+ if (state_ == kError) {
+ return;
}
- // Wait on our "blocking counter".
- {
- AutoLock auto_lock(lock);
- while (count > 0) {
- wait_for_zero.Wait();
- }
- }
+ // Update our error code first in case we execute the start callback.
+ pipeline_->error_ = error;
- // Stop every running filter thread.
- //
- // TODO(scherkus): can we watchdog this section to detect wedged threads?
- for (FilterThreadVector::iterator iter = running_threads.begin();
- iter != running_threads.end();
- ++iter) {
- (*iter)->Stop();
+ // Notify the client that starting did not complete, if necessary.
+ if (IsPipelineInitializing() && start_callback_.get()) {
+ start_callback_->Run(false);
}
+ start_callback_.reset();
+ filter_factory_ = NULL;
+
+ // We no longer need to examine our previous state, set it to stopped.
+ state_ = kError;
+
+ // Destroy every filter and reset the pipeline as well.
+ DestroyFilters();
}
-void PipelineThread::SetPlaybackRateTask(float rate) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::SetPlaybackRateTask(float rate) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
pipeline_->InternalSetPlaybackRate(rate);
for (FilterHostVector::iterator iter = filter_hosts_.begin();
@@ -548,9 +517,10 @@ void PipelineThread::SetPlaybackRateTask(float rate) {
}
}
-void PipelineThread::SeekTask(base::TimeDelta time,
- PipelineCallback* seek_callback) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::SeekTask(base::TimeDelta time,
+ PipelineCallback* seek_callback) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ seek_callback_.reset(seek_callback);
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
@@ -566,14 +536,14 @@ void PipelineThread::SeekTask(base::TimeDelta time,
// immediately here or we set time and do callback when we have new
// frames/packets.
SetTime(time);
- if (seek_callback) {
- seek_callback->Run(true);
- delete seek_callback;
+ if (seek_callback_.get()) {
+ seek_callback_->Run(true);
+ seek_callback_.reset();
}
}
-void PipelineThread::SetVolumeTask(float volume) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::SetVolumeTask(float volume) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
pipeline_->volume_ = volume;
scoped_refptr<AudioRenderer> audio_renderer;
@@ -584,44 +554,46 @@ void PipelineThread::SetVolumeTask(float volume) {
}
template <class Filter, class Source>
-void PipelineThread::CreateFilter(FilterFactory* filter_factory,
- Source source,
- const MediaFormat& media_format) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat& media_format) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
+ // Create the filter.
scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
if (!filter) {
Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
- } else {
- scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
- // Create a dedicated thread for this filter.
- if (SupportsSetMessageLoop<Filter>()) {
- // TODO(scherkus): figure out a way to name these threads so it matches
- // the filter type.
- scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
- if (!thread.get() || !thread->Start()) {
- NOTREACHED() << "Could not start filter thread";
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- } else {
- filter->set_message_loop(thread->message_loop());
- filter_threads_.push_back(thread.release());
- }
- }
+ return;
+ }
- // Creating a thread could have failed, verify we're still OK.
- if (IsPipelineOk()) {
- filter_hosts_.push_back(host.get());
- filter->set_host(host.release());
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- }
+ // Create a dedicated thread for this filter if applicable.
+ if (SupportsSetMessageLoop<Filter>()) {
+ scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>()));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ return;
}
+
+ filter->set_message_loop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
+ }
+
+ // Create the filter's host.
+ DCHECK(IsPipelineOk());
+ scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
+ filter->set_host(host.get());
+ filter_hosts_.push_back(host.release());
+
+ // Now initialize the filter.
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
}
}
-void PipelineThread::CreateDataSource() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::CreateDataSource() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
MediaFormat url_format;
@@ -630,8 +602,8 @@ void PipelineThread::CreateDataSource() {
CreateFilter<DataSource>(filter_factory_, url_, url_format);
}
-void PipelineThread::CreateDemuxer() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::CreateDemuxer() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
scoped_refptr<DataSource> data_source;
@@ -641,8 +613,8 @@ void PipelineThread::CreateDemuxer() {
}
template <class Decoder>
-bool PipelineThread::CreateDecoder() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+bool PipelineInternal::CreateDecoder() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
scoped_refptr<Demuxer> demuxer;
@@ -664,8 +636,8 @@ bool PipelineThread::CreateDecoder() {
}
template <class Decoder, class Renderer>
-bool PipelineThread::CreateRenderer() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+bool PipelineInternal::CreateRenderer() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
scoped_refptr<Decoder> decoder;
@@ -681,8 +653,8 @@ bool PipelineThread::CreateRenderer() {
}
template <class Filter>
-void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
*filter_out = NULL;
for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
@@ -692,4 +664,53 @@ void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
}
}
+void PipelineInternal::DestroyFilters() {
+ // Stop every filter.
+ for (FilterHostVector::iterator iter = filter_hosts_.begin();
+ iter != filter_hosts_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
+ // Crude blocking counter implementation.
+ Lock lock;
+ ConditionVariable wait_for_zero(&lock);
+ int count = filter_threads_.size();
+
+ // Post a task to every filter's thread to ensure that they've completed their
+ // stopping logic before stopping the threads themselves.
+ //
+ // TODO(scherkus): again, Stop() should either be synchronous or we should
+ // receive a signal from filters that they have indeed stopped.
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->message_loop()->PostTask(FROM_HERE,
+ NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ }
+
+ // Wait on our "blocking counter".
+ {
+ AutoLock auto_lock(lock);
+ while (count > 0) {
+ wait_for_zero.Wait();
+ }
+ }
+
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
+ // Reset the pipeline, which will decrement a reference to this object.
+ // We will get destroyed as soon as the remaining tasks finish executing.
+ // To be safe, we'll set our pipeline reference to NULL.
+ STLDeleteElements(&filter_hosts_);
+ STLDeleteElements(&filter_threads_);
+}
+
} // namespace media
diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h
index a8e9e09..2d6b988 100644
--- a/media/base/pipeline_impl.h
+++ b/media/base/pipeline_impl.h
@@ -20,24 +20,25 @@
namespace media {
class FilterHostImpl;
-class PipelineThread;
+class PipelineInternal;
// Class which implements the Media::Pipeline contract. The majority of the
-// actual code for this object lives in the PipelineThread class, which is
+// actual code for this object lives in the PipelineInternal class, which is
// responsible for actually building and running the pipeline. This object
// is basically a simple container for state information, and is responsible
-// for creating and communicating with the PipelineThread object.
+// for creating and communicating with the PipelineInternal object.
class PipelineImpl : public Pipeline {
public:
- PipelineImpl();
+ PipelineImpl(MessageLoop* message_loop);
virtual ~PipelineImpl();
// Pipeline implementation.
virtual bool Start(FilterFactory* filter_factory,
const std::string& uri,
PipelineCallback* start_callback);
- virtual void Stop();
+ virtual void Stop(PipelineCallback* stop_callback);
virtual void Seek(base::TimeDelta time, PipelineCallback* seek_callback);
+ virtual bool IsRunning() const;
virtual bool IsInitialized() const;
virtual bool IsRendered(const std::string& major_mime_type) const;
virtual float GetPlaybackRate() const;
@@ -54,7 +55,7 @@ class PipelineImpl : public Pipeline {
private:
friend class FilterHostImpl;
- friend class PipelineThread;
+ friend class PipelineInternal;
// Reset the state of the pipeline object to the initial state. This method
// is used by the constructor, and the Stop method.
@@ -65,10 +66,6 @@ class PipelineImpl : public Pipeline {
// must not be an error.
bool IsPipelineOk() const;
- // Returns true if we're currently executing on the pipeline thread. Mostly
- // used in DCHECKs.
- bool IsPipelineThread() const;
-
// Methods called by FilterHostImpl to update pipeline state.
void SetDuration(base::TimeDelta duration);
void SetBufferedTime(base::TimeDelta buffered_time);
@@ -83,14 +80,17 @@ class PipelineImpl : public Pipeline {
// alone, and returns false.
bool InternalSetError(PipelineError error);
- // Method called by the |pipeline_thread_| to insert a mime type into
+ // Method called by the |pipeline_internal_| to insert a mime type into
// the |rendered_mime_types_| set.
void InsertRenderedMimeType(const std::string& major_mime_type);
- // Holds a ref counted reference to the PipelineThread object associated
- // with this pipeline. Prior to the call to the Start method, this member
- // will be NULL, since no thread is running.
- scoped_refptr<PipelineThread> pipeline_thread_;
+ // Message loop used to execute pipeline tasks.
+ MessageLoop* message_loop_;
+
+ // Holds a ref counted reference to the PipelineInternal object associated
+ // with this pipeline. Prior to the call to the Start() method, this member
+ // will be NULL, since we are not running.
+ scoped_refptr<PipelineInternal> pipeline_internal_;
// After calling Start, if all of the required filters are created and
// initialized, this member will be set to true by the pipeline thread.
@@ -124,14 +124,14 @@ class PipelineImpl : public Pipeline {
// Current volume level (from 0.0f to 1.0f). The volume reflects the last
// value the audio filter was called with SetVolume, so there will be a short
// period of time between the client calling SetVolume on the pipeline and
- // this value being updated. Set by the PipelineThread just prior to calling
- // the audio renderer.
+ // this value being updated. Set by the PipelineInternal just prior to
+ // calling the audio renderer.
float volume_;
// Current playback rate (>= 0.0f). This member reflects the last value
// that the filters in the pipeline were called with, so there will be a short
// period of time between the client calling SetPlaybackRate and this value
- // being updated. Set by the PipelineThread just prior to calling filters.
+ // being updated. Set by the PipelineInternal just prior to calling filters.
float playback_rate_;
// Current playback time. Set by a FilterHostImpl object on behalf of the
@@ -152,12 +152,12 @@ class PipelineImpl : public Pipeline {
};
-// The PipelineThread contains most of the logic involved with running the
-// media pipeline. Filters are created and called on a dedicated thread owned
-// by this object. This object works like a state machine to perform
-// asynchronous initialization. Initialization is done in multiple passes in
-// StartTask(). In each pass a different filter is created and chained with a
-// previously created filter.
+// PipelineInternal contains most of the logic involved with running the
+// media pipeline. Filters are created and called on the message loop injected
+// into this object. PipelineInternal works like a state machine to perform
+// asynchronous initialization. Initialization is done in multiple passes by
+// InitializeTask(). In each pass a different filter is created and chained with
+// a previously created filter.
//
// Here's a state diagram that describes the lifetime of this object.
//
@@ -172,21 +172,20 @@ class PipelineImpl : public Pipeline {
// transition to the "Error" state from any state. If Stop() is called during
// initialization, this object will transition to "Stopped" state.
-class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
- public MessageLoop::DestructionObserver {
+class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> {
public:
// Methods called by PipelineImpl object on the client's thread. These
// methods post a task to call a corresponding xxxTask() method on the
- // pipeline thread. For example, Seek posts a task to call SeekTask.
- explicit PipelineThread(PipelineImpl* pipeline);
+ // message loop. For example, Seek posts a task to call SeekTask.
+ explicit PipelineInternal(PipelineImpl* pipeline, MessageLoop* message_loop);
// After Start() is called, a task of StartTask() is posted on the pipeline
// thread to perform initialization. See StartTask() to learn more about
// initialization.
- bool Start(FilterFactory* filter_factory,
+ void Start(FilterFactory* filter_factory,
const std::string& url_media_source,
- PipelineCallback* init_complete_callback);
- void Stop();
+ PipelineCallback* start_callback);
+ void Stop(PipelineCallback* stop_callback);
void Seek(base::TimeDelta time, PipelineCallback* seek_callback);
void SetPlaybackRate(float rate);
void SetVolume(float volume);
@@ -209,19 +208,18 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
// Simple accessor used by the FilterHostImpl class to get access to the
// pipeline object.
+ //
+ // TODO(scherkus): I think FilterHostImpl should not be talking to
+ // PipelineImpl but rather PipelineInternal.
PipelineImpl* pipeline() const { return pipeline_; }
- // Accessor used to post messages to thread's message loop.
- MessageLoop* message_loop() const { return thread_.message_loop(); }
-
- // Accessor used by PipelineImpl to check if we're executing on the pipeline
- // thread.
- PlatformThreadId thread_id() const { return thread_.thread_id(); }
+ // Returns true if the pipeline has fully initialized.
+ bool IsInitialized() { return state_ == kStarted; }
private:
// Only allow ourselves to be destroyed via ref-counting.
- friend class base::RefCountedThreadSafe<PipelineThread>;
- virtual ~PipelineThread();
+ friend class base::RefCountedThreadSafe<PipelineInternal>;
+ virtual ~PipelineInternal();
enum State {
kCreated,
@@ -249,23 +247,29 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
state_ == kInitVideoRenderer;
}
- // Implementation of MessageLoop::DestructionObserver. StartTask registers
- // this class as a destruction observer on the thread's message loop.
- // It is used to destroy the list of FilterHosts
- // (and thus destroy the associated filters) when all tasks have been
- // processed and the message loop has been quit.
- virtual void WillDestroyCurrentMessageLoop();
-
// The following "task" methods correspond to the public methods, but these
- // methods are run as the result of posting a task to the PipelineThread's
+ // methods are run as the result of posting a task to the PipelineInternal's
// message loop.
+ void StartTask(FilterFactory* filter_factory,
+ const std::string& url,
+ PipelineCallback* start_callback);
+
+ // InitializeTask() performs initialization in multiple passes. It is executed
+ // as a result of calling Start() or InitializationComplete() that advances
+ // initialization to the next state. It works as a hub of state transition for
+ // initialization.
+ void InitializeTask();
+
+ // StopTask() and ErrorTask() are similar but serve different purposes:
+ // - Both destroy the filter chain.
+ // - Both will execute |start_callback| if the pipeline was initializing.
+ // - StopTask() resets the pipeline to a fresh state, where as ErrorTask()
+ // leaves the pipeline as is for client inspection.
+ // - StopTask() can be scheduled by the client calling Stop(), where as
+ // ErrorTask() is scheduled as a result of a filter calling Error().
+ void StopTask(PipelineCallback* stop_callback);
+ void ErrorTask(PipelineError error);
- // StartTask() is a special task that performs initialization in multiple
- // passes. It is executed as a result of calling Start() or
- // InitializationComplete() that advances initialization to the next state. It
- // works as a hub of state transition for initialization.
- void StartTask();
- void StopTask();
void SetPlaybackRateTask(float rate);
void SeekTask(base::TimeDelta time, PipelineCallback* seek_callback);
void SetVolumeTask(float volume);
@@ -302,8 +306,8 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
Source source,
const MediaFormat& source_media_format);
- // Creates a Filter and initilizes it with the given |source|. If a Filter
- // could not be created or an error occurred, this metod returns NULL and the
+ // Creates a Filter and initializes it with the given |source|. If a Filter
+ // could not be created or an error occurred, this method returns NULL and the
// pipeline's |error_| member will contain a specific error code. Note that
// the Source could be a filter or a DemuxerStream, but it must support the
// GetMediaFormat() method.
@@ -339,16 +343,15 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
template <class Filter>
void GetFilter(scoped_refptr<Filter>* filter_out) const;
- // Pointer to the pipeline that owns this PipelineThread.
- PipelineImpl* const pipeline_;
+ // Stops every filters, filter host and filter thread and releases all
+ // references to them.
+ void DestroyFilters();
- // The actual thread.
- base::Thread thread_;
+ // Pointer to the pipeline that owns this PipelineInternal.
+ PipelineImpl* pipeline_;
- // Used to avoid scheduling multiple time update tasks. If this member is
- // true then a task that will call the SetTimeTask() method is in the message
- // loop's queue.
- bool time_update_callback_scheduled_;
+ // Message loop used to execute pipeline tasks.
+ MessageLoop* message_loop_;
// Member that tracks the current state.
State state_;
@@ -359,17 +362,19 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
// URL for the data source as passed in by Start().
std::string url_;
- // Initialization callback as passed in by Start().
- scoped_ptr<PipelineCallback> init_callback_;
+ // Callbacks for various pipeline operations.
+ scoped_ptr<PipelineCallback> start_callback_;
+ scoped_ptr<PipelineCallback> seek_callback_;
+ scoped_ptr<PipelineCallback> stop_callback_;
- // Vector of FilterHostImpl objects that contian the filters for the pipeline.
+ // Vector of FilterHostImpl objects that contain the filters for the pipeline.
typedef std::vector<FilterHostImpl*> FilterHostVector;
FilterHostVector filter_hosts_;
typedef std::vector<base::Thread*> FilterThreadVector;
FilterThreadVector filter_threads_;
- DISALLOW_COPY_AND_ASSIGN(PipelineThread);
+ DISALLOW_COPY_AND_ASSIGN(PipelineInternal);
};
} // namespace media
diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc
index 4183126..1a550eb 100644
--- a/media/base/pipeline_impl_unittest.cc
+++ b/media/base/pipeline_impl_unittest.cc
@@ -14,45 +14,53 @@
#include "testing/gtest/include/gtest/gtest.h"
using ::testing::DoAll;
+using ::testing::Mock;
using ::testing::Return;
using ::testing::StrictMock;
namespace media {
-typedef std::vector<MockDemuxerStream*> MockDemuxerStreamVector;
+// Used for setting expectations on pipeline callbacks. Using a StrictMock
+// also lets us test for missing callbacks.
+class CallbackHelper {
+ public:
+ CallbackHelper() {}
+ virtual ~CallbackHelper() {}
+
+ MOCK_METHOD1(OnInitialize, void(bool result));
+ MOCK_METHOD1(OnSeek, void(bool result));
+ MOCK_METHOD1(OnStop, void(bool result));
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
+};
+// TODO(scherkus): even though some filters are initialized on separate
+// threads these test aren't flaky... why? It's because filters' Initialize()
+// is executed on |message_loop_| and the mock filters instantly call
+// InitializationComplete(), which keeps the pipeline humming along. If
+// either filters don't call InitializationComplete() immediately or filter
+// initialization is moved to a separate thread this test will become flaky.
class PipelineImplTest : public ::testing::Test {
public:
PipelineImplTest()
- : mocks_(new MockFilterFactory()),
- initialize_result_(false),
- seek_result_(false),
- initialize_event_(false, false),
- seek_event_(false, false) {
+ : pipeline_(&message_loop_),
+ mocks_(new MockFilterFactory()) {
}
virtual ~PipelineImplTest() {
- // Force the pipeline to shut down its thread.
- pipeline_.Stop();
- }
-
- protected:
- // Called by tests after they have finished setting up MockFilterConfig.
- // Initializes the pipeline and returns true if the initialization callback
- // was executed, false otherwise.
- bool InitializeAndWait() {
- pipeline_.Start(mocks_, "",
- NewCallback(this, &PipelineImplTest::OnInitialize));
- return initialize_event_.TimedWait(base::TimeDelta::FromMilliseconds(500));
- }
+ if (!pipeline_.IsRunning()) {
+ return;
+ }
- // Issues a seek on the pipeline and returns true if the seek callback was
- // executed, false otherwise.
- bool SeekAndWait(const base::TimeDelta& time) {
- pipeline_.Seek(time, NewCallback(this, &PipelineImplTest::OnSeek));
- return seek_event_.TimedWait(base::TimeDelta::FromMilliseconds(500));
+ // Expect a stop callback if we were started.
+ EXPECT_CALL(callbacks_, OnStop(true));
+ pipeline_.Stop(NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_),
+ &CallbackHelper::OnStop));
+ message_loop_.RunAllPending();
}
+ protected:
// Sets up expectations to allow the data source to initialize.
void InitializeDataSource() {
EXPECT_CALL(*mocks_->data_source(), Initialize(""))
@@ -62,6 +70,7 @@ class PipelineImplTest : public ::testing::Test {
}
// Sets up expectations to allow the demuxer to initialize.
+ typedef std::vector<MockDemuxerStream*> MockDemuxerStreamVector;
void InitializeDemuxer(MockDemuxerStreamVector* streams) {
EXPECT_CALL(*mocks_->demuxer(), Initialize(mocks_->data_source()))
.WillOnce(DoAll(InitializationComplete(mocks_->demuxer()),
@@ -110,27 +119,24 @@ class PipelineImplTest : public ::testing::Test {
EXPECT_CALL(*mocks_->audio_renderer(), Stop());
}
+ // Sets up expectations on the callback and initializes the pipeline. Called
+ // afters tests have set expectations any filters they wish to use.
+ void InitializePipeline(bool callback_result) {
+ // Expect an initialization callback.
+ EXPECT_CALL(callbacks_, OnInitialize(callback_result));
+ pipeline_.Start(mocks_, "",
+ NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_),
+ &CallbackHelper::OnInitialize));
+ message_loop_.RunAllPending();
+ }
+
// Fixture members.
- media::PipelineImpl pipeline_;
+ StrictMock<CallbackHelper> callbacks_;
+ MessageLoop message_loop_;
+ PipelineImpl pipeline_;
scoped_refptr<media::MockFilterFactory> mocks_;
- bool initialize_result_;
- bool seek_result_;
private:
- void OnInitialize(bool result) {
- initialize_result_ = result;
- initialize_event_.Signal();
- }
-
- void OnSeek(bool result) {
- seek_result_ = result;
- seek_event_.Signal();
- }
-
- // Used to wait for callbacks.
- base::WaitableEvent initialize_event_;
- base::WaitableEvent seek_event_;
-
DISALLOW_COPY_AND_ASSIGN(PipelineImplTest);
};
@@ -140,20 +146,29 @@ TEST_F(PipelineImplTest, NeverInitializes) {
EXPECT_CALL(*mocks_->data_source(), Stop());
// This test hangs during initialization by never calling
- // InitializationComplete(). Make sure we tear down the pipeline properly.
- ASSERT_FALSE(InitializeAndWait());
- EXPECT_FALSE(initialize_result_);
+ // InitializationComplete(). StrictMock<> will ensure that the callback is
+ // never executed.
+ pipeline_.Start(mocks_, "",
+ NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_),
+ &CallbackHelper::OnInitialize));
+ message_loop_.RunAllPending();
+
EXPECT_FALSE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError());
+ EXPECT_EQ(PIPELINE_OK, pipeline_.GetError());
+
+ // Because our callback will get executed when the test tears down, we'll
+ // verify that nothing has been called, then set our expectation for the call
+ // made during tear down.
+ Mock::VerifyAndClear(&callbacks_);
+ EXPECT_CALL(callbacks_, OnInitialize(false));
}
TEST_F(PipelineImplTest, RequiredFilterMissing) {
mocks_->set_creation_successful(false);
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_FALSE(initialize_result_);
+ InitializePipeline(false);
EXPECT_FALSE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_ERROR_REQUIRED_FILTER_MISSING,
+ EXPECT_EQ(PIPELINE_ERROR_REQUIRED_FILTER_MISSING,
pipeline_.GetError());
}
@@ -164,10 +179,9 @@ TEST_F(PipelineImplTest, URLNotFound) {
Return(false)));
EXPECT_CALL(*mocks_->data_source(), Stop());
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_FALSE(initialize_result_);
+ InitializePipeline(false);
EXPECT_FALSE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_ERROR_URL_NOT_FOUND, pipeline_.GetError());
+ EXPECT_EQ(PIPELINE_ERROR_URL_NOT_FOUND, pipeline_.GetError());
}
TEST_F(PipelineImplTest, NoStreams) {
@@ -175,10 +189,9 @@ TEST_F(PipelineImplTest, NoStreams) {
InitializeDataSource();
InitializeDemuxer(&streams);
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_FALSE(initialize_result_);
+ InitializePipeline(false);
EXPECT_FALSE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_ERROR_COULD_NOT_RENDER, pipeline_.GetError());
+ EXPECT_EQ(PIPELINE_ERROR_COULD_NOT_RENDER, pipeline_.GetError());
}
TEST_F(PipelineImplTest, AudioStream) {
@@ -192,10 +205,9 @@ TEST_F(PipelineImplTest, AudioStream) {
InitializeAudioDecoder(stream);
InitializeAudioRenderer();
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_TRUE(initialize_result_);
+ InitializePipeline(true);
EXPECT_TRUE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError());
+ EXPECT_EQ(PIPELINE_OK, pipeline_.GetError());
EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio));
EXPECT_FALSE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo));
}
@@ -211,10 +223,9 @@ TEST_F(PipelineImplTest, VideoStream) {
InitializeVideoDecoder(stream);
InitializeVideoRenderer();
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_TRUE(initialize_result_);
+ InitializePipeline(true);
EXPECT_TRUE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError());
+ EXPECT_EQ(PIPELINE_OK, pipeline_.GetError());
EXPECT_FALSE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio));
EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo));
}
@@ -235,10 +246,9 @@ TEST_F(PipelineImplTest, AudioVideoStream) {
InitializeVideoDecoder(video_stream);
InitializeVideoRenderer();
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_TRUE(initialize_result_);
+ InitializePipeline(true);
EXPECT_TRUE(pipeline_.IsInitialized());
- EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError());
+ EXPECT_EQ(PIPELINE_OK, pipeline_.GetError());
EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio));
EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo));
}
@@ -268,10 +278,15 @@ TEST_F(PipelineImplTest, Seek) {
EXPECT_CALL(*mocks_->video_decoder(), Seek(expected));
EXPECT_CALL(*mocks_->video_renderer(), Seek(expected));
+ // We expect a successful seek callback.
+ EXPECT_CALL(callbacks_, OnSeek(true));
+
// Initialize then seek!
- ASSERT_TRUE(InitializeAndWait());
- EXPECT_TRUE(SeekAndWait(expected));
- EXPECT_TRUE(seek_result_);
+ InitializePipeline(true);
+ pipeline_.Seek(expected,
+ NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_),
+ &CallbackHelper::OnSeek));
+ message_loop_.RunAllPending();
}
TEST_F(PipelineImplTest, SetVolume) {
@@ -290,9 +305,8 @@ TEST_F(PipelineImplTest, SetVolume) {
EXPECT_CALL(*mocks_->audio_renderer(), SetVolume(expected));
// Initialize then set volume!
- ASSERT_TRUE(InitializeAndWait());
+ InitializePipeline(true);
pipeline_.SetVolume(expected);
}
} // namespace media
-
diff --git a/media/player/movie.cc b/media/player/movie.cc
index e37a497..2bb4001 100644
--- a/media/player/movie.cc
+++ b/media/player/movie.cc
@@ -79,7 +79,9 @@ bool Movie::Open(const wchar_t* url, WtlVideoRenderer* video_renderer) {
factories->AddFactory(
new media::InstanceFilterFactory<WtlVideoRenderer>(video_renderer));
- pipeline_.reset(new PipelineImpl());
+ thread_.reset(new base::Thread("PipelineThread"));
+ thread_->Start();
+ pipeline_.reset(new PipelineImpl(thread_->message_loop()));
// Create and start our pipeline.
pipeline_->Start(factories.get(), WideToUTF8(std::wstring(url)), NULL);
@@ -194,8 +196,10 @@ bool Movie::GetOpenMpEnable() {
// Teardown.
void Movie::Close() {
if (pipeline_.get()) {
- pipeline_->Stop();
+ pipeline_->Stop(NULL);
+ thread_->Stop();
pipeline_.reset();
+ thread_.reset();
}
}
diff --git a/media/player/movie.h b/media/player/movie.h
index fdaa2b5..eb23822 100644
--- a/media/player/movie.h
+++ b/media/player/movie.h
@@ -11,6 +11,7 @@
#include "base/scoped_ptr.h"
#include "base/singleton.h"
+#include "base/thread.h"
class WtlVideoRenderer;
@@ -90,6 +91,7 @@ class Movie : public Singleton<Movie> {
virtual ~Movie();
scoped_ptr<PipelineImpl> pipeline_;
+ scoped_ptr<base::Thread> thread_;
bool enable_audio_;
bool enable_swscaler_;
diff --git a/webkit/glue/webmediaplayer_impl.cc b/webkit/glue/webmediaplayer_impl.cc
index 452a9e6..9f5702f 100644
--- a/webkit/glue/webmediaplayer_impl.cc
+++ b/webkit/glue/webmediaplayer_impl.cc
@@ -143,7 +143,7 @@ void WebMediaPlayerImpl::Proxy::PipelineInitializationCallback(bool success) {
ReadyStateChanged(WebKit::WebMediaPlayer::HaveEnoughData);
NetworkStateChanged(WebKit::WebMediaPlayer::Loaded);
} else {
- // TODO(hclam): should use pipeline_.GetError() to determine the state
+ // TODO(hclam): should use pipeline_->GetError() to determine the state
// properly and reports error using MediaError.
// WebKit uses FormatError to indicate an error for bogus URL or bad file.
// Since we are at the initialization stage we can safely treat every error
@@ -166,11 +166,19 @@ WebMediaPlayerImpl::WebMediaPlayerImpl(WebKit::WebMediaPlayerClient* client,
ready_state_(WebKit::WebMediaPlayer::HaveNothing),
main_loop_(NULL),
filter_factory_(factory),
+ pipeline_thread_("PipelineThread"),
client_(client) {
// Saves the current message loop.
DCHECK(!main_loop_);
main_loop_ = MessageLoop::current();
+ // Create the pipeline and its thread.
+ if (!pipeline_thread_.Start()) {
+ NOTREACHED() << "Could not start PipelineThread";
+ } else {
+ pipeline_.reset(new media::PipelineImpl(pipeline_thread_.message_loop()));
+ }
+
// Also we want to be notified of |main_loop_| destruction.
main_loop_->AddDestructionObserver(this);
@@ -202,7 +210,7 @@ void WebMediaPlayerImpl::load(const WebKit::WebURL& url) {
// Initialize the pipeline.
SetNetworkState(WebKit::WebMediaPlayer::Loading);
SetReadyState(WebKit::WebMediaPlayer::HaveNothing);
- pipeline_.Start(
+ pipeline_->Start(
filter_factory_.get(),
url.spec(),
NewCallback(proxy_.get(),
@@ -218,13 +226,13 @@ void WebMediaPlayerImpl::play() {
// TODO(hclam): We should restore the previous playback rate rather than
// having it at 1.0.
- pipeline_.SetPlaybackRate(1.0f);
+ pipeline_->SetPlaybackRate(1.0f);
}
void WebMediaPlayerImpl::pause() {
DCHECK(MessageLoop::current() == main_loop_);
- pipeline_.SetPlaybackRate(0.0f);
+ pipeline_->SetPlaybackRate(0.0f);
}
void WebMediaPlayerImpl::seek(float seconds) {
@@ -233,7 +241,7 @@ void WebMediaPlayerImpl::seek(float seconds) {
// Try to preserve as much accuracy as possible.
float microseconds = seconds * base::Time::kMicrosecondsPerSecond;
if (seconds != 0)
- pipeline_.Seek(
+ pipeline_->Seek(
base::TimeDelta::FromMicroseconds(static_cast<int64>(microseconds)),
NewCallback(proxy_.get(),
&WebMediaPlayerImpl::Proxy::PipelineSeekCallback));
@@ -249,13 +257,13 @@ void WebMediaPlayerImpl::setEndTime(float seconds) {
void WebMediaPlayerImpl::setRate(float rate) {
DCHECK(MessageLoop::current() == main_loop_);
- pipeline_.SetPlaybackRate(rate);
+ pipeline_->SetPlaybackRate(rate);
}
void WebMediaPlayerImpl::setVolume(float volume) {
DCHECK(MessageLoop::current() == main_loop_);
- pipeline_.SetVolume(volume);
+ pipeline_->SetVolume(volume);
}
void WebMediaPlayerImpl::setVisible(bool visible) {
@@ -274,14 +282,14 @@ bool WebMediaPlayerImpl::setAutoBuffer(bool autoBuffer) {
bool WebMediaPlayerImpl::totalBytesKnown() {
DCHECK(MessageLoop::current() == main_loop_);
- return pipeline_.GetTotalBytes() != 0;
+ return pipeline_->GetTotalBytes() != 0;
}
bool WebMediaPlayerImpl::hasVideo() const {
DCHECK(MessageLoop::current() == main_loop_);
size_t width, height;
- pipeline_.GetVideoSize(&width, &height);
+ pipeline_->GetVideoSize(&width, &height);
return width != 0 && height != 0;
}
@@ -289,14 +297,14 @@ WebKit::WebSize WebMediaPlayerImpl::naturalSize() const {
DCHECK(MessageLoop::current() == main_loop_);
size_t width, height;
- pipeline_.GetVideoSize(&width, &height);
+ pipeline_->GetVideoSize(&width, &height);
return WebKit::WebSize(width, height);
}
bool WebMediaPlayerImpl::paused() const {
DCHECK(MessageLoop::current() == main_loop_);
- return pipeline_.GetPlaybackRate() == 0.0f;
+ return pipeline_->GetPlaybackRate() == 0.0f;
}
bool WebMediaPlayerImpl::seeking() const {
@@ -308,13 +316,13 @@ bool WebMediaPlayerImpl::seeking() const {
float WebMediaPlayerImpl::duration() const {
DCHECK(MessageLoop::current() == main_loop_);
- return static_cast<float>(pipeline_.GetDuration().InSecondsF());
+ return static_cast<float>(pipeline_->GetDuration().InSecondsF());
}
float WebMediaPlayerImpl::currentTime() const {
DCHECK(MessageLoop::current() == main_loop_);
- return static_cast<float>(pipeline_.GetTime().InSecondsF());
+ return static_cast<float>(pipeline_->GetTime().InSecondsF());
}
int WebMediaPlayerImpl::dataRate() const {
@@ -327,32 +335,32 @@ int WebMediaPlayerImpl::dataRate() const {
float WebMediaPlayerImpl::maxTimeBuffered() const {
DCHECK(MessageLoop::current() == main_loop_);
- return static_cast<float>(pipeline_.GetBufferedTime().InSecondsF());
+ return static_cast<float>(pipeline_->GetBufferedTime().InSecondsF());
}
float WebMediaPlayerImpl::maxTimeSeekable() const {
DCHECK(MessageLoop::current() == main_loop_);
// TODO(scherkus): move this logic down into the pipeline.
- if (pipeline_.GetTotalBytes() == 0) {
+ if (pipeline_->GetTotalBytes() == 0) {
return 0.0f;
}
- double total_bytes = static_cast<double>(pipeline_.GetTotalBytes());
- double buffered_bytes = static_cast<double>(pipeline_.GetBufferedBytes());
- double duration = static_cast<double>(pipeline_.GetDuration().InSecondsF());
+ double total_bytes = static_cast<double>(pipeline_->GetTotalBytes());
+ double buffered_bytes = static_cast<double>(pipeline_->GetBufferedBytes());
+ double duration = static_cast<double>(pipeline_->GetDuration().InSecondsF());
return static_cast<float>(duration * (buffered_bytes / total_bytes));
}
unsigned long long WebMediaPlayerImpl::bytesLoaded() const {
DCHECK(MessageLoop::current() == main_loop_);
- return pipeline_.GetBufferedBytes();
+ return pipeline_->GetBufferedBytes();
}
unsigned long long WebMediaPlayerImpl::totalBytes() const {
DCHECK(MessageLoop::current() == main_loop_);
- return pipeline_.GetTotalBytes();
+ return pipeline_->GetTotalBytes();
}
void WebMediaPlayerImpl::setSize(const WebSize& size) {
@@ -407,9 +415,9 @@ void WebMediaPlayerImpl::Destroy() {
DCHECK(MessageLoop::current() == main_loop_);
// Make sure to kill the pipeline so there's no more media threads running.
- // TODO(hclam): stopping the pipeline is synchronous so it might block
- // stopping for a long time.
- pipeline_.Stop();
+ // TODO(hclam): stopping the pipeline might block for a long time.
+ pipeline_->Stop(NULL);
+ pipeline_thread_.Stop();
// And then detach the proxy, it may live on the render thread for a little
// longer until all the tasks are finished.
diff --git a/webkit/glue/webmediaplayer_impl.h b/webkit/glue/webmediaplayer_impl.h
index 604188c..b8a44b0 100644
--- a/webkit/glue/webmediaplayer_impl.h
+++ b/webkit/glue/webmediaplayer_impl.h
@@ -248,9 +248,9 @@ class WebMediaPlayerImpl : public WebKit::WebMediaPlayer,
// A collection of factories for creating filters.
scoped_refptr<media::FilterFactoryCollection> filter_factory_;
- // The actual pipeline. We do it a composition here because we expect to have
- // the same lifetime as the pipeline.
- media::PipelineImpl pipeline_;
+ // The actual pipeline and the thread it runs on.
+ scoped_ptr<media::PipelineImpl> pipeline_;
+ base::Thread pipeline_thread_;
WebKit::WebMediaPlayerClient* client_;