diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-10 23:26:28 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-10 23:26:28 +0000 |
commit | 423df4c237150165c9f0c61136c119f35a0c434f (patch) | |
tree | f9ac977a7b22dbc302c3372faf8aadfc609c0fda /media | |
parent | 92587cb639acbfcdf0d4089187d6ac86f77101bd (diff) | |
download | chromium_src-423df4c237150165c9f0c61136c119f35a0c434f.zip chromium_src-423df4c237150165c9f0c61136c119f35a0c434f.tar.gz chromium_src-423df4c237150165c9f0c61136c119f35a0c434f.tar.bz2 |
Revert "Implemented injected message loops for PipelineImpl."
I cannot repro the layout test failures, seeing if this helps.
TBR=hclam
BUG=none
TEST=none
Review URL: http://codereview.chromium.org/155396
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@20439 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r-- | media/base/filter_host_impl.cc | 6 | ||||
-rw-r--r-- | media/base/filter_host_impl.h | 15 | ||||
-rw-r--r-- | media/base/pipeline.h | 49 | ||||
-rw-r--r-- | media/base/pipeline_impl.cc | 475 | ||||
-rw-r--r-- | media/base/pipeline_impl.h | 139 | ||||
-rw-r--r-- | media/base/pipeline_impl_unittest.cc | 150 | ||||
-rw-r--r-- | media/player/movie.cc | 8 | ||||
-rw-r--r-- | media/player/movie.h | 2 |
8 files changed, 401 insertions, 443 deletions
diff --git a/media/base/filter_host_impl.cc b/media/base/filter_host_impl.cc index 09f1791..96e4e93 100644 --- a/media/base/filter_host_impl.cc +++ b/media/base/filter_host_impl.cc @@ -7,11 +7,11 @@ namespace media { void FilterHostImpl::InitializationComplete() { - pipeline_internal_->InitializationComplete(this); + pipeline_thread_->InitializationComplete(this); } void FilterHostImpl::Error(PipelineError error) { - pipeline_internal_->Error(error); + pipeline_thread_->Error(error); } base::TimeDelta FilterHostImpl::GetTime() const { @@ -19,7 +19,7 @@ base::TimeDelta FilterHostImpl::GetTime() const { } void FilterHostImpl::SetTime(base::TimeDelta time) { - pipeline_internal_->SetTime(time); + pipeline_thread_->SetTime(time); } void FilterHostImpl::SetDuration(base::TimeDelta duration) { diff --git a/media/base/filter_host_impl.h b/media/base/filter_host_impl.h index 71e2417..1fcb0c4 100644 --- a/media/base/filter_host_impl.h +++ b/media/base/filter_host_impl.h @@ -27,15 +27,15 @@ class FilterHostImpl : public FilterHost { virtual void SetVideoSize(size_t width, size_t height); // These methods are public, but are intended for use by the - // PipelineInternal class only. + // PipelineThread class only. // Creates a FilterHostImpl object and populates the |filter_type_| member // by calling the Filter class's static filter_type() method. This ensures // that the GetFilter method can safely cast the filter interface from the // MediaFilter base class interface to the specific Filter interface. template <class Filter> - FilterHostImpl(PipelineInternal* pipeline_internal, Filter* filter) - : pipeline_internal_(pipeline_internal), + FilterHostImpl(PipelineThread* pipeline_thread, Filter* filter) + : pipeline_thread_(pipeline_thread), filter_type_(Filter::filter_type()), filter_(filter), stopped_(false) { @@ -54,16 +54,15 @@ class FilterHostImpl : public FilterHost { // Stops the filter. void Stop(); - // Used by the PipelineInternal to call Seek() and SetRate() methods on - // filters. + // Used by the PipelineThread to call Seek and SetRate methods on filters. MediaFilter* media_filter() const { return filter_; } private: // Useful method for getting the pipeline. - PipelineImpl* pipeline() const { return pipeline_internal_->pipeline(); } + PipelineImpl* pipeline() const { return pipeline_thread_->pipeline(); } - // PipelineInternal that owns this FilterHostImpl. - PipelineInternal* const pipeline_internal_; + // PipelineThread that owns this FilterHostImpl. + PipelineThread* const pipeline_thread_; // The FilterType of the filter this host contains. FilterType const filter_type_; diff --git a/media/base/pipeline.h b/media/base/pipeline.h index d1f5d90..d418e4b 100644 --- a/media/base/pipeline.h +++ b/media/base/pipeline.h @@ -56,31 +56,41 @@ class Pipeline { // construct a filter chain. Returns true if successful, false otherwise // (i.e., pipeline already started). Note that a return value of true // only indicates that the initialization process has started successfully. - // Pipeline initialization is an inherently asynchronous process. Clients can - // either poll the IsInitialized() method (discouraged) or use the - // |start_callback| as described below. + // Pipeline initialization is an inherently asynchronous process. Clients + // should not call SetPlaybackRate(), Seek(), or SetVolume() until + // initialization is complete. Clients can either poll the IsInitialized() + // method (which is discouraged) or use the |start_callback| as described + // below. // // This method is asynchronous and can execute a callback when completed. // If the caller provides a |start_callback|, it will be called when the // pipeline initialization completes. If successful, the callback's bool // parameter will be true. If the callback is called with false, then the - // client can use GetError() to obtain more information about the reason - // initialization failed. + // client can use the GetError() method to obtain more information about the + // reason initialization failed. The prototype for the client callback is: + // void Client::PipelineInitComplete(bool init_was_successful); + // + // Note that clients must not call the Stop method from within the + // |start_callback|. Other methods, including SetPlaybackRate(), Seek(), and + // SetVolume() may be called. The client will be called on a thread owned by + // the pipeline class, not on the thread that originally called the Start() + // method. virtual bool Start(FilterFactory* filter_factory, const std::string& url, PipelineCallback* start_callback) = 0; - // Asynchronously stops the pipeline and resets it to an uninitialized state. - // If provided, |stop_callback| will be executed when the pipeline has been - // completely torn down and reset to an uninitialized state. It is acceptable - // to call Start() again once the callback has finished executing. + // Stops the pipeline and resets to an uninitialized state. This method + // will block the calling thread until the pipeline has been completely + // torn down and reset to an uninitialized state. After calling Stop(), it + // is acceptable to call Start() again since Stop() leaves the pipeline + // in a state identical to a newly created pipeline. // - // Stop() must be called before destroying the pipeline. Clients can - // determine whether Stop() must be called by checking IsRunning(). + // Stop() must be called before destroying the pipeline. // - // TODO(scherkus): ideally clients would destroy the pipeline after calling - // Stop() and create a new pipeline as needed. - virtual void Stop(PipelineCallback* stop_callback) = 0; + // TODO(scherkus): it shouldn't be acceptable to call Start() again after you + // Stop() a pipeline -- it should be destroyed and replaced with a new + // instance. + virtual void Stop() = 0; // Attempt to seek to the position specified by time. |seek_callback| will be // executed when the all filters in the pipeline have processed the seek. @@ -88,14 +98,9 @@ class Pipeline { // (i.e., streaming media). virtual void Seek(base::TimeDelta time, PipelineCallback* seek_callback) = 0; - // Returns true if the pipeline has been started via Start(). If IsRunning() - // returns true, it is expected that Stop() will be called before destroying - // the pipeline. - virtual bool IsRunning() const = 0; - - // Returns true if the pipeline has been started and fully initialized to a - // point where playback controls will be respected. Note that it is possible - // for a pipeline to be started but not initialized (i.e., an error occurred). + // Returns the current initialization state of the pipeline. Note that this + // will be set to true prior to a executing |init_complete_callback| if + // initialization is successful. virtual bool IsInitialized() const = 0; // If the |major_mime_type| exists in the pipeline and is being rendered, this diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 68d509c..56a71a6 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -31,24 +31,6 @@ bool SupportsSetMessageLoop() { } } -// Small helper function to help us name filter threads for debugging. -// -// TODO(scherkus): figure out a cleaner way to derive the filter thread name. -template <class Filter> -const char* GetThreadName() { - DCHECK(SupportsSetMessageLoop<Filter>()); - switch (Filter::filter_type()) { - case FILTER_DEMUXER: - return "DemuxerThread"; - case FILTER_AUDIO_DECODER: - return "AudioDecoderThread"; - case FILTER_VIDEO_DECODER: - return "VideoDecoderThread"; - default: - return "FilterThread"; - } -} - // Helper function used with NewRunnableMethod to implement a (very) crude // blocking counter. // @@ -64,61 +46,62 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { } // namespace -PipelineImpl::PipelineImpl(MessageLoop* message_loop) - : message_loop_(message_loop) { +PipelineImpl::PipelineImpl() { ResetState(); } PipelineImpl::~PipelineImpl() { - DCHECK(!pipeline_internal_) - << "Stop() must complete before destroying object"; + Stop(); } -// Creates the PipelineInternal and calls it's start method. +// Creates the PipelineThread and calls it's start method. bool PipelineImpl::Start(FilterFactory* factory, const std::string& url, - PipelineCallback* start_callback) { - DCHECK(!pipeline_internal_); + PipelineCallback* init_complete_callback) { + DCHECK(!pipeline_thread_); DCHECK(factory); - if (pipeline_internal_ || !factory) { - return false; - } - - // Create and start the PipelineInternal. - pipeline_internal_ = new PipelineInternal(this, message_loop_); - if (!pipeline_internal_) { - NOTREACHED() << "Could not create PipelineInternal"; - return false; + DCHECK(!initialized_); + DCHECK(!IsPipelineThread()); + if (!pipeline_thread_ && factory) { + pipeline_thread_ = new PipelineThread(this); + if (pipeline_thread_) { + // TODO(ralphl): Does the callback get copied by these fancy templates? + // if so, then do I want to always delete it here??? + if (pipeline_thread_->Start(factory, url, init_complete_callback)) { + return true; + } + pipeline_thread_ = NULL; // Releases reference to destroy thread + } } - pipeline_internal_->Start(factory, url, start_callback); - return true; + delete init_complete_callback; + return false; } -// Stop the PipelineInternal who will NULL our reference to it and reset our -// state to a newly created PipelineImpl object. -void PipelineImpl::Stop(PipelineCallback* stop_callback) { - if (pipeline_internal_) { - pipeline_internal_->Stop(stop_callback); +// Stop the PipelineThread and return to a state identical to that of a newly +// created PipelineImpl object. +void PipelineImpl::Stop() { + DCHECK(!IsPipelineThread()); + + if (pipeline_thread_) { + pipeline_thread_->Stop(); } + ResetState(); } void PipelineImpl::Seek(base::TimeDelta time, PipelineCallback* seek_callback) { + DCHECK(!IsPipelineThread()); + if (IsPipelineOk()) { - pipeline_internal_->Seek(time, seek_callback); + pipeline_thread_->Seek(time, seek_callback); } else { NOTREACHED(); } } -bool PipelineImpl::IsRunning() const { - AutoLock auto_lock(const_cast<Lock&>(lock_)); - return pipeline_internal_ != NULL; -} - bool PipelineImpl::IsInitialized() const { AutoLock auto_lock(lock_); - return pipeline_internal_ && pipeline_internal_->IsInitialized(); + return initialized_; } bool PipelineImpl::IsRendered(const std::string& major_mime_type) const { @@ -134,8 +117,10 @@ float PipelineImpl::GetPlaybackRate() const { } void PipelineImpl::SetPlaybackRate(float rate) { + DCHECK(!IsPipelineThread()); + if (IsPipelineOk() && rate >= 0.0f) { - pipeline_internal_->SetPlaybackRate(rate); + pipeline_thread_->SetPlaybackRate(rate); } else { // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped. DCHECK(rate == 0.0f && playback_rate_ == 0.0f); @@ -148,8 +133,10 @@ float PipelineImpl::GetVolume() const { } void PipelineImpl::SetVolume(float volume) { + DCHECK(!IsPipelineThread()); + if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) { - pipeline_internal_->SetVolume(volume); + pipeline_thread_->SetVolume(volume); } else { NOTREACHED(); } @@ -195,7 +182,8 @@ PipelineError PipelineImpl::GetError() const { void PipelineImpl::ResetState() { AutoLock auto_lock(lock_); - pipeline_internal_ = NULL; + pipeline_thread_ = NULL; + initialized_ = false; duration_ = base::TimeDelta(); buffered_time_ = base::TimeDelta(); buffered_bytes_ = 0; @@ -210,7 +198,12 @@ void PipelineImpl::ResetState() { } bool PipelineImpl::IsPipelineOk() const { - return pipeline_internal_ && PIPELINE_OK == error_; + return pipeline_thread_ && initialized_ && PIPELINE_OK == error_; +} + +bool PipelineImpl::IsPipelineThread() const { + return pipeline_thread_ && + PlatformThread::CurrentId() == pipeline_thread_->thread_id(); } void PipelineImpl::SetDuration(base::TimeDelta duration) { @@ -270,86 +263,100 @@ void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) { //----------------------------------------------------------------------------- -PipelineInternal::PipelineInternal(PipelineImpl* pipeline, - MessageLoop* message_loop) +PipelineThread::PipelineThread(PipelineImpl* pipeline) : pipeline_(pipeline), - message_loop_(message_loop), + thread_("PipelineThread"), state_(kCreated) { } -PipelineInternal::~PipelineInternal() { - DCHECK(state_ == kCreated || state_ == kStopped); +PipelineThread::~PipelineThread() { + Stop(); + DCHECK(state_ == kStopped || state_ == kError); } -// Called on client's thread. -void PipelineInternal::Start(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* start_callback) { - DCHECK(filter_factory); - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url, - start_callback)); +// This method is called on the client's thread. It starts the pipeline's +// dedicated thread and posts a task to call the StartTask() method on that +// thread. +bool PipelineThread::Start(FilterFactory* filter_factory, + const std::string& url, + PipelineCallback* init_complete_callback) { + DCHECK_EQ(kCreated, state_); + if (thread_.Start()) { + filter_factory_ = filter_factory; + url_ = url; + init_callback_.reset(init_complete_callback); + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::StartTask)); + return true; + } + return false; } -// Called on client's thread. -void PipelineInternal::Stop(PipelineCallback* stop_callback) { - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback)); +// Called on the client's thread. If the thread has been started, then posts +// a task to call the StopTask() method, then waits until the thread has +// stopped. +void PipelineThread::Stop() { + if (thread_.IsRunning()) { + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::StopTask)); + thread_.Stop(); + } + DCHECK(filter_hosts_.empty()); + DCHECK(filter_threads_.empty()); } // Called on client's thread. -void PipelineInternal::Seek(base::TimeDelta time, +void PipelineThread::Seek(base::TimeDelta time, PipelineCallback* seek_callback) { - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::SeekTask, time, - seek_callback)); + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback)); } // Called on client's thread. -void PipelineInternal::SetPlaybackRate(float rate) { - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate)); +void PipelineThread::SetPlaybackRate(float rate) { + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); } // Called on client's thread. -void PipelineInternal::SetVolume(float volume) { - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume)); +void PipelineThread::SetVolume(float volume) { + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); } // Called from any thread. -void PipelineInternal::InitializationComplete(FilterHostImpl* host) { +void PipelineThread::InitializationComplete(FilterHostImpl* host) { if (IsPipelineOk()) { - // Continue the initialize task by proceeding to the next stage. - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::InitializeTask)); + // Continue the start task by proceeding to the next stage. + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::StartTask)); } } // Called from any thread. Updates the pipeline time. -void PipelineInternal::SetTime(base::TimeDelta time) { - // TODO(scherkus): why not post a task? - pipeline_->SetTime(time); +void PipelineThread::SetTime(base::TimeDelta time) { + pipeline()->SetTime(time); } -// Called from any thread. Sets the pipeline |error_| member and destroys all -// filters. -void PipelineInternal::Error(PipelineError error) { - message_loop_->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineInternal::ErrorTask, error)); +// Called from any thread. Sets the pipeline |error_| member and schedules a +// task to stop all the filters in the pipeline. Note that the thread will +// continue to run until the client calls Pipeline::Stop(), but nothing will +// be processed since filters will not be able to post tasks. +void PipelineThread::Error(PipelineError error) { + // If this method returns false, then an error has already happened, so no + // reason to run the StopTask again. It's going to happen. + if (pipeline()->InternalSetError(error)) { + message_loop()->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineThread::StopTask)); + } } -void PipelineInternal::StartTask(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* start_callback) { - DCHECK_EQ(MessageLoop::current(), message_loop_); - DCHECK_EQ(kCreated, state_); - filter_factory_ = filter_factory; - url_ = url; - start_callback_.reset(start_callback); - - // Kick off initialization. - InitializeTask(); +// Called as a result of destruction of the thread. +// +// TODO(scherkus): this can block the client due to synchronous Stop() API call. +void PipelineThread::WillDestroyCurrentMessageLoop() { + STLDeleteElements(&filter_hosts_); + STLDeleteElements(&filter_threads_); } // Main initialization method called on the pipeline thread. This code attempts @@ -363,17 +370,18 @@ void PipelineInternal::StartTask(FilterFactory* filter_factory, // then connects the VideoDecoder to a VideoRenderer. // // When all required filters have been created and have called their -// FilterHost's InitializationComplete() method, the pipeline will update its -// state to kStarted and |init_callback_|, will be executed. +// FilterHost's InitializationComplete method, the pipeline's |initialized_| +// member is set to true, and, if the client provided an +// |init_complete_callback_|, it is called with "true". // // If initialization fails, the client's callback will still be called, but // the bool parameter passed to it will be false. // -// TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It +// TODO(hclam): StartTask is now starting the pipeline asynchronously. It // works like a big state change table. If we no longer need to start filters // in order, we need to get rid of all the state change. -void PipelineInternal::InitializeTask() { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::StartTask() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); // If we have received the stop signal, return immediately. if (state_ == kStopped) @@ -383,6 +391,7 @@ void PipelineInternal::InitializeTask() { // Just created, create data source. if (state_ == kCreated) { + message_loop()->AddDestructionObserver(this); state_ = kInitDataSource; CreateDataSource(); return; @@ -437,77 +446,99 @@ void PipelineInternal::InitializeTask() { } state_ = kStarted; + pipeline_->initialized_ = true; filter_factory_ = NULL; - if (start_callback_.get()) { - start_callback_->Run(true); - start_callback_.reset(); + if (init_callback_.get()) { + init_callback_->Run(true); + init_callback_.reset(); } } } // This method is called as a result of the client calling Pipeline::Stop() or // as the result of an error condition. If there is no error, then set the -// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the +// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the // reverse order. // // TODO(scherkus): beware! this can get posted multiple times since we post // Stop() tasks even if we've already stopped. Perhaps this should no-op for // additional calls, however most of this logic will be changing. -void PipelineInternal::StopTask(PipelineCallback* stop_callback) { - DCHECK_EQ(MessageLoop::current(), message_loop_); - stop_callback_.reset(stop_callback); +void PipelineThread::StopTask() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); - // If we've already stopped, return immediately. - if (state_ == kStopped) { - return; + if (IsPipelineInitializing()) { + // If IsPipelineOk() is true, the pipeline was simply stopped during + // initialization. Otherwise it is a failure. + state_ = IsPipelineOk() ? kStopped : kError; + filter_factory_ = NULL; + if (init_callback_.get()) { + init_callback_->Run(false); + init_callback_.reset(); + } + } else { + state_ = kStopped; } - // Carry out setting the error, notifying the client and destroying filters. - ErrorTask(PIPELINE_STOPPING); - - // We no longer need to examine our previous state, set it to stopped. - state_ = kStopped; - - // Reset the pipeline and set our reference to NULL so we don't accidentally - // modify the pipeline. Once remaining tasks execute we will be destroyed. - pipeline_->ResetState(); - pipeline_ = NULL; - - // Notify the client that stopping has finished. - if (stop_callback_.get()) { - stop_callback_->Run(true); - stop_callback_.reset(); + if (IsPipelineOk()) { + pipeline_->error_ = PIPELINE_STOPPING; } -} -void PipelineInternal::ErrorTask(PipelineError error) { - DCHECK_EQ(MessageLoop::current(), message_loop_); - DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!"; + // Stop every filter. + for (FilterHostVector::iterator iter = filter_hosts_.begin(); + iter != filter_hosts_.end(); + ++iter) { + (*iter)->Stop(); + } - // Suppress executing additional error logic. - if (state_ == kError) { - return; + // Figure out how many threads we have to stop. + // + // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. + FilterThreadVector running_threads; + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + if ((*iter)->IsRunning()) { + running_threads.push_back(*iter); + } } - // Update our error code first in case we execute the start callback. - pipeline_->error_ = error; + // Crude blocking counter implementation. + Lock lock; + ConditionVariable wait_for_zero(&lock); + int count = running_threads.size(); - // Notify the client that starting did not complete, if necessary. - if (IsPipelineInitializing() && start_callback_.get()) { - start_callback_->Run(false); + // Post a task to every filter's thread to ensure that they've completed their + // stopping logic before stopping the threads themselves. + // + // TODO(scherkus): again, Stop() should either be synchronous or we should + // receive a signal from filters that they have indeed stopped. + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); } - start_callback_.reset(); - filter_factory_ = NULL; - // We no longer need to examine our previous state, set it to stopped. - state_ = kError; + // Wait on our "blocking counter". + { + AutoLock auto_lock(lock); + while (count > 0) { + wait_for_zero.Wait(); + } + } - // Destroy every filter and reset the pipeline as well. - DestroyFilters(); + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->Stop(); + } } -void PipelineInternal::SetPlaybackRateTask(float rate) { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::SetPlaybackRateTask(float rate) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); pipeline_->InternalSetPlaybackRate(rate); for (FilterHostVector::iterator iter = filter_hosts_.begin(); @@ -517,10 +548,9 @@ void PipelineInternal::SetPlaybackRateTask(float rate) { } } -void PipelineInternal::SeekTask(base::TimeDelta time, - PipelineCallback* seek_callback) { - DCHECK_EQ(MessageLoop::current(), message_loop_); - seek_callback_.reset(seek_callback); +void PipelineThread::SeekTask(base::TimeDelta time, + PipelineCallback* seek_callback) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); @@ -536,14 +566,14 @@ void PipelineInternal::SeekTask(base::TimeDelta time, // immediately here or we set time and do callback when we have new // frames/packets. SetTime(time); - if (seek_callback_.get()) { - seek_callback_->Run(true); - seek_callback_.reset(); + if (seek_callback) { + seek_callback->Run(true); + delete seek_callback; } } -void PipelineInternal::SetVolumeTask(float volume) { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::SetVolumeTask(float volume) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); pipeline_->volume_ = volume; scoped_refptr<AudioRenderer> audio_renderer; @@ -554,46 +584,44 @@ void PipelineInternal::SetVolumeTask(float volume) { } template <class Filter, class Source> -void PipelineInternal::CreateFilter(FilterFactory* filter_factory, - Source source, - const MediaFormat& media_format) { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::CreateFilter(FilterFactory* filter_factory, + Source source, + const MediaFormat& media_format) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); DCHECK(IsPipelineOk()); - // Create the filter. scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); if (!filter) { Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); - return; - } - - // Create a dedicated thread for this filter if applicable. - if (SupportsSetMessageLoop<Filter>()) { - scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>())); - if (!thread.get() || !thread->Start()) { - NOTREACHED() << "Could not start filter thread"; - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - return; + } else { + scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); + // Create a dedicated thread for this filter. + if (SupportsSetMessageLoop<Filter>()) { + // TODO(scherkus): figure out a way to name these threads so it matches + // the filter type. + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } else { + filter->set_message_loop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } } - filter->set_message_loop(thread->message_loop()); - filter_threads_.push_back(thread.release()); - } - - // Create the filter's host. - DCHECK(IsPipelineOk()); - scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); - filter->set_host(host.get()); - filter_hosts_.push_back(host.release()); - - // Now initialize the filter. - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + // Creating a thread could have failed, verify we're still OK. + if (IsPipelineOk()) { + filter_hosts_.push_back(host.get()); + filter->set_host(host.release()); + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } + } } } -void PipelineInternal::CreateDataSource() { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::CreateDataSource() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); DCHECK(IsPipelineOk()); MediaFormat url_format; @@ -602,8 +630,8 @@ void PipelineInternal::CreateDataSource() { CreateFilter<DataSource>(filter_factory_, url_, url_format); } -void PipelineInternal::CreateDemuxer() { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::CreateDemuxer() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); DCHECK(IsPipelineOk()); scoped_refptr<DataSource> data_source; @@ -613,8 +641,8 @@ void PipelineInternal::CreateDemuxer() { } template <class Decoder> -bool PipelineInternal::CreateDecoder() { - DCHECK_EQ(MessageLoop::current(), message_loop_); +bool PipelineThread::CreateDecoder() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); DCHECK(IsPipelineOk()); scoped_refptr<Demuxer> demuxer; @@ -636,8 +664,8 @@ bool PipelineInternal::CreateDecoder() { } template <class Decoder, class Renderer> -bool PipelineInternal::CreateRenderer() { - DCHECK_EQ(MessageLoop::current(), message_loop_); +bool PipelineThread::CreateRenderer() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); DCHECK(IsPipelineOk()); scoped_refptr<Decoder> decoder; @@ -653,8 +681,8 @@ bool PipelineInternal::CreateRenderer() { } template <class Filter> -void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const { - DCHECK_EQ(MessageLoop::current(), message_loop_); +void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); *filter_out = NULL; for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); @@ -664,53 +692,4 @@ void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const { } } -void PipelineInternal::DestroyFilters() { - // Stop every filter. - for (FilterHostVector::iterator iter = filter_hosts_.begin(); - iter != filter_hosts_.end(); - ++iter) { - (*iter)->Stop(); - } - - // Crude blocking counter implementation. - Lock lock; - ConditionVariable wait_for_zero(&lock); - int count = filter_threads_.size(); - - // Post a task to every filter's thread to ensure that they've completed their - // stopping logic before stopping the threads themselves. - // - // TODO(scherkus): again, Stop() should either be synchronous or we should - // receive a signal from filters that they have indeed stopped. - for (FilterThreadVector::iterator iter = filter_threads_.begin(); - iter != filter_threads_.end(); - ++iter) { - (*iter)->message_loop()->PostTask(FROM_HERE, - NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); - } - - // Wait on our "blocking counter". - { - AutoLock auto_lock(lock); - while (count > 0) { - wait_for_zero.Wait(); - } - } - - // Stop every running filter thread. - // - // TODO(scherkus): can we watchdog this section to detect wedged threads? - for (FilterThreadVector::iterator iter = filter_threads_.begin(); - iter != filter_threads_.end(); - ++iter) { - (*iter)->Stop(); - } - - // Reset the pipeline, which will decrement a reference to this object. - // We will get destroyed as soon as the remaining tasks finish executing. - // To be safe, we'll set our pipeline reference to NULL. - STLDeleteElements(&filter_hosts_); - STLDeleteElements(&filter_threads_); -} - } // namespace media diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index 2d6b988..a8e9e09 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -20,25 +20,24 @@ namespace media { class FilterHostImpl; -class PipelineInternal; +class PipelineThread; // Class which implements the Media::Pipeline contract. The majority of the -// actual code for this object lives in the PipelineInternal class, which is +// actual code for this object lives in the PipelineThread class, which is // responsible for actually building and running the pipeline. This object // is basically a simple container for state information, and is responsible -// for creating and communicating with the PipelineInternal object. +// for creating and communicating with the PipelineThread object. class PipelineImpl : public Pipeline { public: - PipelineImpl(MessageLoop* message_loop); + PipelineImpl(); virtual ~PipelineImpl(); // Pipeline implementation. virtual bool Start(FilterFactory* filter_factory, const std::string& uri, PipelineCallback* start_callback); - virtual void Stop(PipelineCallback* stop_callback); + virtual void Stop(); virtual void Seek(base::TimeDelta time, PipelineCallback* seek_callback); - virtual bool IsRunning() const; virtual bool IsInitialized() const; virtual bool IsRendered(const std::string& major_mime_type) const; virtual float GetPlaybackRate() const; @@ -55,7 +54,7 @@ class PipelineImpl : public Pipeline { private: friend class FilterHostImpl; - friend class PipelineInternal; + friend class PipelineThread; // Reset the state of the pipeline object to the initial state. This method // is used by the constructor, and the Stop method. @@ -66,6 +65,10 @@ class PipelineImpl : public Pipeline { // must not be an error. bool IsPipelineOk() const; + // Returns true if we're currently executing on the pipeline thread. Mostly + // used in DCHECKs. + bool IsPipelineThread() const; + // Methods called by FilterHostImpl to update pipeline state. void SetDuration(base::TimeDelta duration); void SetBufferedTime(base::TimeDelta buffered_time); @@ -80,17 +83,14 @@ class PipelineImpl : public Pipeline { // alone, and returns false. bool InternalSetError(PipelineError error); - // Method called by the |pipeline_internal_| to insert a mime type into + // Method called by the |pipeline_thread_| to insert a mime type into // the |rendered_mime_types_| set. void InsertRenderedMimeType(const std::string& major_mime_type); - // Message loop used to execute pipeline tasks. - MessageLoop* message_loop_; - - // Holds a ref counted reference to the PipelineInternal object associated - // with this pipeline. Prior to the call to the Start() method, this member - // will be NULL, since we are not running. - scoped_refptr<PipelineInternal> pipeline_internal_; + // Holds a ref counted reference to the PipelineThread object associated + // with this pipeline. Prior to the call to the Start method, this member + // will be NULL, since no thread is running. + scoped_refptr<PipelineThread> pipeline_thread_; // After calling Start, if all of the required filters are created and // initialized, this member will be set to true by the pipeline thread. @@ -124,14 +124,14 @@ class PipelineImpl : public Pipeline { // Current volume level (from 0.0f to 1.0f). The volume reflects the last // value the audio filter was called with SetVolume, so there will be a short // period of time between the client calling SetVolume on the pipeline and - // this value being updated. Set by the PipelineInternal just prior to - // calling the audio renderer. + // this value being updated. Set by the PipelineThread just prior to calling + // the audio renderer. float volume_; // Current playback rate (>= 0.0f). This member reflects the last value // that the filters in the pipeline were called with, so there will be a short // period of time between the client calling SetPlaybackRate and this value - // being updated. Set by the PipelineInternal just prior to calling filters. + // being updated. Set by the PipelineThread just prior to calling filters. float playback_rate_; // Current playback time. Set by a FilterHostImpl object on behalf of the @@ -152,12 +152,12 @@ class PipelineImpl : public Pipeline { }; -// PipelineInternal contains most of the logic involved with running the -// media pipeline. Filters are created and called on the message loop injected -// into this object. PipelineInternal works like a state machine to perform -// asynchronous initialization. Initialization is done in multiple passes by -// InitializeTask(). In each pass a different filter is created and chained with -// a previously created filter. +// The PipelineThread contains most of the logic involved with running the +// media pipeline. Filters are created and called on a dedicated thread owned +// by this object. This object works like a state machine to perform +// asynchronous initialization. Initialization is done in multiple passes in +// StartTask(). In each pass a different filter is created and chained with a +// previously created filter. // // Here's a state diagram that describes the lifetime of this object. // @@ -172,20 +172,21 @@ class PipelineImpl : public Pipeline { // transition to the "Error" state from any state. If Stop() is called during // initialization, this object will transition to "Stopped" state. -class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { +class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, + public MessageLoop::DestructionObserver { public: // Methods called by PipelineImpl object on the client's thread. These // methods post a task to call a corresponding xxxTask() method on the - // message loop. For example, Seek posts a task to call SeekTask. - explicit PipelineInternal(PipelineImpl* pipeline, MessageLoop* message_loop); + // pipeline thread. For example, Seek posts a task to call SeekTask. + explicit PipelineThread(PipelineImpl* pipeline); // After Start() is called, a task of StartTask() is posted on the pipeline // thread to perform initialization. See StartTask() to learn more about // initialization. - void Start(FilterFactory* filter_factory, + bool Start(FilterFactory* filter_factory, const std::string& url_media_source, - PipelineCallback* start_callback); - void Stop(PipelineCallback* stop_callback); + PipelineCallback* init_complete_callback); + void Stop(); void Seek(base::TimeDelta time, PipelineCallback* seek_callback); void SetPlaybackRate(float rate); void SetVolume(float volume); @@ -208,18 +209,19 @@ class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { // Simple accessor used by the FilterHostImpl class to get access to the // pipeline object. - // - // TODO(scherkus): I think FilterHostImpl should not be talking to - // PipelineImpl but rather PipelineInternal. PipelineImpl* pipeline() const { return pipeline_; } - // Returns true if the pipeline has fully initialized. - bool IsInitialized() { return state_ == kStarted; } + // Accessor used to post messages to thread's message loop. + MessageLoop* message_loop() const { return thread_.message_loop(); } + + // Accessor used by PipelineImpl to check if we're executing on the pipeline + // thread. + PlatformThreadId thread_id() const { return thread_.thread_id(); } private: // Only allow ourselves to be destroyed via ref-counting. - friend class base::RefCountedThreadSafe<PipelineInternal>; - virtual ~PipelineInternal(); + friend class base::RefCountedThreadSafe<PipelineThread>; + virtual ~PipelineThread(); enum State { kCreated, @@ -247,29 +249,23 @@ class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { state_ == kInitVideoRenderer; } + // Implementation of MessageLoop::DestructionObserver. StartTask registers + // this class as a destruction observer on the thread's message loop. + // It is used to destroy the list of FilterHosts + // (and thus destroy the associated filters) when all tasks have been + // processed and the message loop has been quit. + virtual void WillDestroyCurrentMessageLoop(); + // The following "task" methods correspond to the public methods, but these - // methods are run as the result of posting a task to the PipelineInternal's + // methods are run as the result of posting a task to the PipelineThread's // message loop. - void StartTask(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* start_callback); - - // InitializeTask() performs initialization in multiple passes. It is executed - // as a result of calling Start() or InitializationComplete() that advances - // initialization to the next state. It works as a hub of state transition for - // initialization. - void InitializeTask(); - - // StopTask() and ErrorTask() are similar but serve different purposes: - // - Both destroy the filter chain. - // - Both will execute |start_callback| if the pipeline was initializing. - // - StopTask() resets the pipeline to a fresh state, where as ErrorTask() - // leaves the pipeline as is for client inspection. - // - StopTask() can be scheduled by the client calling Stop(), where as - // ErrorTask() is scheduled as a result of a filter calling Error(). - void StopTask(PipelineCallback* stop_callback); - void ErrorTask(PipelineError error); + // StartTask() is a special task that performs initialization in multiple + // passes. It is executed as a result of calling Start() or + // InitializationComplete() that advances initialization to the next state. It + // works as a hub of state transition for initialization. + void StartTask(); + void StopTask(); void SetPlaybackRateTask(float rate); void SeekTask(base::TimeDelta time, PipelineCallback* seek_callback); void SetVolumeTask(float volume); @@ -306,8 +302,8 @@ class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { Source source, const MediaFormat& source_media_format); - // Creates a Filter and initializes it with the given |source|. If a Filter - // could not be created or an error occurred, this method returns NULL and the + // Creates a Filter and initilizes it with the given |source|. If a Filter + // could not be created or an error occurred, this metod returns NULL and the // pipeline's |error_| member will contain a specific error code. Note that // the Source could be a filter or a DemuxerStream, but it must support the // GetMediaFormat() method. @@ -343,15 +339,16 @@ class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { template <class Filter> void GetFilter(scoped_refptr<Filter>* filter_out) const; - // Stops every filters, filter host and filter thread and releases all - // references to them. - void DestroyFilters(); + // Pointer to the pipeline that owns this PipelineThread. + PipelineImpl* const pipeline_; - // Pointer to the pipeline that owns this PipelineInternal. - PipelineImpl* pipeline_; + // The actual thread. + base::Thread thread_; - // Message loop used to execute pipeline tasks. - MessageLoop* message_loop_; + // Used to avoid scheduling multiple time update tasks. If this member is + // true then a task that will call the SetTimeTask() method is in the message + // loop's queue. + bool time_update_callback_scheduled_; // Member that tracks the current state. State state_; @@ -362,19 +359,17 @@ class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { // URL for the data source as passed in by Start(). std::string url_; - // Callbacks for various pipeline operations. - scoped_ptr<PipelineCallback> start_callback_; - scoped_ptr<PipelineCallback> seek_callback_; - scoped_ptr<PipelineCallback> stop_callback_; + // Initialization callback as passed in by Start(). + scoped_ptr<PipelineCallback> init_callback_; - // Vector of FilterHostImpl objects that contain the filters for the pipeline. + // Vector of FilterHostImpl objects that contian the filters for the pipeline. typedef std::vector<FilterHostImpl*> FilterHostVector; FilterHostVector filter_hosts_; typedef std::vector<base::Thread*> FilterThreadVector; FilterThreadVector filter_threads_; - DISALLOW_COPY_AND_ASSIGN(PipelineInternal); + DISALLOW_COPY_AND_ASSIGN(PipelineThread); }; } // namespace media diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc index 1a550eb..4183126 100644 --- a/media/base/pipeline_impl_unittest.cc +++ b/media/base/pipeline_impl_unittest.cc @@ -14,53 +14,45 @@ #include "testing/gtest/include/gtest/gtest.h" using ::testing::DoAll; -using ::testing::Mock; using ::testing::Return; using ::testing::StrictMock; namespace media { -// Used for setting expectations on pipeline callbacks. Using a StrictMock -// also lets us test for missing callbacks. -class CallbackHelper { - public: - CallbackHelper() {} - virtual ~CallbackHelper() {} - - MOCK_METHOD1(OnInitialize, void(bool result)); - MOCK_METHOD1(OnSeek, void(bool result)); - MOCK_METHOD1(OnStop, void(bool result)); - - private: - DISALLOW_COPY_AND_ASSIGN(CallbackHelper); -}; +typedef std::vector<MockDemuxerStream*> MockDemuxerStreamVector; -// TODO(scherkus): even though some filters are initialized on separate -// threads these test aren't flaky... why? It's because filters' Initialize() -// is executed on |message_loop_| and the mock filters instantly call -// InitializationComplete(), which keeps the pipeline humming along. If -// either filters don't call InitializationComplete() immediately or filter -// initialization is moved to a separate thread this test will become flaky. class PipelineImplTest : public ::testing::Test { public: PipelineImplTest() - : pipeline_(&message_loop_), - mocks_(new MockFilterFactory()) { + : mocks_(new MockFilterFactory()), + initialize_result_(false), + seek_result_(false), + initialize_event_(false, false), + seek_event_(false, false) { } virtual ~PipelineImplTest() { - if (!pipeline_.IsRunning()) { - return; - } - - // Expect a stop callback if we were started. - EXPECT_CALL(callbacks_, OnStop(true)); - pipeline_.Stop(NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), - &CallbackHelper::OnStop)); - message_loop_.RunAllPending(); + // Force the pipeline to shut down its thread. + pipeline_.Stop(); } protected: + // Called by tests after they have finished setting up MockFilterConfig. + // Initializes the pipeline and returns true if the initialization callback + // was executed, false otherwise. + bool InitializeAndWait() { + pipeline_.Start(mocks_, "", + NewCallback(this, &PipelineImplTest::OnInitialize)); + return initialize_event_.TimedWait(base::TimeDelta::FromMilliseconds(500)); + } + + // Issues a seek on the pipeline and returns true if the seek callback was + // executed, false otherwise. + bool SeekAndWait(const base::TimeDelta& time) { + pipeline_.Seek(time, NewCallback(this, &PipelineImplTest::OnSeek)); + return seek_event_.TimedWait(base::TimeDelta::FromMilliseconds(500)); + } + // Sets up expectations to allow the data source to initialize. void InitializeDataSource() { EXPECT_CALL(*mocks_->data_source(), Initialize("")) @@ -70,7 +62,6 @@ class PipelineImplTest : public ::testing::Test { } // Sets up expectations to allow the demuxer to initialize. - typedef std::vector<MockDemuxerStream*> MockDemuxerStreamVector; void InitializeDemuxer(MockDemuxerStreamVector* streams) { EXPECT_CALL(*mocks_->demuxer(), Initialize(mocks_->data_source())) .WillOnce(DoAll(InitializationComplete(mocks_->demuxer()), @@ -119,24 +110,27 @@ class PipelineImplTest : public ::testing::Test { EXPECT_CALL(*mocks_->audio_renderer(), Stop()); } - // Sets up expectations on the callback and initializes the pipeline. Called - // afters tests have set expectations any filters they wish to use. - void InitializePipeline(bool callback_result) { - // Expect an initialization callback. - EXPECT_CALL(callbacks_, OnInitialize(callback_result)); - pipeline_.Start(mocks_, "", - NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), - &CallbackHelper::OnInitialize)); - message_loop_.RunAllPending(); - } - // Fixture members. - StrictMock<CallbackHelper> callbacks_; - MessageLoop message_loop_; - PipelineImpl pipeline_; + media::PipelineImpl pipeline_; scoped_refptr<media::MockFilterFactory> mocks_; + bool initialize_result_; + bool seek_result_; private: + void OnInitialize(bool result) { + initialize_result_ = result; + initialize_event_.Signal(); + } + + void OnSeek(bool result) { + seek_result_ = result; + seek_event_.Signal(); + } + + // Used to wait for callbacks. + base::WaitableEvent initialize_event_; + base::WaitableEvent seek_event_; + DISALLOW_COPY_AND_ASSIGN(PipelineImplTest); }; @@ -146,29 +140,20 @@ TEST_F(PipelineImplTest, NeverInitializes) { EXPECT_CALL(*mocks_->data_source(), Stop()); // This test hangs during initialization by never calling - // InitializationComplete(). StrictMock<> will ensure that the callback is - // never executed. - pipeline_.Start(mocks_, "", - NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), - &CallbackHelper::OnInitialize)); - message_loop_.RunAllPending(); - + // InitializationComplete(). Make sure we tear down the pipeline properly. + ASSERT_FALSE(InitializeAndWait()); + EXPECT_FALSE(initialize_result_); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); - - // Because our callback will get executed when the test tears down, we'll - // verify that nothing has been called, then set our expectation for the call - // made during tear down. - Mock::VerifyAndClear(&callbacks_); - EXPECT_CALL(callbacks_, OnInitialize(false)); + EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); } TEST_F(PipelineImplTest, RequiredFilterMissing) { mocks_->set_creation_successful(false); - InitializePipeline(false); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_FALSE(initialize_result_); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_ERROR_REQUIRED_FILTER_MISSING, + EXPECT_EQ(media::PIPELINE_ERROR_REQUIRED_FILTER_MISSING, pipeline_.GetError()); } @@ -179,9 +164,10 @@ TEST_F(PipelineImplTest, URLNotFound) { Return(false))); EXPECT_CALL(*mocks_->data_source(), Stop()); - InitializePipeline(false); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_FALSE(initialize_result_); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_ERROR_URL_NOT_FOUND, pipeline_.GetError()); + EXPECT_EQ(media::PIPELINE_ERROR_URL_NOT_FOUND, pipeline_.GetError()); } TEST_F(PipelineImplTest, NoStreams) { @@ -189,9 +175,10 @@ TEST_F(PipelineImplTest, NoStreams) { InitializeDataSource(); InitializeDemuxer(&streams); - InitializePipeline(false); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_FALSE(initialize_result_); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_ERROR_COULD_NOT_RENDER, pipeline_.GetError()); + EXPECT_EQ(media::PIPELINE_ERROR_COULD_NOT_RENDER, pipeline_.GetError()); } TEST_F(PipelineImplTest, AudioStream) { @@ -205,9 +192,10 @@ TEST_F(PipelineImplTest, AudioStream) { InitializeAudioDecoder(stream); InitializeAudioRenderer(); - InitializePipeline(true); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_TRUE(initialize_result_); EXPECT_TRUE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio)); EXPECT_FALSE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo)); } @@ -223,9 +211,10 @@ TEST_F(PipelineImplTest, VideoStream) { InitializeVideoDecoder(stream); InitializeVideoRenderer(); - InitializePipeline(true); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_TRUE(initialize_result_); EXPECT_TRUE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); EXPECT_FALSE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio)); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo)); } @@ -246,9 +235,10 @@ TEST_F(PipelineImplTest, AudioVideoStream) { InitializeVideoDecoder(video_stream); InitializeVideoRenderer(); - InitializePipeline(true); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_TRUE(initialize_result_); EXPECT_TRUE(pipeline_.IsInitialized()); - EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio)); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo)); } @@ -278,15 +268,10 @@ TEST_F(PipelineImplTest, Seek) { EXPECT_CALL(*mocks_->video_decoder(), Seek(expected)); EXPECT_CALL(*mocks_->video_renderer(), Seek(expected)); - // We expect a successful seek callback. - EXPECT_CALL(callbacks_, OnSeek(true)); - // Initialize then seek! - InitializePipeline(true); - pipeline_.Seek(expected, - NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), - &CallbackHelper::OnSeek)); - message_loop_.RunAllPending(); + ASSERT_TRUE(InitializeAndWait()); + EXPECT_TRUE(SeekAndWait(expected)); + EXPECT_TRUE(seek_result_); } TEST_F(PipelineImplTest, SetVolume) { @@ -305,8 +290,9 @@ TEST_F(PipelineImplTest, SetVolume) { EXPECT_CALL(*mocks_->audio_renderer(), SetVolume(expected)); // Initialize then set volume! - InitializePipeline(true); + ASSERT_TRUE(InitializeAndWait()); pipeline_.SetVolume(expected); } } // namespace media + diff --git a/media/player/movie.cc b/media/player/movie.cc index 2bb4001..e37a497 100644 --- a/media/player/movie.cc +++ b/media/player/movie.cc @@ -79,9 +79,7 @@ bool Movie::Open(const wchar_t* url, WtlVideoRenderer* video_renderer) { factories->AddFactory( new media::InstanceFilterFactory<WtlVideoRenderer>(video_renderer)); - thread_.reset(new base::Thread("PipelineThread")); - thread_->Start(); - pipeline_.reset(new PipelineImpl(thread_->message_loop())); + pipeline_.reset(new PipelineImpl()); // Create and start our pipeline. pipeline_->Start(factories.get(), WideToUTF8(std::wstring(url)), NULL); @@ -196,10 +194,8 @@ bool Movie::GetOpenMpEnable() { // Teardown. void Movie::Close() { if (pipeline_.get()) { - pipeline_->Stop(NULL); - thread_->Stop(); + pipeline_->Stop(); pipeline_.reset(); - thread_.reset(); } } diff --git a/media/player/movie.h b/media/player/movie.h index eb23822..fdaa2b5 100644 --- a/media/player/movie.h +++ b/media/player/movie.h @@ -11,7 +11,6 @@ #include "base/scoped_ptr.h" #include "base/singleton.h" -#include "base/thread.h" class WtlVideoRenderer; @@ -91,7 +90,6 @@ class Movie : public Singleton<Movie> { virtual ~Movie(); scoped_ptr<PipelineImpl> pipeline_; - scoped_ptr<base::Thread> thread_; bool enable_audio_; bool enable_swscaler_; |