diff options
author | hclam@chromium.org <hclam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-01 23:56:29 +0000 |
---|---|---|
committer | hclam@chromium.org <hclam@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-01 23:56:29 +0000 |
commit | 7de6dd37b467971c9cdf5bed1ee466bdbbe9d42e (patch) | |
tree | d810c003874ca63c97e9109242454084eefbd48f /media | |
parent | b4ed8a38e90d0999f0c27950f807712996a3f800 (diff) | |
download | chromium_src-7de6dd37b467971c9cdf5bed1ee466bdbbe9d42e.zip chromium_src-7de6dd37b467971c9cdf5bed1ee466bdbbe9d42e.tar.gz chromium_src-7de6dd37b467971c9cdf5bed1ee466bdbbe9d42e.tar.bz2 |
Asynchronous initialization of media::PipelineThread
The initialization of media::PipelineThread needs to
done asynchronously or a lot of dead lock will happen.
Review URL: http://codereview.chromium.org/149123
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19787 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r-- | media/base/pipeline_impl.cc | 392 | ||||
-rw-r--r-- | media/base/pipeline_impl.h | 146 | ||||
-rw-r--r-- | media/base/pipeline_impl_unittest.cc | 15 |
3 files changed, 322 insertions, 231 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 5ed70e3..16148d3 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -3,7 +3,7 @@ // found in the LICENSE file. // // TODO(scherkus): clean up PipelineImpl... too many crazy function names, -// potential deadlocks, nested message loops, etc... +// potential deadlocks, etc... #include "base/compiler_specific.h" #include "base/condition_variable.h" @@ -289,46 +289,37 @@ PipelineThread::PipelineThread(PipelineImpl* pipeline) : pipeline_(pipeline), thread_("PipelineThread"), time_update_callback_scheduled_(false), - host_initializing_(NULL) { + state_(kCreated) { } PipelineThread::~PipelineThread() { Stop(); + DCHECK(state_ == kStopped || state_ == kError); } // This method is called on the client's thread. It starts the pipeline's -// dedicated thread and posts a task to call the StartTask method on that +// dedicated thread and posts a task to call the StartTask() method on that // thread. bool PipelineThread::Start(FilterFactory* filter_factory, const std::string& url, PipelineCallback* init_complete_callback) { + DCHECK_EQ(kCreated, state_); if (thread_.Start()) { - filter_factory->AddRef(); - PostTask(NewRunnableMethod(this, - &PipelineThread::StartTask, - filter_factory, - url, - // TODO(ralphl): what happens to this callback? - // is it copied by NewRunnableTask? Just pointer - // or is the callback itself copied? - init_complete_callback)); + filter_factory_ = filter_factory; + url_ = url; + init_callback_.reset(init_complete_callback); + PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); return true; } return false; } // Called on the client's thread. If the thread has been started, then posts -// a task to call the StopTask method, then waits until the thread has stopped. -// There is a critical section that wraps the entire duration of the StartTask -// method. This method waits for that Lock to be released so that we know -// that the thread is not executing a nested message loop. This way we know -// that that Thread::Stop call will quit the appropriate message loop. -// -// TODO(scherkus): this can potentially deadlock, hack away our lock usage!! +// a task to call the StopTask() method, then waits until the thread has +// stopped. void PipelineThread::Stop() { if (thread_.IsRunning()) { PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); - AutoLock lock_crit(initialization_lock_); thread_.Stop(); } DCHECK(filter_hosts_.empty()); @@ -352,19 +343,11 @@ void PipelineThread::SetVolume(float volume) { PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); } -// May be called on any thread, and therefore we always assume the worst -// possible race condition. This could, for example, be called from a filter's -// thread just as the pipeline thread is exiting the call to the filter's -// Initialize() method. Therefore, we make NO assumptions, and post work -// in every case, even the trivial one of a thread calling this method from -// within it's Initialize method. This means that we will always run a nested -// message loop, and the InitializationCompleteTask will Quit that loop -// immediately in the trivial case. void PipelineThread::InitializationComplete(FilterHostImpl* host) { - DCHECK(host == host_initializing_); - PostTask(NewRunnableMethod(this, - &PipelineThread::InitializationCompleteTask, - host)); + if (IsPipelineOk()) { + // Continue the start task by proceeding to the next stage. + PostTask(NewRunnableMethod(this, &PipelineThread::StartTask)); + } } // Called from any thread. Updates the pipeline time and schedules a task to @@ -377,9 +360,9 @@ void PipelineThread::SetTime(base::TimeDelta time) { } } -// Called from any thread. Sets the pipeline error_ member and schedules a +// Called from any thread. Sets the pipeline |error_| member and schedules a // task to stop all the filters in the pipeline. Note that the thread will -// continue to run until the client calls Pipeline::Stop, but nothing will +// continue to run until the client calls Pipeline::Stop(), but nothing will // be processed since filters will not be able to post tasks. void PipelineThread::Error(PipelineError error) { // If this method returns false, then an error has already happened, so no @@ -389,67 +372,106 @@ void PipelineThread::Error(PipelineError error) { } } -// Called from any thread. Used by FilterHostImpl::PostTask method and used -// internally. +// This is a helper method to post task on message_loop(). This method is only +// called from this class or from Pipeline. void PipelineThread::PostTask(Task* task) { message_loop()->PostTask(FROM_HERE, task); } // Main initialization method called on the pipeline thread. This code attempts -// to use the specified filter factory to build a pipeline. It starts by -// creating a DataSource, connects it to a Demuxer, and then connects the -// Demuxer's audio stream to an AudioDecoder which is then connected to an -// AudioRenderer. If the media has video, then it connects a VideoDecoder to -// the Demuxer's video stream, and then connects the VideoDecoder to a -// VideoRenderer. When all required filters have been created and have called -// their FilterHost's InitializationComplete method, the pipeline's -// initialized_ member is set to true, and, if the client provided an -// init_complete_callback, it is called with "true". -// If initializatoin fails, the client's callback will still be called, but +// to use the specified filter factory to build a pipeline. +// Initialization step performed in this method depends on current state of this +// object, indicated by |state_|. After each step of initialization, this +// object transits to the next stage. It starts by creating a DataSource, +// connects it to a Demuxer, and then connects the Demuxer's audio stream to an +// AudioDecoder which is then connected to an AudioRenderer. If the media has +// video, then it connects a VideoDecoder to the Demuxer's video stream, and +// then connects the VideoDecoder to a VideoRenderer. +// +// When all required filters have been created and have called their +// FilterHost's InitializationComplete method, the pipeline's |initialized_| +// member is set to true, and, if the client provided an +// |init_complete_callback_|, it is called with "true". +// +// If initialization fails, the client's callback will still be called, but // the bool parameter passed to it will be false. // -// Note that at each step in this process, the initialization of any filter -// may require running the pipeline thread's message loop recursively. This is -// handled by the CreateFilter method. -void PipelineThread::StartTask(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* init_complete_callback) { - // During the entire StartTask we hold the initialization_lock_ so that - // if the client calls the Pipeline::Stop method while we are running a - // nested message loop, we can correctly unwind out of it before calling - // the Thread::Stop method. - AutoLock auto_lock(initialization_lock_); - - // Add ourselves as a destruction observer of the thread's message loop so - // we can delete filters at an appropriate time (when all tasks have been - // processed and the thread is about to be destroyed). - message_loop()->AddDestructionObserver(this); - - scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url); - if (PipelineOk()) { - scoped_refptr<Demuxer> demuxer = - CreateFilter<Demuxer, DataSource>(filter_factory, data_source); - if (PipelineOk()) { - Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer); - } - if (PipelineOk()) { - Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer); +// TODO(hclam): StartTask is now starting the pipeline asynchronously. It +// works like a big state change table. If we no longer need to start filters +// in order, we need to get rid of all the state change. +void PipelineThread::StartTask() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + + // If we have received the stop signal, return immediately. + if (state_ == kStopped) + return; + + DCHECK(state_ == kCreated || IsPipelineInitializing()); + + // Just created, create data source. + if (state_ == kCreated) { + message_loop()->AddDestructionObserver(this); + state_ = kInitDataSource; + CreateDataSource(); + return; + } + + // Data source created, create demuxer. + if (state_ == kInitDataSource) { + state_ = kInitDemuxer; + CreateDemuxer(); + return; + } + + // Demuxer created, create audio decoder. + if (state_ == kInitDemuxer) { + state_ = kInitAudioDecoder; + // If this method returns false, then there's no audio stream. + if (CreateDecoder<AudioDecoder>()) + return; + } + + // Assuming audio decoder was created, create audio renderer. + if (state_ == kInitAudioDecoder) { + state_ = kInitAudioRenderer; + // Returns false if there's no audio stream. + if (CreateRenderer<AudioDecoder, AudioRenderer>()) { + pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type()); + return; } } - if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) { - Error(PIPELINE_ERROR_COULD_NOT_RENDER); + // Assuming audio renderer was created, create video decoder. + if (state_ == kInitAudioRenderer) { + // Then perform the stage of initialization, i.e. initialize video decoder. + state_ = kInitVideoDecoder; + if (CreateDecoder<VideoDecoder>()) + return; } - pipeline_->initialized_ = PipelineOk(); + // Assuming video decoder was created, create video renderer. + if (state_ == kInitVideoDecoder) { + state_ = kInitVideoRenderer; + if (CreateRenderer<VideoDecoder, VideoRenderer>()) { + pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type()); + return; + } + } + + if (state_ == kInitVideoRenderer) { + if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) { + Error(PIPELINE_ERROR_COULD_NOT_RENDER); + return; + } - // No matter what, we're done with the filter factory, and - // client callback so get rid of them. - filter_factory->Release(); - if (init_complete_callback) { - init_complete_callback->Run(pipeline_->initialized_); - delete init_complete_callback; + state_ = kStarted; + pipeline_->initialized_ = true; + filter_factory_ = NULL; + if (init_callback_.get()) { + init_callback_->Run(true); + init_callback_.reset(); + } } } @@ -462,7 +484,22 @@ void PipelineThread::StartTask(FilterFactory* filter_factory, // Stop() tasks even if we've already stopped. Perhaps this should no-op for // additional calls, however most of this logic will be changing. void PipelineThread::StopTask() { - if (PipelineOk()) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + + if (IsPipelineInitializing()) { + // If IsPipelineOk() is true, the pipeline was simply stopped during + // initialization. Otherwise it is a failure. + state_ = IsPipelineOk() ? kStopped : kError; + filter_factory_ = NULL; + if (init_callback_.get()) { + init_callback_->Run(false); + init_callback_.reset(); + } + } else { + state_ = kStopped; + } + + if (IsPipelineOk()) { pipeline_->error_ = PIPELINE_STOPPING; } @@ -518,52 +555,11 @@ void PipelineThread::StopTask() { ++iter) { (*iter)->Stop(); } - - if (host_initializing_) { - host_initializing_ = NULL; - message_loop()->Quit(); - } -} - -template <class Decoder, class Renderer> -void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) { - DCHECK(PipelineOk()); - const std::string major_mime_type = Decoder::major_mime_type(); - const int num_outputs = demuxer->GetNumberOfStreams(); - for (int i = 0; i < num_outputs; ++i) { - scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); - std::string value; - if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && - 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { - scoped_refptr<Decoder> decoder = - CreateFilter<Decoder, DemuxerStream>(filter_factory, stream); - if (PipelineOk()) { - DCHECK(decoder); - CreateFilter<Renderer, Decoder>(filter_factory, decoder); - } - if (PipelineOk()) { - pipeline_->InsertRenderedMimeType(major_mime_type); - } - break; - } - } -} - - -// Task runs as a result of a filter calling InitializationComplete. If for -// some reason StopTask has been executed prior to this, the host_initializing_ -// member will be NULL, and the message loop will have been quit already, so -// we don't want to do it again. -void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) { - if (host == host_initializing_) { - host_initializing_ = NULL; - message_loop()->Quit(); - } else { - DCHECK(!host_initializing_); - } } void PipelineThread::SetPlaybackRateTask(float rate) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + pipeline_->InternalSetPlaybackRate(rate); for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); @@ -574,6 +570,8 @@ void PipelineThread::SetPlaybackRateTask(float rate) { void PipelineThread::SeekTask(base::TimeDelta time, PipelineCallback* seek_callback) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); ++iter) { @@ -595,6 +593,8 @@ void PipelineThread::SeekTask(base::TimeDelta time, } void PipelineThread::SetVolumeTask(float volume) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + pipeline_->volume_ = volume; scoped_refptr<AudioRenderer> audio_renderer; GetFilter(&audio_renderer); @@ -604,6 +604,8 @@ void PipelineThread::SetVolumeTask(float volume) { } void PipelineThread::SetTimeTask() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + time_update_callback_scheduled_ = false; for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); @@ -614,6 +616,8 @@ void PipelineThread::SetTimeTask() { template <class Filter> void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + *filter_out = NULL; for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end() && NULL == *filter_out; @@ -623,76 +627,100 @@ void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { } template <class Filter, class Source> -scoped_refptr<Filter> PipelineThread::CreateFilter( - FilterFactory* filter_factory, - Source source, - const MediaFormat& media_format) { - DCHECK(PipelineOk()); +void PipelineThread::CreateFilter(FilterFactory* filter_factory, + Source source, + const MediaFormat& media_format) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK(IsPipelineOk()); + scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); if (!filter) { Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); } else { - DCHECK(!host_initializing_); - host_initializing_ = new FilterHostImpl(this, filter.get()); - if (NULL == host_initializing_) { - Error(PIPELINE_ERROR_OUT_OF_MEMORY); - } else { - // Create a dedicated thread for this filter. - if (SupportsSetMessageLoop<Filter>()) { - // TODO(scherkus): figure out a way to name these threads so it matches - // the filter type. - scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); - if (!thread.get() || !thread->Start()) { - NOTREACHED() << "Could not start filter thread"; - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - } else { - filter->SetMessageLoop(thread->message_loop()); - filter_threads_.push_back(thread.release()); - } + scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); + // Create a dedicated thread for this filter. + if (SupportsSetMessageLoop<Filter>()) { + // TODO(scherkus): figure out a way to name these threads so it matches + // the filter type. + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } else { + filter->SetMessageLoop(thread->message_loop()); + filter_threads_.push_back(thread.release()); } + } - // Creating a thread could have failed, verify we're still OK. - if (PipelineOk()) { - filter_hosts_.push_back(host_initializing_); - filter->SetFilterHost(host_initializing_); - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - } + // Creating a thread could have failed, verify we're still OK. + if (IsPipelineOk()) { + filter_hosts_.push_back(host.get()); + filter->SetFilterHost(host.release()); + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); } } } - if (PipelineOk()) { - // Now we run the thread's message loop recursively. We want all - // pending tasks to be processed, so we set nestable tasks to be allowed - // and then run the loop. The only way we exit the loop is as the result - // of a call to FilterHost::InitializationComplete, FilterHost::Error, or - // Pipeline::Stop. In each of these cases, the corresponding task method - // sets host_initializing_ to NULL to signal that the message loop's Quit - // method has already been called, and then calls message_loop()->Quit(). - // The setting of |host_initializing_| to NULL in the task prevents a - // subsequent task from accidentally quitting the wrong (non-nested) loop. - message_loop()->SetNestableTasksAllowed(true); - message_loop()->Run(); - message_loop()->SetNestableTasksAllowed(false); - DCHECK(!host_initializing_); - } else { - // This could still be set if we never ran the message loop (for example, - // if the fiter returned false from it's Initialize() method), so make sure - // to reset it. - host_initializing_ = NULL; - } - if (!PipelineOk()) { - filter = NULL; - } - return filter; } -scoped_refptr<DataSource> PipelineThread::CreateDataSource( - FilterFactory* filter_factory, const std::string& url) { +void PipelineThread::CreateDataSource() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK(IsPipelineOk()); + MediaFormat url_format; url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL); - url_format.SetAsString(MediaFormat::kURL, url); - return CreateFilter<DataSource>(filter_factory, url, url_format); + url_format.SetAsString(MediaFormat::kURL, url_); + CreateFilter<DataSource>(filter_factory_, url_, url_format); +} + +void PipelineThread::CreateDemuxer() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK(IsPipelineOk()); + + scoped_refptr<DataSource> data_source; + GetFilter(&data_source); + DCHECK(data_source); + CreateFilter<Demuxer, DataSource>(filter_factory_, data_source); +} + +template <class Decoder> +bool PipelineThread::CreateDecoder() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK(IsPipelineOk()); + + scoped_refptr<Demuxer> demuxer; + GetFilter(&demuxer); + DCHECK(demuxer); + + const std::string major_mime_type = Decoder::major_mime_type(); + const int num_outputs = demuxer->GetNumberOfStreams(); + for (int i = 0; i < num_outputs; ++i) { + scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i); + std::string value; + if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) && + 0 == value.compare(0, major_mime_type.length(), major_mime_type)) { + CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream); + return true; + } + } + return false; +} + +template <class Decoder, class Renderer> +bool PipelineThread::CreateRenderer() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK(IsPipelineOk()); + + scoped_refptr<Decoder> decoder; + GetFilter(&decoder); + + if (decoder) { + // If the decoder was created. + const std::string major_mime_type = Decoder::major_mime_type(); + CreateFilter<Renderer, Decoder>(filter_factory_, decoder); + return true; + } + return false; } // Called as a result of destruction of the thread. diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index 07d9c05..5e348d5 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -161,8 +161,25 @@ class PipelineImpl : public Pipeline { // The PipelineThread contains most of the logic involved with running the -// media pipeline. Filters are created and called on a dedicated thread owned -// by this object. +// media pipeline. Filters are created and called on a dedicated thread owned +// by this object. This object works like a state machine to perform +// asynchronous initialization. Initialization is done in multiple passes in +// StartTask(). In each pass a different filter is created and chained with a +// previously created filter. +// +// Here's a state diagram that describes the lifetime of this object. +// +// [ *Created ] -> [ InitDataSource ] -> [ InitDemuxer ] -> +// [ InitAudioDecoder ] -> [ InitAudioRenderer ] -> +// [ InitVideoDecoder ] -> [ InitVideoRenderer ] -> [ Started ] +// | | | +// .-> [ Error ] .-> [ Stopped ] <-. +// +// Initialization is a series of state transitions from "Created" to +// "Started". If any error happens during initialization, this object will +// transition to the "Error" state from any state. If Stop() is called during +// initialization, this object will transition to "Stopped" state. + class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, public MessageLoop::DestructionObserver { public: @@ -171,6 +188,9 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, // pipeline thread. For example, Seek posts a task to call SeekTask. explicit PipelineThread(PipelineImpl* pipeline); + // After Start() is called, a task of StartTask() is posted on the pipeline + // thread to perform initialization. See StartTask() to learn more about + // initialization. bool Start(FilterFactory* filter_factory, const std::string& url_media_source, PipelineCallback* init_complete_callback); @@ -211,6 +231,19 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, PlatformThreadId thread_id() const { return thread_.thread_id(); } private: + enum State { + kCreated, + kInitDataSource, + kInitDemuxer, + kInitAudioDecoder, + kInitAudioRenderer, + kInitVideoDecoder, + kInitVideoRenderer, + kStarted, + kStopped, + kError, + }; + // Implementation of MessageLoop::DestructionObserver. StartTask registers // this class as a destruction observer on the thread's message loop. // It is used to destroy the list of FilterHosts @@ -222,21 +255,32 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, virtual ~PipelineThread(); // Simple method used to make sure the pipeline is running normally. - bool PipelineOk() { return PIPELINE_OK == pipeline_->error_; } + bool IsPipelineOk() { return PIPELINE_OK == pipeline_->error_; } + + // Helper method to tell whether we are in the state of initializing. + bool IsPipelineInitializing() { + return state_ == kInitDataSource || + state_ == kInitDemuxer || + state_ == kInitAudioDecoder || + state_ == kInitAudioRenderer || + state_ == kInitVideoDecoder || + state_ == kInitVideoRenderer; + } // The following "task" methods correspond to the public methods, but these // methods are run as the result of posting a task to the PipelineThread's - // message loop. For example, the Start method posts a task to call the - // StartTask message on the pipeline thread. - void StartTask(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* init_complete_callback); + // message loop. + + // StartTask() is a special task that performs initialization in multiple + // passes. It is executed as a result of calling Start() or + // InitializationComplete() that advances initialization to the next state. It + // works as a hub of state transition for initialization. + void StartTask(); void StopTask(); void SetPlaybackRateTask(float rate); void SeekTask(base::TimeDelta time, PipelineCallback* seek_callback); void SetVolumeTask(float volume); void SetTimeTask(); - void InitializationCompleteTask(FilterHostImpl* FilterHost); // Internal methods used in the implementation of the pipeline thread. All // of these methods are only called on the pipeline thread. @@ -244,36 +288,34 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, // Calls the Stop method on every filter in the pipeline void StopFilters(); - // The following template funcions make use of the fact that media filter + // The following template functions make use of the fact that media filter // derived interfaces are self-describing in the sense that they all contain // the static method filter_type() which returns a FilterType enum that // uniquely identifies the filter's interface. In addition, filters that are // specific to audio or video also support a static method major_mime_type() // which returns a string of "audio/" or "video/". - + // // Uses the FilterFactory to create a new filter of the Filter class, and - // initiaializes it using the Source object. The source may be another filter + // initializes it using the Source object. The source may be another filter // or it could be a string in the case of a DataSource. // - // The CreateFilter method actually does much more than simply creating the + // The CreateFilter() method actually does much more than simply creating the // filter. It creates the FilterHostImpl object, creates the filter using - // the filter factory, calls the MediaFilter::SetHost method on the filter, + // the filter factory, calls the MediaFilter::SetHost() method on the filter, // and then calls the filter's type-specific Initialize(source) method to - // initialize the filter. It then runs the thread's message loop and waits - // until one of the following occurs: - // 1. The filter calls FilterHost::InitializationComplete() - // 2. A filter calls FilterHost::Error() - // 3. The client calls Pipeline::Stop() + // initialize the filter. If the required filter cannot be created, + // PIPELINE_ERROR_REQUIRED_FILTER_MISSING is raised, initialization is halted + // and this object will remain in the "Error" state. // // Callers can optionally use the returned Filter for further processing, // but since the call already placed the filter in the list of filter hosts, // callers can ignore the return value. In any case, if this function can - // not create and initailze the speified Filter, then this method will return - // with |pipeline_->error_| != PIPELINE_OK. + // not create and initializes the specified Filter, then this method will + // return with |pipeline_->error_| != PIPELINE_OK. template <class Filter, class Source> - scoped_refptr<Filter> CreateFilter(FilterFactory* filter_factory, - Source source, - const MediaFormat& source_media_format); + void CreateFilter(FilterFactory* filter_factory, + Source source, + const MediaFormat& source_media_format); // Creates a Filter and initilizes it with the given |source|. If a Filter // could not be created or an error occurred, this metod returns NULL and the @@ -281,28 +323,30 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, // the Source could be a filter or a DemuxerStream, but it must support the // GetMediaFormat() method. template <class Filter, class Source> - scoped_refptr<Filter> CreateFilter(FilterFactory* filter_factory, - Source* source) { - return CreateFilter<Filter, Source*>(filter_factory, - source, - source->media_format()); + void CreateFilter(FilterFactory* filter_factory, Source* source) { + CreateFilter<Filter, Source*>(filter_factory, + source, + source->media_format()); } - // Creates a DataSource (the first filter in a pipeline), and initializes it - // with the specified URL. - scoped_refptr<DataSource> CreateDataSource(FilterFactory* filter_factory, - const std::string& url); - - // If the |demuxer| contains a stream that matches Decoder::major_media_type() - // this method creates and initializes the specified Decoder and Renderer. - // Callers should examine the |pipeline_->error_| member to see if there was - // an error duing the call. The lack of the specified stream does not - // constitute an error, and no Decoder or Renderer will be created if the - // data stream does not exist in the |demuxer|. If a stream is rendered, then - // this method will call |pipeline_|->InsertRenderedMimeType() to add the - // mime type to the set of rendered major mime types for the pipeline. + // Creates a DataSource (the first filter in a pipeline). + void CreateDataSource(); + + // Creates a Demuxer. + void CreateDemuxer(); + + // Creates a decoder of type Decoder. Returns true if the asynchronous action + // of creating decoder has started. Returns false if this method did nothing + // because the corresponding audio/video stream does not exist. + template <class Decoder> + bool CreateDecoder(); + + // Creates a renderer of type Renderer and connects it with Decoder. Returns + // true if the asynchronous action of creating renderer has started. Returns + // false if this method did nothing because the corresponding audio/video + // stream does not exist. template <class Decoder, class Renderer> - void Render(FilterFactory* filter_factory, Demuxer* demuxer); + bool CreateRenderer(); // Examine the list of existing filters to find one that supports the // specified Filter interface. If one exists, the |filter_out| will contain @@ -321,13 +365,17 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, // loop's queue. bool time_update_callback_scheduled_; - // During initialization of a filter, this member points to the FilterHostImpl - // that is being initialized. - FilterHostImpl* host_initializing_; + // Member that tracks the current state. + State state_; + + // Filter factory as passed in by Start(). + scoped_refptr<FilterFactory> filter_factory_; + + // URL for the data source as passed in by Start(). + std::string url_; - // This lock is held through the entire StartTask method to prevent the - // Stop method from quitting the nested message loop of the StartTask method. - Lock initialization_lock_; + // Initialization callback as passed in by Start(). + scoped_ptr<PipelineCallback> init_callback_; // Vector of FilterHostImpl objects that contian the filters for the pipeline. typedef std::vector<FilterHostImpl*> FilterHostVector; diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc index f28ee1c..56aa246 100644 --- a/media/base/pipeline_impl_unittest.cc +++ b/media/base/pipeline_impl_unittest.cc @@ -200,6 +200,21 @@ TEST_F(PipelineImplTest, Seek) { EXPECT_TRUE(expected == filters_->video_renderer()->seek_time()); } +// Try to execute Start()/Stop() on the Pipeline many times and very fast. This +// test is trying to simulate the situation where the pipeline can get dead +// locked very easily by quickly calling Start()/Stop(). +TEST_F(PipelineImplTest, StressTestPipelineStartStop) { + media::old_mocks::MockFilterConfig config; + const int kTimes = 1000; + for (int i = 0; i < kTimes; ++i) { + scoped_refptr<media::old_mocks::MockFilterFactory> factory = + new media::old_mocks::MockFilterFactory(&config); + media::PipelineImpl pipeline; + pipeline.Start(factory.get(), "", NULL); + pipeline.Stop(); + } +} + // TODO(ralphl): Add a unit test that makes sure that the mock audio filter // is actually called on a SetVolume() call to the pipeline. I almost checked // in code that broke this, but all unit tests were passing. |