diff options
-rw-r--r-- | media/base/filter_host_impl.cc | 6 | ||||
-rw-r--r-- | media/base/filter_host_impl.h | 15 | ||||
-rw-r--r-- | media/base/pipeline.h | 49 | ||||
-rw-r--r-- | media/base/pipeline_impl.cc | 475 | ||||
-rw-r--r-- | media/base/pipeline_impl.h | 139 | ||||
-rw-r--r-- | media/base/pipeline_impl_unittest.cc | 150 | ||||
-rw-r--r-- | media/player/movie.cc | 8 | ||||
-rw-r--r-- | media/player/movie.h | 2 | ||||
-rw-r--r-- | webkit/glue/webmediaplayer_impl.cc | 54 | ||||
-rw-r--r-- | webkit/glue/webmediaplayer_impl.h | 6 |
10 files changed, 477 insertions, 427 deletions
diff --git a/media/base/filter_host_impl.cc b/media/base/filter_host_impl.cc index 96e4e93..09f1791 100644 --- a/media/base/filter_host_impl.cc +++ b/media/base/filter_host_impl.cc @@ -7,11 +7,11 @@ namespace media { void FilterHostImpl::InitializationComplete() { - pipeline_thread_->InitializationComplete(this); + pipeline_internal_->InitializationComplete(this); } void FilterHostImpl::Error(PipelineError error) { - pipeline_thread_->Error(error); + pipeline_internal_->Error(error); } base::TimeDelta FilterHostImpl::GetTime() const { @@ -19,7 +19,7 @@ base::TimeDelta FilterHostImpl::GetTime() const { } void FilterHostImpl::SetTime(base::TimeDelta time) { - pipeline_thread_->SetTime(time); + pipeline_internal_->SetTime(time); } void FilterHostImpl::SetDuration(base::TimeDelta duration) { diff --git a/media/base/filter_host_impl.h b/media/base/filter_host_impl.h index 1fcb0c4..71e2417 100644 --- a/media/base/filter_host_impl.h +++ b/media/base/filter_host_impl.h @@ -27,15 +27,15 @@ class FilterHostImpl : public FilterHost { virtual void SetVideoSize(size_t width, size_t height); // These methods are public, but are intended for use by the - // PipelineThread class only. + // PipelineInternal class only. // Creates a FilterHostImpl object and populates the |filter_type_| member // by calling the Filter class's static filter_type() method. This ensures // that the GetFilter method can safely cast the filter interface from the // MediaFilter base class interface to the specific Filter interface. template <class Filter> - FilterHostImpl(PipelineThread* pipeline_thread, Filter* filter) - : pipeline_thread_(pipeline_thread), + FilterHostImpl(PipelineInternal* pipeline_internal, Filter* filter) + : pipeline_internal_(pipeline_internal), filter_type_(Filter::filter_type()), filter_(filter), stopped_(false) { @@ -54,15 +54,16 @@ class FilterHostImpl : public FilterHost { // Stops the filter. void Stop(); - // Used by the PipelineThread to call Seek and SetRate methods on filters. + // Used by the PipelineInternal to call Seek() and SetRate() methods on + // filters. MediaFilter* media_filter() const { return filter_; } private: // Useful method for getting the pipeline. - PipelineImpl* pipeline() const { return pipeline_thread_->pipeline(); } + PipelineImpl* pipeline() const { return pipeline_internal_->pipeline(); } - // PipelineThread that owns this FilterHostImpl. - PipelineThread* const pipeline_thread_; + // PipelineInternal that owns this FilterHostImpl. + PipelineInternal* const pipeline_internal_; // The FilterType of the filter this host contains. FilterType const filter_type_; diff --git a/media/base/pipeline.h b/media/base/pipeline.h index d418e4b..d1f5d90 100644 --- a/media/base/pipeline.h +++ b/media/base/pipeline.h @@ -56,41 +56,31 @@ class Pipeline { // construct a filter chain. Returns true if successful, false otherwise // (i.e., pipeline already started). Note that a return value of true // only indicates that the initialization process has started successfully. - // Pipeline initialization is an inherently asynchronous process. Clients - // should not call SetPlaybackRate(), Seek(), or SetVolume() until - // initialization is complete. Clients can either poll the IsInitialized() - // method (which is discouraged) or use the |start_callback| as described - // below. + // Pipeline initialization is an inherently asynchronous process. Clients can + // either poll the IsInitialized() method (discouraged) or use the + // |start_callback| as described below. // // This method is asynchronous and can execute a callback when completed. // If the caller provides a |start_callback|, it will be called when the // pipeline initialization completes. If successful, the callback's bool // parameter will be true. If the callback is called with false, then the - // client can use the GetError() method to obtain more information about the - // reason initialization failed. The prototype for the client callback is: - // void Client::PipelineInitComplete(bool init_was_successful); - // - // Note that clients must not call the Stop method from within the - // |start_callback|. Other methods, including SetPlaybackRate(), Seek(), and - // SetVolume() may be called. The client will be called on a thread owned by - // the pipeline class, not on the thread that originally called the Start() - // method. + // client can use GetError() to obtain more information about the reason + // initialization failed. virtual bool Start(FilterFactory* filter_factory, const std::string& url, PipelineCallback* start_callback) = 0; - // Stops the pipeline and resets to an uninitialized state. This method - // will block the calling thread until the pipeline has been completely - // torn down and reset to an uninitialized state. After calling Stop(), it - // is acceptable to call Start() again since Stop() leaves the pipeline - // in a state identical to a newly created pipeline. + // Asynchronously stops the pipeline and resets it to an uninitialized state. + // If provided, |stop_callback| will be executed when the pipeline has been + // completely torn down and reset to an uninitialized state. It is acceptable + // to call Start() again once the callback has finished executing. // - // Stop() must be called before destroying the pipeline. + // Stop() must be called before destroying the pipeline. Clients can + // determine whether Stop() must be called by checking IsRunning(). // - // TODO(scherkus): it shouldn't be acceptable to call Start() again after you - // Stop() a pipeline -- it should be destroyed and replaced with a new - // instance. - virtual void Stop() = 0; + // TODO(scherkus): ideally clients would destroy the pipeline after calling + // Stop() and create a new pipeline as needed. + virtual void Stop(PipelineCallback* stop_callback) = 0; // Attempt to seek to the position specified by time. |seek_callback| will be // executed when the all filters in the pipeline have processed the seek. @@ -98,9 +88,14 @@ class Pipeline { // (i.e., streaming media). virtual void Seek(base::TimeDelta time, PipelineCallback* seek_callback) = 0; - // Returns the current initialization state of the pipeline. Note that this - // will be set to true prior to a executing |init_complete_callback| if - // initialization is successful. + // Returns true if the pipeline has been started via Start(). If IsRunning() + // returns true, it is expected that Stop() will be called before destroying + // the pipeline. + virtual bool IsRunning() const = 0; + + // Returns true if the pipeline has been started and fully initialized to a + // point where playback controls will be respected. Note that it is possible + // for a pipeline to be started but not initialized (i.e., an error occurred). virtual bool IsInitialized() const = 0; // If the |major_mime_type| exists in the pipeline and is being rendered, this diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 56a71a6..68d509c 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -31,6 +31,24 @@ bool SupportsSetMessageLoop() { } } +// Small helper function to help us name filter threads for debugging. +// +// TODO(scherkus): figure out a cleaner way to derive the filter thread name. +template <class Filter> +const char* GetThreadName() { + DCHECK(SupportsSetMessageLoop<Filter>()); + switch (Filter::filter_type()) { + case FILTER_DEMUXER: + return "DemuxerThread"; + case FILTER_AUDIO_DECODER: + return "AudioDecoderThread"; + case FILTER_VIDEO_DECODER: + return "VideoDecoderThread"; + default: + return "FilterThread"; + } +} + // Helper function used with NewRunnableMethod to implement a (very) crude // blocking counter. // @@ -46,62 +64,61 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { } // namespace -PipelineImpl::PipelineImpl() { +PipelineImpl::PipelineImpl(MessageLoop* message_loop) + : message_loop_(message_loop) { ResetState(); } PipelineImpl::~PipelineImpl() { - Stop(); + DCHECK(!pipeline_internal_) + << "Stop() must complete before destroying object"; } -// Creates the PipelineThread and calls it's start method. +// Creates the PipelineInternal and calls it's start method. bool PipelineImpl::Start(FilterFactory* factory, const std::string& url, - PipelineCallback* init_complete_callback) { - DCHECK(!pipeline_thread_); + PipelineCallback* start_callback) { + DCHECK(!pipeline_internal_); DCHECK(factory); - DCHECK(!initialized_); - DCHECK(!IsPipelineThread()); - if (!pipeline_thread_ && factory) { - pipeline_thread_ = new PipelineThread(this); - if (pipeline_thread_) { - // TODO(ralphl): Does the callback get copied by these fancy templates? - // if so, then do I want to always delete it here??? - if (pipeline_thread_->Start(factory, url, init_complete_callback)) { - return true; - } - pipeline_thread_ = NULL; // Releases reference to destroy thread - } + if (pipeline_internal_ || !factory) { + return false; } - delete init_complete_callback; - return false; -} -// Stop the PipelineThread and return to a state identical to that of a newly -// created PipelineImpl object. -void PipelineImpl::Stop() { - DCHECK(!IsPipelineThread()); + // Create and start the PipelineInternal. + pipeline_internal_ = new PipelineInternal(this, message_loop_); + if (!pipeline_internal_) { + NOTREACHED() << "Could not create PipelineInternal"; + return false; + } + pipeline_internal_->Start(factory, url, start_callback); + return true; +} - if (pipeline_thread_) { - pipeline_thread_->Stop(); +// Stop the PipelineInternal who will NULL our reference to it and reset our +// state to a newly created PipelineImpl object. +void PipelineImpl::Stop(PipelineCallback* stop_callback) { + if (pipeline_internal_) { + pipeline_internal_->Stop(stop_callback); } - ResetState(); } void PipelineImpl::Seek(base::TimeDelta time, PipelineCallback* seek_callback) { - DCHECK(!IsPipelineThread()); - if (IsPipelineOk()) { - pipeline_thread_->Seek(time, seek_callback); + pipeline_internal_->Seek(time, seek_callback); } else { NOTREACHED(); } } +bool PipelineImpl::IsRunning() const { + AutoLock auto_lock(const_cast<Lock&>(lock_)); + return pipeline_internal_ != NULL; +} + bool PipelineImpl::IsInitialized() const { AutoLock auto_lock(lock_); - return initialized_; + return pipeline_internal_ && pipeline_internal_->IsInitialized(); } bool PipelineImpl::IsRendered(const std::string& major_mime_type) const { @@ -117,10 +134,8 @@ float PipelineImpl::GetPlaybackRate() const { } void PipelineImpl::SetPlaybackRate(float rate) { - DCHECK(!IsPipelineThread()); - if (IsPipelineOk() && rate >= 0.0f) { - pipeline_thread_->SetPlaybackRate(rate); + pipeline_internal_->SetPlaybackRate(rate); } else { // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped. DCHECK(rate == 0.0f && playback_rate_ == 0.0f); @@ -133,10 +148,8 @@ float PipelineImpl::GetVolume() const { } void PipelineImpl::SetVolume(float volume) { - DCHECK(!IsPipelineThread()); - if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) { - pipeline_thread_->SetVolume(volume); + pipeline_internal_->SetVolume(volume); } else { NOTREACHED(); } @@ -182,8 +195,7 @@ PipelineError PipelineImpl::GetError() const { void PipelineImpl::ResetState() { AutoLock auto_lock(lock_); - pipeline_thread_ = NULL; - initialized_ = false; + pipeline_internal_ = NULL; duration_ = base::TimeDelta(); buffered_time_ = base::TimeDelta(); buffered_bytes_ = 0; @@ -198,12 +210,7 @@ void PipelineImpl::ResetState() { } bool PipelineImpl::IsPipelineOk() const { - return pipeline_thread_ && initialized_ && PIPELINE_OK == error_; -} - -bool PipelineImpl::IsPipelineThread() const { - return pipeline_thread_ && - PlatformThread::CurrentId() == pipeline_thread_->thread_id(); + return pipeline_internal_ && PIPELINE_OK == error_; } void PipelineImpl::SetDuration(base::TimeDelta duration) { @@ -263,100 +270,86 @@ void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) { //----------------------------------------------------------------------------- -PipelineThread::PipelineThread(PipelineImpl* pipeline) +PipelineInternal::PipelineInternal(PipelineImpl* pipeline, + MessageLoop* message_loop) : pipeline_(pipeline), - thread_("PipelineThread"), + message_loop_(message_loop), state_(kCreated) { } -PipelineThread::~PipelineThread() { - Stop(); - DCHECK(state_ == kStopped || state_ == kError); +PipelineInternal::~PipelineInternal() { + DCHECK(state_ == kCreated || state_ == kStopped); } -// This method is called on the client's thread. It starts the pipeline's -// dedicated thread and posts a task to call the StartTask() method on that -// thread. -bool PipelineThread::Start(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* init_complete_callback) { - DCHECK_EQ(kCreated, state_); - if (thread_.Start()) { - filter_factory_ = filter_factory; - url_ = url; - init_callback_.reset(init_complete_callback); - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StartTask)); - return true; - } - return false; +// Called on client's thread. +void PipelineInternal::Start(FilterFactory* filter_factory, + const std::string& url, + PipelineCallback* start_callback) { + DCHECK(filter_factory); + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url, + start_callback)); } -// Called on the client's thread. If the thread has been started, then posts -// a task to call the StopTask() method, then waits until the thread has -// stopped. -void PipelineThread::Stop() { - if (thread_.IsRunning()) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StopTask)); - thread_.Stop(); - } - DCHECK(filter_hosts_.empty()); - DCHECK(filter_threads_.empty()); +// Called on client's thread. +void PipelineInternal::Stop(PipelineCallback* stop_callback) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback)); } // Called on client's thread. -void PipelineThread::Seek(base::TimeDelta time, +void PipelineInternal::Seek(base::TimeDelta time, PipelineCallback* seek_callback) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback)); + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::SeekTask, time, + seek_callback)); } // Called on client's thread. -void PipelineThread::SetPlaybackRate(float rate) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); +void PipelineInternal::SetPlaybackRate(float rate) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate)); } // Called on client's thread. -void PipelineThread::SetVolume(float volume) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); +void PipelineInternal::SetVolume(float volume) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume)); } // Called from any thread. -void PipelineThread::InitializationComplete(FilterHostImpl* host) { +void PipelineInternal::InitializationComplete(FilterHostImpl* host) { if (IsPipelineOk()) { - // Continue the start task by proceeding to the next stage. - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StartTask)); + // Continue the initialize task by proceeding to the next stage. + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::InitializeTask)); } } // Called from any thread. Updates the pipeline time. -void PipelineThread::SetTime(base::TimeDelta time) { - pipeline()->SetTime(time); +void PipelineInternal::SetTime(base::TimeDelta time) { + // TODO(scherkus): why not post a task? + pipeline_->SetTime(time); } -// Called from any thread. Sets the pipeline |error_| member and schedules a -// task to stop all the filters in the pipeline. Note that the thread will -// continue to run until the client calls Pipeline::Stop(), but nothing will -// be processed since filters will not be able to post tasks. -void PipelineThread::Error(PipelineError error) { - // If this method returns false, then an error has already happened, so no - // reason to run the StopTask again. It's going to happen. - if (pipeline()->InternalSetError(error)) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StopTask)); - } +// Called from any thread. Sets the pipeline |error_| member and destroys all +// filters. +void PipelineInternal::Error(PipelineError error) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::ErrorTask, error)); } -// Called as a result of destruction of the thread. -// -// TODO(scherkus): this can block the client due to synchronous Stop() API call. -void PipelineThread::WillDestroyCurrentMessageLoop() { - STLDeleteElements(&filter_hosts_); - STLDeleteElements(&filter_threads_); +void PipelineInternal::StartTask(FilterFactory* filter_factory, + const std::string& url, + PipelineCallback* start_callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + DCHECK_EQ(kCreated, state_); + filter_factory_ = filter_factory; + url_ = url; + start_callback_.reset(start_callback); + + // Kick off initialization. + InitializeTask(); } // Main initialization method called on the pipeline thread. This code attempts @@ -370,18 +363,17 @@ void PipelineThread::WillDestroyCurrentMessageLoop() { // then connects the VideoDecoder to a VideoRenderer. // // When all required filters have been created and have called their -// FilterHost's InitializationComplete method, the pipeline's |initialized_| -// member is set to true, and, if the client provided an -// |init_complete_callback_|, it is called with "true". +// FilterHost's InitializationComplete() method, the pipeline will update its +// state to kStarted and |init_callback_|, will be executed. // // If initialization fails, the client's callback will still be called, but // the bool parameter passed to it will be false. // -// TODO(hclam): StartTask is now starting the pipeline asynchronously. It +// TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It // works like a big state change table. If we no longer need to start filters // in order, we need to get rid of all the state change. -void PipelineThread::StartTask() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::InitializeTask() { + DCHECK_EQ(MessageLoop::current(), message_loop_); // If we have received the stop signal, return immediately. if (state_ == kStopped) @@ -391,7 +383,6 @@ void PipelineThread::StartTask() { // Just created, create data source. if (state_ == kCreated) { - message_loop()->AddDestructionObserver(this); state_ = kInitDataSource; CreateDataSource(); return; @@ -446,99 +437,77 @@ void PipelineThread::StartTask() { } state_ = kStarted; - pipeline_->initialized_ = true; filter_factory_ = NULL; - if (init_callback_.get()) { - init_callback_->Run(true); - init_callback_.reset(); + if (start_callback_.get()) { + start_callback_->Run(true); + start_callback_.reset(); } } } // This method is called as a result of the client calling Pipeline::Stop() or // as the result of an error condition. If there is no error, then set the -// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the +// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the // reverse order. // // TODO(scherkus): beware! this can get posted multiple times since we post // Stop() tasks even if we've already stopped. Perhaps this should no-op for // additional calls, however most of this logic will be changing. -void PipelineThread::StopTask() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::StopTask(PipelineCallback* stop_callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + stop_callback_.reset(stop_callback); - if (IsPipelineInitializing()) { - // If IsPipelineOk() is true, the pipeline was simply stopped during - // initialization. Otherwise it is a failure. - state_ = IsPipelineOk() ? kStopped : kError; - filter_factory_ = NULL; - if (init_callback_.get()) { - init_callback_->Run(false); - init_callback_.reset(); - } - } else { - state_ = kStopped; + // If we've already stopped, return immediately. + if (state_ == kStopped) { + return; } - if (IsPipelineOk()) { - pipeline_->error_ = PIPELINE_STOPPING; - } + // Carry out setting the error, notifying the client and destroying filters. + ErrorTask(PIPELINE_STOPPING); - // Stop every filter. - for (FilterHostVector::iterator iter = filter_hosts_.begin(); - iter != filter_hosts_.end(); - ++iter) { - (*iter)->Stop(); - } + // We no longer need to examine our previous state, set it to stopped. + state_ = kStopped; - // Figure out how many threads we have to stop. - // - // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. - FilterThreadVector running_threads; - for (FilterThreadVector::iterator iter = filter_threads_.begin(); - iter != filter_threads_.end(); - ++iter) { - if ((*iter)->IsRunning()) { - running_threads.push_back(*iter); - } + // Reset the pipeline and set our reference to NULL so we don't accidentally + // modify the pipeline. Once remaining tasks execute we will be destroyed. + pipeline_->ResetState(); + pipeline_ = NULL; + + // Notify the client that stopping has finished. + if (stop_callback_.get()) { + stop_callback_->Run(true); + stop_callback_.reset(); } +} - // Crude blocking counter implementation. - Lock lock; - ConditionVariable wait_for_zero(&lock); - int count = running_threads.size(); +void PipelineInternal::ErrorTask(PipelineError error) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!"; - // Post a task to every filter's thread to ensure that they've completed their - // stopping logic before stopping the threads themselves. - // - // TODO(scherkus): again, Stop() should either be synchronous or we should - // receive a signal from filters that they have indeed stopped. - for (FilterThreadVector::iterator iter = running_threads.begin(); - iter != running_threads.end(); - ++iter) { - (*iter)->message_loop()->PostTask(FROM_HERE, - NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + // Suppress executing additional error logic. + if (state_ == kError) { + return; } - // Wait on our "blocking counter". - { - AutoLock auto_lock(lock); - while (count > 0) { - wait_for_zero.Wait(); - } - } + // Update our error code first in case we execute the start callback. + pipeline_->error_ = error; - // Stop every running filter thread. - // - // TODO(scherkus): can we watchdog this section to detect wedged threads? - for (FilterThreadVector::iterator iter = running_threads.begin(); - iter != running_threads.end(); - ++iter) { - (*iter)->Stop(); + // Notify the client that starting did not complete, if necessary. + if (IsPipelineInitializing() && start_callback_.get()) { + start_callback_->Run(false); } + start_callback_.reset(); + filter_factory_ = NULL; + + // We no longer need to examine our previous state, set it to stopped. + state_ = kError; + + // Destroy every filter and reset the pipeline as well. + DestroyFilters(); } -void PipelineThread::SetPlaybackRateTask(float rate) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::SetPlaybackRateTask(float rate) { + DCHECK_EQ(MessageLoop::current(), message_loop_); pipeline_->InternalSetPlaybackRate(rate); for (FilterHostVector::iterator iter = filter_hosts_.begin(); @@ -548,9 +517,10 @@ void PipelineThread::SetPlaybackRateTask(float rate) { } } -void PipelineThread::SeekTask(base::TimeDelta time, - PipelineCallback* seek_callback) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::SeekTask(base::TimeDelta time, + PipelineCallback* seek_callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + seek_callback_.reset(seek_callback); for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); @@ -566,14 +536,14 @@ void PipelineThread::SeekTask(base::TimeDelta time, // immediately here or we set time and do callback when we have new // frames/packets. SetTime(time); - if (seek_callback) { - seek_callback->Run(true); - delete seek_callback; + if (seek_callback_.get()) { + seek_callback_->Run(true); + seek_callback_.reset(); } } -void PipelineThread::SetVolumeTask(float volume) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::SetVolumeTask(float volume) { + DCHECK_EQ(MessageLoop::current(), message_loop_); pipeline_->volume_ = volume; scoped_refptr<AudioRenderer> audio_renderer; @@ -584,44 +554,46 @@ void PipelineThread::SetVolumeTask(float volume) { } template <class Filter, class Source> -void PipelineThread::CreateFilter(FilterFactory* filter_factory, - Source source, - const MediaFormat& media_format) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::CreateFilter(FilterFactory* filter_factory, + Source source, + const MediaFormat& media_format) { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); + // Create the filter. scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); if (!filter) { Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); - } else { - scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); - // Create a dedicated thread for this filter. - if (SupportsSetMessageLoop<Filter>()) { - // TODO(scherkus): figure out a way to name these threads so it matches - // the filter type. - scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); - if (!thread.get() || !thread->Start()) { - NOTREACHED() << "Could not start filter thread"; - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - } else { - filter->set_message_loop(thread->message_loop()); - filter_threads_.push_back(thread.release()); - } - } + return; + } - // Creating a thread could have failed, verify we're still OK. - if (IsPipelineOk()) { - filter_hosts_.push_back(host.get()); - filter->set_host(host.release()); - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - } + // Create a dedicated thread for this filter if applicable. + if (SupportsSetMessageLoop<Filter>()) { + scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>())); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + return; } + + filter->set_message_loop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } + + // Create the filter's host. + DCHECK(IsPipelineOk()); + scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); + filter->set_host(host.get()); + filter_hosts_.push_back(host.release()); + + // Now initialize the filter. + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); } } -void PipelineThread::CreateDataSource() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::CreateDataSource() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); MediaFormat url_format; @@ -630,8 +602,8 @@ void PipelineThread::CreateDataSource() { CreateFilter<DataSource>(filter_factory_, url_, url_format); } -void PipelineThread::CreateDemuxer() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::CreateDemuxer() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); scoped_refptr<DataSource> data_source; @@ -641,8 +613,8 @@ void PipelineThread::CreateDemuxer() { } template <class Decoder> -bool PipelineThread::CreateDecoder() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +bool PipelineInternal::CreateDecoder() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); scoped_refptr<Demuxer> demuxer; @@ -664,8 +636,8 @@ bool PipelineThread::CreateDecoder() { } template <class Decoder, class Renderer> -bool PipelineThread::CreateRenderer() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +bool PipelineInternal::CreateRenderer() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); scoped_refptr<Decoder> decoder; @@ -681,8 +653,8 @@ bool PipelineThread::CreateRenderer() { } template <class Filter> -void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const { + DCHECK_EQ(MessageLoop::current(), message_loop_); *filter_out = NULL; for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); @@ -692,4 +664,53 @@ void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { } } +void PipelineInternal::DestroyFilters() { + // Stop every filter. + for (FilterHostVector::iterator iter = filter_hosts_.begin(); + iter != filter_hosts_.end(); + ++iter) { + (*iter)->Stop(); + } + + // Crude blocking counter implementation. + Lock lock; + ConditionVariable wait_for_zero(&lock); + int count = filter_threads_.size(); + + // Post a task to every filter's thread to ensure that they've completed their + // stopping logic before stopping the threads themselves. + // + // TODO(scherkus): again, Stop() should either be synchronous or we should + // receive a signal from filters that they have indeed stopped. + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + (*iter)->message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + } + + // Wait on our "blocking counter". + { + AutoLock auto_lock(lock); + while (count > 0) { + wait_for_zero.Wait(); + } + } + + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + (*iter)->Stop(); + } + + // Reset the pipeline, which will decrement a reference to this object. + // We will get destroyed as soon as the remaining tasks finish executing. + // To be safe, we'll set our pipeline reference to NULL. + STLDeleteElements(&filter_hosts_); + STLDeleteElements(&filter_threads_); +} + } // namespace media diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index a8e9e09..2d6b988 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -20,24 +20,25 @@ namespace media { class FilterHostImpl; -class PipelineThread; +class PipelineInternal; // Class which implements the Media::Pipeline contract. The majority of the -// actual code for this object lives in the PipelineThread class, which is +// actual code for this object lives in the PipelineInternal class, which is // responsible for actually building and running the pipeline. This object // is basically a simple container for state information, and is responsible -// for creating and communicating with the PipelineThread object. +// for creating and communicating with the PipelineInternal object. class PipelineImpl : public Pipeline { public: - PipelineImpl(); + PipelineImpl(MessageLoop* message_loop); virtual ~PipelineImpl(); // Pipeline implementation. virtual bool Start(FilterFactory* filter_factory, const std::string& uri, PipelineCallback* start_callback); - virtual void Stop(); + virtual void Stop(PipelineCallback* stop_callback); virtual void Seek(base::TimeDelta time, PipelineCallback* seek_callback); + virtual bool IsRunning() const; virtual bool IsInitialized() const; virtual bool IsRendered(const std::string& major_mime_type) const; virtual float GetPlaybackRate() const; @@ -54,7 +55,7 @@ class PipelineImpl : public Pipeline { private: friend class FilterHostImpl; - friend class PipelineThread; + friend class PipelineInternal; // Reset the state of the pipeline object to the initial state. This method // is used by the constructor, and the Stop method. @@ -65,10 +66,6 @@ class PipelineImpl : public Pipeline { // must not be an error. bool IsPipelineOk() const; - // Returns true if we're currently executing on the pipeline thread. Mostly - // used in DCHECKs. - bool IsPipelineThread() const; - // Methods called by FilterHostImpl to update pipeline state. void SetDuration(base::TimeDelta duration); void SetBufferedTime(base::TimeDelta buffered_time); @@ -83,14 +80,17 @@ class PipelineImpl : public Pipeline { // alone, and returns false. bool InternalSetError(PipelineError error); - // Method called by the |pipeline_thread_| to insert a mime type into + // Method called by the |pipeline_internal_| to insert a mime type into // the |rendered_mime_types_| set. void InsertRenderedMimeType(const std::string& major_mime_type); - // Holds a ref counted reference to the PipelineThread object associated - // with this pipeline. Prior to the call to the Start method, this member - // will be NULL, since no thread is running. - scoped_refptr<PipelineThread> pipeline_thread_; + // Message loop used to execute pipeline tasks. + MessageLoop* message_loop_; + + // Holds a ref counted reference to the PipelineInternal object associated + // with this pipeline. Prior to the call to the Start() method, this member + // will be NULL, since we are not running. + scoped_refptr<PipelineInternal> pipeline_internal_; // After calling Start, if all of the required filters are created and // initialized, this member will be set to true by the pipeline thread. @@ -124,14 +124,14 @@ class PipelineImpl : public Pipeline { // Current volume level (from 0.0f to 1.0f). The volume reflects the last // value the audio filter was called with SetVolume, so there will be a short // period of time between the client calling SetVolume on the pipeline and - // this value being updated. Set by the PipelineThread just prior to calling - // the audio renderer. + // this value being updated. Set by the PipelineInternal just prior to + // calling the audio renderer. float volume_; // Current playback rate (>= 0.0f). This member reflects the last value // that the filters in the pipeline were called with, so there will be a short // period of time between the client calling SetPlaybackRate and this value - // being updated. Set by the PipelineThread just prior to calling filters. + // being updated. Set by the PipelineInternal just prior to calling filters. float playback_rate_; // Current playback time. Set by a FilterHostImpl object on behalf of the @@ -152,12 +152,12 @@ class PipelineImpl : public Pipeline { }; -// The PipelineThread contains most of the logic involved with running the -// media pipeline. Filters are created and called on a dedicated thread owned -// by this object. This object works like a state machine to perform -// asynchronous initialization. Initialization is done in multiple passes in -// StartTask(). In each pass a different filter is created and chained with a -// previously created filter. +// PipelineInternal contains most of the logic involved with running the +// media pipeline. Filters are created and called on the message loop injected +// into this object. PipelineInternal works like a state machine to perform +// asynchronous initialization. Initialization is done in multiple passes by +// InitializeTask(). In each pass a different filter is created and chained with +// a previously created filter. // // Here's a state diagram that describes the lifetime of this object. // @@ -172,21 +172,20 @@ class PipelineImpl : public Pipeline { // transition to the "Error" state from any state. If Stop() is called during // initialization, this object will transition to "Stopped" state. -class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, - public MessageLoop::DestructionObserver { +class PipelineInternal : public base::RefCountedThreadSafe<PipelineInternal> { public: // Methods called by PipelineImpl object on the client's thread. These // methods post a task to call a corresponding xxxTask() method on the - // pipeline thread. For example, Seek posts a task to call SeekTask. - explicit PipelineThread(PipelineImpl* pipeline); + // message loop. For example, Seek posts a task to call SeekTask. + explicit PipelineInternal(PipelineImpl* pipeline, MessageLoop* message_loop); // After Start() is called, a task of StartTask() is posted on the pipeline // thread to perform initialization. See StartTask() to learn more about // initialization. - bool Start(FilterFactory* filter_factory, + void Start(FilterFactory* filter_factory, const std::string& url_media_source, - PipelineCallback* init_complete_callback); - void Stop(); + PipelineCallback* start_callback); + void Stop(PipelineCallback* stop_callback); void Seek(base::TimeDelta time, PipelineCallback* seek_callback); void SetPlaybackRate(float rate); void SetVolume(float volume); @@ -209,19 +208,18 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, // Simple accessor used by the FilterHostImpl class to get access to the // pipeline object. + // + // TODO(scherkus): I think FilterHostImpl should not be talking to + // PipelineImpl but rather PipelineInternal. PipelineImpl* pipeline() const { return pipeline_; } - // Accessor used to post messages to thread's message loop. - MessageLoop* message_loop() const { return thread_.message_loop(); } - - // Accessor used by PipelineImpl to check if we're executing on the pipeline - // thread. - PlatformThreadId thread_id() const { return thread_.thread_id(); } + // Returns true if the pipeline has fully initialized. + bool IsInitialized() { return state_ == kStarted; } private: // Only allow ourselves to be destroyed via ref-counting. - friend class base::RefCountedThreadSafe<PipelineThread>; - virtual ~PipelineThread(); + friend class base::RefCountedThreadSafe<PipelineInternal>; + virtual ~PipelineInternal(); enum State { kCreated, @@ -249,23 +247,29 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, state_ == kInitVideoRenderer; } - // Implementation of MessageLoop::DestructionObserver. StartTask registers - // this class as a destruction observer on the thread's message loop. - // It is used to destroy the list of FilterHosts - // (and thus destroy the associated filters) when all tasks have been - // processed and the message loop has been quit. - virtual void WillDestroyCurrentMessageLoop(); - // The following "task" methods correspond to the public methods, but these - // methods are run as the result of posting a task to the PipelineThread's + // methods are run as the result of posting a task to the PipelineInternal's // message loop. + void StartTask(FilterFactory* filter_factory, + const std::string& url, + PipelineCallback* start_callback); + + // InitializeTask() performs initialization in multiple passes. It is executed + // as a result of calling Start() or InitializationComplete() that advances + // initialization to the next state. It works as a hub of state transition for + // initialization. + void InitializeTask(); + + // StopTask() and ErrorTask() are similar but serve different purposes: + // - Both destroy the filter chain. + // - Both will execute |start_callback| if the pipeline was initializing. + // - StopTask() resets the pipeline to a fresh state, where as ErrorTask() + // leaves the pipeline as is for client inspection. + // - StopTask() can be scheduled by the client calling Stop(), where as + // ErrorTask() is scheduled as a result of a filter calling Error(). + void StopTask(PipelineCallback* stop_callback); + void ErrorTask(PipelineError error); - // StartTask() is a special task that performs initialization in multiple - // passes. It is executed as a result of calling Start() or - // InitializationComplete() that advances initialization to the next state. It - // works as a hub of state transition for initialization. - void StartTask(); - void StopTask(); void SetPlaybackRateTask(float rate); void SeekTask(base::TimeDelta time, PipelineCallback* seek_callback); void SetVolumeTask(float volume); @@ -302,8 +306,8 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, Source source, const MediaFormat& source_media_format); - // Creates a Filter and initilizes it with the given |source|. If a Filter - // could not be created or an error occurred, this metod returns NULL and the + // Creates a Filter and initializes it with the given |source|. If a Filter + // could not be created or an error occurred, this method returns NULL and the // pipeline's |error_| member will contain a specific error code. Note that // the Source could be a filter or a DemuxerStream, but it must support the // GetMediaFormat() method. @@ -339,16 +343,15 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, template <class Filter> void GetFilter(scoped_refptr<Filter>* filter_out) const; - // Pointer to the pipeline that owns this PipelineThread. - PipelineImpl* const pipeline_; + // Stops every filters, filter host and filter thread and releases all + // references to them. + void DestroyFilters(); - // The actual thread. - base::Thread thread_; + // Pointer to the pipeline that owns this PipelineInternal. + PipelineImpl* pipeline_; - // Used to avoid scheduling multiple time update tasks. If this member is - // true then a task that will call the SetTimeTask() method is in the message - // loop's queue. - bool time_update_callback_scheduled_; + // Message loop used to execute pipeline tasks. + MessageLoop* message_loop_; // Member that tracks the current state. State state_; @@ -359,17 +362,19 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, // URL for the data source as passed in by Start(). std::string url_; - // Initialization callback as passed in by Start(). - scoped_ptr<PipelineCallback> init_callback_; + // Callbacks for various pipeline operations. + scoped_ptr<PipelineCallback> start_callback_; + scoped_ptr<PipelineCallback> seek_callback_; + scoped_ptr<PipelineCallback> stop_callback_; - // Vector of FilterHostImpl objects that contian the filters for the pipeline. + // Vector of FilterHostImpl objects that contain the filters for the pipeline. typedef std::vector<FilterHostImpl*> FilterHostVector; FilterHostVector filter_hosts_; typedef std::vector<base::Thread*> FilterThreadVector; FilterThreadVector filter_threads_; - DISALLOW_COPY_AND_ASSIGN(PipelineThread); + DISALLOW_COPY_AND_ASSIGN(PipelineInternal); }; } // namespace media diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc index 4183126..1a550eb 100644 --- a/media/base/pipeline_impl_unittest.cc +++ b/media/base/pipeline_impl_unittest.cc @@ -14,45 +14,53 @@ #include "testing/gtest/include/gtest/gtest.h" using ::testing::DoAll; +using ::testing::Mock; using ::testing::Return; using ::testing::StrictMock; namespace media { -typedef std::vector<MockDemuxerStream*> MockDemuxerStreamVector; +// Used for setting expectations on pipeline callbacks. Using a StrictMock +// also lets us test for missing callbacks. +class CallbackHelper { + public: + CallbackHelper() {} + virtual ~CallbackHelper() {} + + MOCK_METHOD1(OnInitialize, void(bool result)); + MOCK_METHOD1(OnSeek, void(bool result)); + MOCK_METHOD1(OnStop, void(bool result)); + + private: + DISALLOW_COPY_AND_ASSIGN(CallbackHelper); +}; +// TODO(scherkus): even though some filters are initialized on separate +// threads these test aren't flaky... why? It's because filters' Initialize() +// is executed on |message_loop_| and the mock filters instantly call +// InitializationComplete(), which keeps the pipeline humming along. If +// either filters don't call InitializationComplete() immediately or filter +// initialization is moved to a separate thread this test will become flaky. class PipelineImplTest : public ::testing::Test { public: PipelineImplTest() - : mocks_(new MockFilterFactory()), - initialize_result_(false), - seek_result_(false), - initialize_event_(false, false), - seek_event_(false, false) { + : pipeline_(&message_loop_), + mocks_(new MockFilterFactory()) { } virtual ~PipelineImplTest() { - // Force the pipeline to shut down its thread. - pipeline_.Stop(); - } - - protected: - // Called by tests after they have finished setting up MockFilterConfig. - // Initializes the pipeline and returns true if the initialization callback - // was executed, false otherwise. - bool InitializeAndWait() { - pipeline_.Start(mocks_, "", - NewCallback(this, &PipelineImplTest::OnInitialize)); - return initialize_event_.TimedWait(base::TimeDelta::FromMilliseconds(500)); - } + if (!pipeline_.IsRunning()) { + return; + } - // Issues a seek on the pipeline and returns true if the seek callback was - // executed, false otherwise. - bool SeekAndWait(const base::TimeDelta& time) { - pipeline_.Seek(time, NewCallback(this, &PipelineImplTest::OnSeek)); - return seek_event_.TimedWait(base::TimeDelta::FromMilliseconds(500)); + // Expect a stop callback if we were started. + EXPECT_CALL(callbacks_, OnStop(true)); + pipeline_.Stop(NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), + &CallbackHelper::OnStop)); + message_loop_.RunAllPending(); } + protected: // Sets up expectations to allow the data source to initialize. void InitializeDataSource() { EXPECT_CALL(*mocks_->data_source(), Initialize("")) @@ -62,6 +70,7 @@ class PipelineImplTest : public ::testing::Test { } // Sets up expectations to allow the demuxer to initialize. + typedef std::vector<MockDemuxerStream*> MockDemuxerStreamVector; void InitializeDemuxer(MockDemuxerStreamVector* streams) { EXPECT_CALL(*mocks_->demuxer(), Initialize(mocks_->data_source())) .WillOnce(DoAll(InitializationComplete(mocks_->demuxer()), @@ -110,27 +119,24 @@ class PipelineImplTest : public ::testing::Test { EXPECT_CALL(*mocks_->audio_renderer(), Stop()); } + // Sets up expectations on the callback and initializes the pipeline. Called + // afters tests have set expectations any filters they wish to use. + void InitializePipeline(bool callback_result) { + // Expect an initialization callback. + EXPECT_CALL(callbacks_, OnInitialize(callback_result)); + pipeline_.Start(mocks_, "", + NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), + &CallbackHelper::OnInitialize)); + message_loop_.RunAllPending(); + } + // Fixture members. - media::PipelineImpl pipeline_; + StrictMock<CallbackHelper> callbacks_; + MessageLoop message_loop_; + PipelineImpl pipeline_; scoped_refptr<media::MockFilterFactory> mocks_; - bool initialize_result_; - bool seek_result_; private: - void OnInitialize(bool result) { - initialize_result_ = result; - initialize_event_.Signal(); - } - - void OnSeek(bool result) { - seek_result_ = result; - seek_event_.Signal(); - } - - // Used to wait for callbacks. - base::WaitableEvent initialize_event_; - base::WaitableEvent seek_event_; - DISALLOW_COPY_AND_ASSIGN(PipelineImplTest); }; @@ -140,20 +146,29 @@ TEST_F(PipelineImplTest, NeverInitializes) { EXPECT_CALL(*mocks_->data_source(), Stop()); // This test hangs during initialization by never calling - // InitializationComplete(). Make sure we tear down the pipeline properly. - ASSERT_FALSE(InitializeAndWait()); - EXPECT_FALSE(initialize_result_); + // InitializationComplete(). StrictMock<> will ensure that the callback is + // never executed. + pipeline_.Start(mocks_, "", + NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), + &CallbackHelper::OnInitialize)); + message_loop_.RunAllPending(); + EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); + + // Because our callback will get executed when the test tears down, we'll + // verify that nothing has been called, then set our expectation for the call + // made during tear down. + Mock::VerifyAndClear(&callbacks_); + EXPECT_CALL(callbacks_, OnInitialize(false)); } TEST_F(PipelineImplTest, RequiredFilterMissing) { mocks_->set_creation_successful(false); - ASSERT_TRUE(InitializeAndWait()); - EXPECT_FALSE(initialize_result_); + InitializePipeline(false); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_ERROR_REQUIRED_FILTER_MISSING, + EXPECT_EQ(PIPELINE_ERROR_REQUIRED_FILTER_MISSING, pipeline_.GetError()); } @@ -164,10 +179,9 @@ TEST_F(PipelineImplTest, URLNotFound) { Return(false))); EXPECT_CALL(*mocks_->data_source(), Stop()); - ASSERT_TRUE(InitializeAndWait()); - EXPECT_FALSE(initialize_result_); + InitializePipeline(false); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_ERROR_URL_NOT_FOUND, pipeline_.GetError()); + EXPECT_EQ(PIPELINE_ERROR_URL_NOT_FOUND, pipeline_.GetError()); } TEST_F(PipelineImplTest, NoStreams) { @@ -175,10 +189,9 @@ TEST_F(PipelineImplTest, NoStreams) { InitializeDataSource(); InitializeDemuxer(&streams); - ASSERT_TRUE(InitializeAndWait()); - EXPECT_FALSE(initialize_result_); + InitializePipeline(false); EXPECT_FALSE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_ERROR_COULD_NOT_RENDER, pipeline_.GetError()); + EXPECT_EQ(PIPELINE_ERROR_COULD_NOT_RENDER, pipeline_.GetError()); } TEST_F(PipelineImplTest, AudioStream) { @@ -192,10 +205,9 @@ TEST_F(PipelineImplTest, AudioStream) { InitializeAudioDecoder(stream); InitializeAudioRenderer(); - ASSERT_TRUE(InitializeAndWait()); - EXPECT_TRUE(initialize_result_); + InitializePipeline(true); EXPECT_TRUE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio)); EXPECT_FALSE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo)); } @@ -211,10 +223,9 @@ TEST_F(PipelineImplTest, VideoStream) { InitializeVideoDecoder(stream); InitializeVideoRenderer(); - ASSERT_TRUE(InitializeAndWait()); - EXPECT_TRUE(initialize_result_); + InitializePipeline(true); EXPECT_TRUE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); EXPECT_FALSE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio)); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo)); } @@ -235,10 +246,9 @@ TEST_F(PipelineImplTest, AudioVideoStream) { InitializeVideoDecoder(video_stream); InitializeVideoRenderer(); - ASSERT_TRUE(InitializeAndWait()); - EXPECT_TRUE(initialize_result_); + InitializePipeline(true); EXPECT_TRUE(pipeline_.IsInitialized()); - EXPECT_EQ(media::PIPELINE_OK, pipeline_.GetError()); + EXPECT_EQ(PIPELINE_OK, pipeline_.GetError()); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeAudio)); EXPECT_TRUE(pipeline_.IsRendered(media::mime_type::kMajorTypeVideo)); } @@ -268,10 +278,15 @@ TEST_F(PipelineImplTest, Seek) { EXPECT_CALL(*mocks_->video_decoder(), Seek(expected)); EXPECT_CALL(*mocks_->video_renderer(), Seek(expected)); + // We expect a successful seek callback. + EXPECT_CALL(callbacks_, OnSeek(true)); + // Initialize then seek! - ASSERT_TRUE(InitializeAndWait()); - EXPECT_TRUE(SeekAndWait(expected)); - EXPECT_TRUE(seek_result_); + InitializePipeline(true); + pipeline_.Seek(expected, + NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), + &CallbackHelper::OnSeek)); + message_loop_.RunAllPending(); } TEST_F(PipelineImplTest, SetVolume) { @@ -290,9 +305,8 @@ TEST_F(PipelineImplTest, SetVolume) { EXPECT_CALL(*mocks_->audio_renderer(), SetVolume(expected)); // Initialize then set volume! - ASSERT_TRUE(InitializeAndWait()); + InitializePipeline(true); pipeline_.SetVolume(expected); } } // namespace media - diff --git a/media/player/movie.cc b/media/player/movie.cc index e37a497..2bb4001 100644 --- a/media/player/movie.cc +++ b/media/player/movie.cc @@ -79,7 +79,9 @@ bool Movie::Open(const wchar_t* url, WtlVideoRenderer* video_renderer) { factories->AddFactory( new media::InstanceFilterFactory<WtlVideoRenderer>(video_renderer)); - pipeline_.reset(new PipelineImpl()); + thread_.reset(new base::Thread("PipelineThread")); + thread_->Start(); + pipeline_.reset(new PipelineImpl(thread_->message_loop())); // Create and start our pipeline. pipeline_->Start(factories.get(), WideToUTF8(std::wstring(url)), NULL); @@ -194,8 +196,10 @@ bool Movie::GetOpenMpEnable() { // Teardown. void Movie::Close() { if (pipeline_.get()) { - pipeline_->Stop(); + pipeline_->Stop(NULL); + thread_->Stop(); pipeline_.reset(); + thread_.reset(); } } diff --git a/media/player/movie.h b/media/player/movie.h index fdaa2b5..eb23822 100644 --- a/media/player/movie.h +++ b/media/player/movie.h @@ -11,6 +11,7 @@ #include "base/scoped_ptr.h" #include "base/singleton.h" +#include "base/thread.h" class WtlVideoRenderer; @@ -90,6 +91,7 @@ class Movie : public Singleton<Movie> { virtual ~Movie(); scoped_ptr<PipelineImpl> pipeline_; + scoped_ptr<base::Thread> thread_; bool enable_audio_; bool enable_swscaler_; diff --git a/webkit/glue/webmediaplayer_impl.cc b/webkit/glue/webmediaplayer_impl.cc index 452a9e6..9f5702f 100644 --- a/webkit/glue/webmediaplayer_impl.cc +++ b/webkit/glue/webmediaplayer_impl.cc @@ -143,7 +143,7 @@ void WebMediaPlayerImpl::Proxy::PipelineInitializationCallback(bool success) { ReadyStateChanged(WebKit::WebMediaPlayer::HaveEnoughData); NetworkStateChanged(WebKit::WebMediaPlayer::Loaded); } else { - // TODO(hclam): should use pipeline_.GetError() to determine the state + // TODO(hclam): should use pipeline_->GetError() to determine the state // properly and reports error using MediaError. // WebKit uses FormatError to indicate an error for bogus URL or bad file. // Since we are at the initialization stage we can safely treat every error @@ -166,11 +166,19 @@ WebMediaPlayerImpl::WebMediaPlayerImpl(WebKit::WebMediaPlayerClient* client, ready_state_(WebKit::WebMediaPlayer::HaveNothing), main_loop_(NULL), filter_factory_(factory), + pipeline_thread_("PipelineThread"), client_(client) { // Saves the current message loop. DCHECK(!main_loop_); main_loop_ = MessageLoop::current(); + // Create the pipeline and its thread. + if (!pipeline_thread_.Start()) { + NOTREACHED() << "Could not start PipelineThread"; + } else { + pipeline_.reset(new media::PipelineImpl(pipeline_thread_.message_loop())); + } + // Also we want to be notified of |main_loop_| destruction. main_loop_->AddDestructionObserver(this); @@ -202,7 +210,7 @@ void WebMediaPlayerImpl::load(const WebKit::WebURL& url) { // Initialize the pipeline. SetNetworkState(WebKit::WebMediaPlayer::Loading); SetReadyState(WebKit::WebMediaPlayer::HaveNothing); - pipeline_.Start( + pipeline_->Start( filter_factory_.get(), url.spec(), NewCallback(proxy_.get(), @@ -218,13 +226,13 @@ void WebMediaPlayerImpl::play() { // TODO(hclam): We should restore the previous playback rate rather than // having it at 1.0. - pipeline_.SetPlaybackRate(1.0f); + pipeline_->SetPlaybackRate(1.0f); } void WebMediaPlayerImpl::pause() { DCHECK(MessageLoop::current() == main_loop_); - pipeline_.SetPlaybackRate(0.0f); + pipeline_->SetPlaybackRate(0.0f); } void WebMediaPlayerImpl::seek(float seconds) { @@ -233,7 +241,7 @@ void WebMediaPlayerImpl::seek(float seconds) { // Try to preserve as much accuracy as possible. float microseconds = seconds * base::Time::kMicrosecondsPerSecond; if (seconds != 0) - pipeline_.Seek( + pipeline_->Seek( base::TimeDelta::FromMicroseconds(static_cast<int64>(microseconds)), NewCallback(proxy_.get(), &WebMediaPlayerImpl::Proxy::PipelineSeekCallback)); @@ -249,13 +257,13 @@ void WebMediaPlayerImpl::setEndTime(float seconds) { void WebMediaPlayerImpl::setRate(float rate) { DCHECK(MessageLoop::current() == main_loop_); - pipeline_.SetPlaybackRate(rate); + pipeline_->SetPlaybackRate(rate); } void WebMediaPlayerImpl::setVolume(float volume) { DCHECK(MessageLoop::current() == main_loop_); - pipeline_.SetVolume(volume); + pipeline_->SetVolume(volume); } void WebMediaPlayerImpl::setVisible(bool visible) { @@ -274,14 +282,14 @@ bool WebMediaPlayerImpl::setAutoBuffer(bool autoBuffer) { bool WebMediaPlayerImpl::totalBytesKnown() { DCHECK(MessageLoop::current() == main_loop_); - return pipeline_.GetTotalBytes() != 0; + return pipeline_->GetTotalBytes() != 0; } bool WebMediaPlayerImpl::hasVideo() const { DCHECK(MessageLoop::current() == main_loop_); size_t width, height; - pipeline_.GetVideoSize(&width, &height); + pipeline_->GetVideoSize(&width, &height); return width != 0 && height != 0; } @@ -289,14 +297,14 @@ WebKit::WebSize WebMediaPlayerImpl::naturalSize() const { DCHECK(MessageLoop::current() == main_loop_); size_t width, height; - pipeline_.GetVideoSize(&width, &height); + pipeline_->GetVideoSize(&width, &height); return WebKit::WebSize(width, height); } bool WebMediaPlayerImpl::paused() const { DCHECK(MessageLoop::current() == main_loop_); - return pipeline_.GetPlaybackRate() == 0.0f; + return pipeline_->GetPlaybackRate() == 0.0f; } bool WebMediaPlayerImpl::seeking() const { @@ -308,13 +316,13 @@ bool WebMediaPlayerImpl::seeking() const { float WebMediaPlayerImpl::duration() const { DCHECK(MessageLoop::current() == main_loop_); - return static_cast<float>(pipeline_.GetDuration().InSecondsF()); + return static_cast<float>(pipeline_->GetDuration().InSecondsF()); } float WebMediaPlayerImpl::currentTime() const { DCHECK(MessageLoop::current() == main_loop_); - return static_cast<float>(pipeline_.GetTime().InSecondsF()); + return static_cast<float>(pipeline_->GetTime().InSecondsF()); } int WebMediaPlayerImpl::dataRate() const { @@ -327,32 +335,32 @@ int WebMediaPlayerImpl::dataRate() const { float WebMediaPlayerImpl::maxTimeBuffered() const { DCHECK(MessageLoop::current() == main_loop_); - return static_cast<float>(pipeline_.GetBufferedTime().InSecondsF()); + return static_cast<float>(pipeline_->GetBufferedTime().InSecondsF()); } float WebMediaPlayerImpl::maxTimeSeekable() const { DCHECK(MessageLoop::current() == main_loop_); // TODO(scherkus): move this logic down into the pipeline. - if (pipeline_.GetTotalBytes() == 0) { + if (pipeline_->GetTotalBytes() == 0) { return 0.0f; } - double total_bytes = static_cast<double>(pipeline_.GetTotalBytes()); - double buffered_bytes = static_cast<double>(pipeline_.GetBufferedBytes()); - double duration = static_cast<double>(pipeline_.GetDuration().InSecondsF()); + double total_bytes = static_cast<double>(pipeline_->GetTotalBytes()); + double buffered_bytes = static_cast<double>(pipeline_->GetBufferedBytes()); + double duration = static_cast<double>(pipeline_->GetDuration().InSecondsF()); return static_cast<float>(duration * (buffered_bytes / total_bytes)); } unsigned long long WebMediaPlayerImpl::bytesLoaded() const { DCHECK(MessageLoop::current() == main_loop_); - return pipeline_.GetBufferedBytes(); + return pipeline_->GetBufferedBytes(); } unsigned long long WebMediaPlayerImpl::totalBytes() const { DCHECK(MessageLoop::current() == main_loop_); - return pipeline_.GetTotalBytes(); + return pipeline_->GetTotalBytes(); } void WebMediaPlayerImpl::setSize(const WebSize& size) { @@ -407,9 +415,9 @@ void WebMediaPlayerImpl::Destroy() { DCHECK(MessageLoop::current() == main_loop_); // Make sure to kill the pipeline so there's no more media threads running. - // TODO(hclam): stopping the pipeline is synchronous so it might block - // stopping for a long time. - pipeline_.Stop(); + // TODO(hclam): stopping the pipeline might block for a long time. + pipeline_->Stop(NULL); + pipeline_thread_.Stop(); // And then detach the proxy, it may live on the render thread for a little // longer until all the tasks are finished. diff --git a/webkit/glue/webmediaplayer_impl.h b/webkit/glue/webmediaplayer_impl.h index 604188c..b8a44b0 100644 --- a/webkit/glue/webmediaplayer_impl.h +++ b/webkit/glue/webmediaplayer_impl.h @@ -248,9 +248,9 @@ class WebMediaPlayerImpl : public WebKit::WebMediaPlayer, // A collection of factories for creating filters. scoped_refptr<media::FilterFactoryCollection> filter_factory_; - // The actual pipeline. We do it a composition here because we expect to have - // the same lifetime as the pipeline. - media::PipelineImpl pipeline_; + // The actual pipeline and the thread it runs on. + scoped_ptr<media::PipelineImpl> pipeline_; + base::Thread pipeline_thread_; WebKit::WebMediaPlayerClient* client_; |