summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--media/base/pipeline_impl.cc392
-rw-r--r--media/base/pipeline_impl.h146
-rw-r--r--media/base/pipeline_impl_unittest.cc15
3 files changed, 322 insertions, 231 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 5ed70e3..16148d3 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -3,7 +3,7 @@
// found in the LICENSE file.
//
// TODO(scherkus): clean up PipelineImpl... too many crazy function names,
-// potential deadlocks, nested message loops, etc...
+// potential deadlocks, etc...
#include "base/compiler_specific.h"
#include "base/condition_variable.h"
@@ -289,46 +289,37 @@ PipelineThread::PipelineThread(PipelineImpl* pipeline)
: pipeline_(pipeline),
thread_("PipelineThread"),
time_update_callback_scheduled_(false),
- host_initializing_(NULL) {
+ state_(kCreated) {
}
PipelineThread::~PipelineThread() {
Stop();
+ DCHECK(state_ == kStopped || state_ == kError);
}
// This method is called on the client's thread. It starts the pipeline's
-// dedicated thread and posts a task to call the StartTask method on that
+// dedicated thread and posts a task to call the StartTask() method on that
// thread.
bool PipelineThread::Start(FilterFactory* filter_factory,
const std::string& url,
PipelineCallback* init_complete_callback) {
+ DCHECK_EQ(kCreated, state_);
if (thread_.Start()) {
- filter_factory->AddRef();
- PostTask(NewRunnableMethod(this,
- &PipelineThread::StartTask,
- filter_factory,
- url,
- // TODO(ralphl): what happens to this callback?
- // is it copied by NewRunnableTask? Just pointer
- // or is the callback itself copied?
- init_complete_callback));
+ filter_factory_ = filter_factory;
+ url_ = url;
+ init_callback_.reset(init_complete_callback);
+ PostTask(NewRunnableMethod(this, &PipelineThread::StartTask));
return true;
}
return false;
}
// Called on the client's thread. If the thread has been started, then posts
-// a task to call the StopTask method, then waits until the thread has stopped.
-// There is a critical section that wraps the entire duration of the StartTask
-// method. This method waits for that Lock to be released so that we know
-// that the thread is not executing a nested message loop. This way we know
-// that that Thread::Stop call will quit the appropriate message loop.
-//
-// TODO(scherkus): this can potentially deadlock, hack away our lock usage!!
+// a task to call the StopTask() method, then waits until the thread has
+// stopped.
void PipelineThread::Stop() {
if (thread_.IsRunning()) {
PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
- AutoLock lock_crit(initialization_lock_);
thread_.Stop();
}
DCHECK(filter_hosts_.empty());
@@ -352,19 +343,11 @@ void PipelineThread::SetVolume(float volume) {
PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
}
-// May be called on any thread, and therefore we always assume the worst
-// possible race condition. This could, for example, be called from a filter's
-// thread just as the pipeline thread is exiting the call to the filter's
-// Initialize() method. Therefore, we make NO assumptions, and post work
-// in every case, even the trivial one of a thread calling this method from
-// within it's Initialize method. This means that we will always run a nested
-// message loop, and the InitializationCompleteTask will Quit that loop
-// immediately in the trivial case.
void PipelineThread::InitializationComplete(FilterHostImpl* host) {
- DCHECK(host == host_initializing_);
- PostTask(NewRunnableMethod(this,
- &PipelineThread::InitializationCompleteTask,
- host));
+ if (IsPipelineOk()) {
+ // Continue the start task by proceeding to the next stage.
+ PostTask(NewRunnableMethod(this, &PipelineThread::StartTask));
+ }
}
// Called from any thread. Updates the pipeline time and schedules a task to
@@ -377,9 +360,9 @@ void PipelineThread::SetTime(base::TimeDelta time) {
}
}
-// Called from any thread. Sets the pipeline error_ member and schedules a
+// Called from any thread. Sets the pipeline |error_| member and schedules a
// task to stop all the filters in the pipeline. Note that the thread will
-// continue to run until the client calls Pipeline::Stop, but nothing will
+// continue to run until the client calls Pipeline::Stop(), but nothing will
// be processed since filters will not be able to post tasks.
void PipelineThread::Error(PipelineError error) {
// If this method returns false, then an error has already happened, so no
@@ -389,67 +372,106 @@ void PipelineThread::Error(PipelineError error) {
}
}
-// Called from any thread. Used by FilterHostImpl::PostTask method and used
-// internally.
+// This is a helper method to post task on message_loop(). This method is only
+// called from this class or from Pipeline.
void PipelineThread::PostTask(Task* task) {
message_loop()->PostTask(FROM_HERE, task);
}
// Main initialization method called on the pipeline thread. This code attempts
-// to use the specified filter factory to build a pipeline. It starts by
-// creating a DataSource, connects it to a Demuxer, and then connects the
-// Demuxer's audio stream to an AudioDecoder which is then connected to an
-// AudioRenderer. If the media has video, then it connects a VideoDecoder to
-// the Demuxer's video stream, and then connects the VideoDecoder to a
-// VideoRenderer. When all required filters have been created and have called
-// their FilterHost's InitializationComplete method, the pipeline's
-// initialized_ member is set to true, and, if the client provided an
-// init_complete_callback, it is called with "true".
-// If initializatoin fails, the client's callback will still be called, but
+// to use the specified filter factory to build a pipeline.
+// Initialization step performed in this method depends on current state of this
+// object, indicated by |state_|. After each step of initialization, this
+// object transits to the next stage. It starts by creating a DataSource,
+// connects it to a Demuxer, and then connects the Demuxer's audio stream to an
+// AudioDecoder which is then connected to an AudioRenderer. If the media has
+// video, then it connects a VideoDecoder to the Demuxer's video stream, and
+// then connects the VideoDecoder to a VideoRenderer.
+//
+// When all required filters have been created and have called their
+// FilterHost's InitializationComplete method, the pipeline's |initialized_|
+// member is set to true, and, if the client provided an
+// |init_complete_callback_|, it is called with "true".
+//
+// If initialization fails, the client's callback will still be called, but
// the bool parameter passed to it will be false.
//
-// Note that at each step in this process, the initialization of any filter
-// may require running the pipeline thread's message loop recursively. This is
-// handled by the CreateFilter method.
-void PipelineThread::StartTask(FilterFactory* filter_factory,
- const std::string& url,
- PipelineCallback* init_complete_callback) {
- // During the entire StartTask we hold the initialization_lock_ so that
- // if the client calls the Pipeline::Stop method while we are running a
- // nested message loop, we can correctly unwind out of it before calling
- // the Thread::Stop method.
- AutoLock auto_lock(initialization_lock_);
-
- // Add ourselves as a destruction observer of the thread's message loop so
- // we can delete filters at an appropriate time (when all tasks have been
- // processed and the thread is about to be destroyed).
- message_loop()->AddDestructionObserver(this);
-
- scoped_refptr<DataSource> data_source = CreateDataSource(filter_factory, url);
- if (PipelineOk()) {
- scoped_refptr<Demuxer> demuxer =
- CreateFilter<Demuxer, DataSource>(filter_factory, data_source);
- if (PipelineOk()) {
- Render<AudioDecoder, AudioRenderer>(filter_factory, demuxer);
- }
- if (PipelineOk()) {
- Render<VideoDecoder, VideoRenderer>(filter_factory, demuxer);
+// TODO(hclam): StartTask is now starting the pipeline asynchronously. It
+// works like a big state change table. If we no longer need to start filters
+// in order, we need to get rid of all the state change.
+void PipelineThread::StartTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
+ // If we have received the stop signal, return immediately.
+ if (state_ == kStopped)
+ return;
+
+ DCHECK(state_ == kCreated || IsPipelineInitializing());
+
+ // Just created, create data source.
+ if (state_ == kCreated) {
+ message_loop()->AddDestructionObserver(this);
+ state_ = kInitDataSource;
+ CreateDataSource();
+ return;
+ }
+
+ // Data source created, create demuxer.
+ if (state_ == kInitDataSource) {
+ state_ = kInitDemuxer;
+ CreateDemuxer();
+ return;
+ }
+
+ // Demuxer created, create audio decoder.
+ if (state_ == kInitDemuxer) {
+ state_ = kInitAudioDecoder;
+ // If this method returns false, then there's no audio stream.
+ if (CreateDecoder<AudioDecoder>())
+ return;
+ }
+
+ // Assuming audio decoder was created, create audio renderer.
+ if (state_ == kInitAudioDecoder) {
+ state_ = kInitAudioRenderer;
+ // Returns false if there's no audio stream.
+ if (CreateRenderer<AudioDecoder, AudioRenderer>()) {
+ pipeline_->InsertRenderedMimeType(AudioDecoder::major_mime_type());
+ return;
}
}
- if (PipelineOk() && pipeline_->rendered_mime_types_.empty()) {
- Error(PIPELINE_ERROR_COULD_NOT_RENDER);
+ // Assuming audio renderer was created, create video decoder.
+ if (state_ == kInitAudioRenderer) {
+ // Then perform the stage of initialization, i.e. initialize video decoder.
+ state_ = kInitVideoDecoder;
+ if (CreateDecoder<VideoDecoder>())
+ return;
}
- pipeline_->initialized_ = PipelineOk();
+ // Assuming video decoder was created, create video renderer.
+ if (state_ == kInitVideoDecoder) {
+ state_ = kInitVideoRenderer;
+ if (CreateRenderer<VideoDecoder, VideoRenderer>()) {
+ pipeline_->InsertRenderedMimeType(VideoDecoder::major_mime_type());
+ return;
+ }
+ }
+
+ if (state_ == kInitVideoRenderer) {
+ if (!IsPipelineOk() || pipeline_->rendered_mime_types_.empty()) {
+ Error(PIPELINE_ERROR_COULD_NOT_RENDER);
+ return;
+ }
- // No matter what, we're done with the filter factory, and
- // client callback so get rid of them.
- filter_factory->Release();
- if (init_complete_callback) {
- init_complete_callback->Run(pipeline_->initialized_);
- delete init_complete_callback;
+ state_ = kStarted;
+ pipeline_->initialized_ = true;
+ filter_factory_ = NULL;
+ if (init_callback_.get()) {
+ init_callback_->Run(true);
+ init_callback_.reset();
+ }
}
}
@@ -462,7 +484,22 @@ void PipelineThread::StartTask(FilterFactory* filter_factory,
// Stop() tasks even if we've already stopped. Perhaps this should no-op for
// additional calls, however most of this logic will be changing.
void PipelineThread::StopTask() {
- if (PipelineOk()) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
+ if (IsPipelineInitializing()) {
+ // If IsPipelineOk() is true, the pipeline was simply stopped during
+ // initialization. Otherwise it is a failure.
+ state_ = IsPipelineOk() ? kStopped : kError;
+ filter_factory_ = NULL;
+ if (init_callback_.get()) {
+ init_callback_->Run(false);
+ init_callback_.reset();
+ }
+ } else {
+ state_ = kStopped;
+ }
+
+ if (IsPipelineOk()) {
pipeline_->error_ = PIPELINE_STOPPING;
}
@@ -518,52 +555,11 @@ void PipelineThread::StopTask() {
++iter) {
(*iter)->Stop();
}
-
- if (host_initializing_) {
- host_initializing_ = NULL;
- message_loop()->Quit();
- }
-}
-
-template <class Decoder, class Renderer>
-void PipelineThread::Render(FilterFactory* filter_factory, Demuxer* demuxer) {
- DCHECK(PipelineOk());
- const std::string major_mime_type = Decoder::major_mime_type();
- const int num_outputs = demuxer->GetNumberOfStreams();
- for (int i = 0; i < num_outputs; ++i) {
- scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
- std::string value;
- if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
- 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
- scoped_refptr<Decoder> decoder =
- CreateFilter<Decoder, DemuxerStream>(filter_factory, stream);
- if (PipelineOk()) {
- DCHECK(decoder);
- CreateFilter<Renderer, Decoder>(filter_factory, decoder);
- }
- if (PipelineOk()) {
- pipeline_->InsertRenderedMimeType(major_mime_type);
- }
- break;
- }
- }
-}
-
-
-// Task runs as a result of a filter calling InitializationComplete. If for
-// some reason StopTask has been executed prior to this, the host_initializing_
-// member will be NULL, and the message loop will have been quit already, so
-// we don't want to do it again.
-void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) {
- if (host == host_initializing_) {
- host_initializing_ = NULL;
- message_loop()->Quit();
- } else {
- DCHECK(!host_initializing_);
- }
}
void PipelineThread::SetPlaybackRateTask(float rate) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
pipeline_->InternalSetPlaybackRate(rate);
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
@@ -574,6 +570,8 @@ void PipelineThread::SetPlaybackRateTask(float rate) {
void PipelineThread::SeekTask(base::TimeDelta time,
PipelineCallback* seek_callback) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
++iter) {
@@ -595,6 +593,8 @@ void PipelineThread::SeekTask(base::TimeDelta time,
}
void PipelineThread::SetVolumeTask(float volume) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
pipeline_->volume_ = volume;
scoped_refptr<AudioRenderer> audio_renderer;
GetFilter(&audio_renderer);
@@ -604,6 +604,8 @@ void PipelineThread::SetVolumeTask(float volume) {
}
void PipelineThread::SetTimeTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
time_update_callback_scheduled_ = false;
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
@@ -614,6 +616,8 @@ void PipelineThread::SetTimeTask() {
template <class Filter>
void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+
*filter_out = NULL;
for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end() && NULL == *filter_out;
@@ -623,76 +627,100 @@ void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
}
template <class Filter, class Source>
-scoped_refptr<Filter> PipelineThread::CreateFilter(
- FilterFactory* filter_factory,
- Source source,
- const MediaFormat& media_format) {
- DCHECK(PipelineOk());
+void PipelineThread::CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat& media_format) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
if (!filter) {
Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
} else {
- DCHECK(!host_initializing_);
- host_initializing_ = new FilterHostImpl(this, filter.get());
- if (NULL == host_initializing_) {
- Error(PIPELINE_ERROR_OUT_OF_MEMORY);
- } else {
- // Create a dedicated thread for this filter.
- if (SupportsSetMessageLoop<Filter>()) {
- // TODO(scherkus): figure out a way to name these threads so it matches
- // the filter type.
- scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
- if (!thread.get() || !thread->Start()) {
- NOTREACHED() << "Could not start filter thread";
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- } else {
- filter->SetMessageLoop(thread->message_loop());
- filter_threads_.push_back(thread.release());
- }
+ scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
+ // Create a dedicated thread for this filter.
+ if (SupportsSetMessageLoop<Filter>()) {
+ // TODO(scherkus): figure out a way to name these threads so it matches
+ // the filter type.
+ scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ } else {
+ filter->SetMessageLoop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
}
+ }
- // Creating a thread could have failed, verify we're still OK.
- if (PipelineOk()) {
- filter_hosts_.push_back(host_initializing_);
- filter->SetFilterHost(host_initializing_);
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- }
+ // Creating a thread could have failed, verify we're still OK.
+ if (IsPipelineOk()) {
+ filter_hosts_.push_back(host.get());
+ filter->SetFilterHost(host.release());
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
}
}
}
- if (PipelineOk()) {
- // Now we run the thread's message loop recursively. We want all
- // pending tasks to be processed, so we set nestable tasks to be allowed
- // and then run the loop. The only way we exit the loop is as the result
- // of a call to FilterHost::InitializationComplete, FilterHost::Error, or
- // Pipeline::Stop. In each of these cases, the corresponding task method
- // sets host_initializing_ to NULL to signal that the message loop's Quit
- // method has already been called, and then calls message_loop()->Quit().
- // The setting of |host_initializing_| to NULL in the task prevents a
- // subsequent task from accidentally quitting the wrong (non-nested) loop.
- message_loop()->SetNestableTasksAllowed(true);
- message_loop()->Run();
- message_loop()->SetNestableTasksAllowed(false);
- DCHECK(!host_initializing_);
- } else {
- // This could still be set if we never ran the message loop (for example,
- // if the fiter returned false from it's Initialize() method), so make sure
- // to reset it.
- host_initializing_ = NULL;
- }
- if (!PipelineOk()) {
- filter = NULL;
- }
- return filter;
}
-scoped_refptr<DataSource> PipelineThread::CreateDataSource(
- FilterFactory* filter_factory, const std::string& url) {
+void PipelineThread::CreateDataSource() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
MediaFormat url_format;
url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
- url_format.SetAsString(MediaFormat::kURL, url);
- return CreateFilter<DataSource>(filter_factory, url, url_format);
+ url_format.SetAsString(MediaFormat::kURL, url_);
+ CreateFilter<DataSource>(filter_factory_, url_, url_format);
+}
+
+void PipelineThread::CreateDemuxer() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
+ scoped_refptr<DataSource> data_source;
+ GetFilter(&data_source);
+ DCHECK(data_source);
+ CreateFilter<Demuxer, DataSource>(filter_factory_, data_source);
+}
+
+template <class Decoder>
+bool PipelineThread::CreateDecoder() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
+ scoped_refptr<Demuxer> demuxer;
+ GetFilter(&demuxer);
+ DCHECK(demuxer);
+
+ const std::string major_mime_type = Decoder::major_mime_type();
+ const int num_outputs = demuxer->GetNumberOfStreams();
+ for (int i = 0; i < num_outputs; ++i) {
+ scoped_refptr<DemuxerStream> stream = demuxer->GetStream(i);
+ std::string value;
+ if (stream->media_format().GetAsString(MediaFormat::kMimeType, &value) &&
+ 0 == value.compare(0, major_mime_type.length(), major_mime_type)) {
+ CreateFilter<Decoder, DemuxerStream>(filter_factory_, stream);
+ return true;
+ }
+ }
+ return false;
+}
+
+template <class Decoder, class Renderer>
+bool PipelineThread::CreateRenderer() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK(IsPipelineOk());
+
+ scoped_refptr<Decoder> decoder;
+ GetFilter(&decoder);
+
+ if (decoder) {
+ // If the decoder was created.
+ const std::string major_mime_type = Decoder::major_mime_type();
+ CreateFilter<Renderer, Decoder>(filter_factory_, decoder);
+ return true;
+ }
+ return false;
}
// Called as a result of destruction of the thread.
diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h
index 07d9c05..5e348d5 100644
--- a/media/base/pipeline_impl.h
+++ b/media/base/pipeline_impl.h
@@ -161,8 +161,25 @@ class PipelineImpl : public Pipeline {
// The PipelineThread contains most of the logic involved with running the
-// media pipeline. Filters are created and called on a dedicated thread owned
-// by this object.
+// media pipeline. Filters are created and called on a dedicated thread owned
+// by this object. This object works like a state machine to perform
+// asynchronous initialization. Initialization is done in multiple passes in
+// StartTask(). In each pass a different filter is created and chained with a
+// previously created filter.
+//
+// Here's a state diagram that describes the lifetime of this object.
+//
+// [ *Created ] -> [ InitDataSource ] -> [ InitDemuxer ] ->
+// [ InitAudioDecoder ] -> [ InitAudioRenderer ] ->
+// [ InitVideoDecoder ] -> [ InitVideoRenderer ] -> [ Started ]
+// | | |
+// .-> [ Error ] .-> [ Stopped ] <-.
+//
+// Initialization is a series of state transitions from "Created" to
+// "Started". If any error happens during initialization, this object will
+// transition to the "Error" state from any state. If Stop() is called during
+// initialization, this object will transition to "Stopped" state.
+
class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
public MessageLoop::DestructionObserver {
public:
@@ -171,6 +188,9 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
// pipeline thread. For example, Seek posts a task to call SeekTask.
explicit PipelineThread(PipelineImpl* pipeline);
+ // After Start() is called, a task of StartTask() is posted on the pipeline
+ // thread to perform initialization. See StartTask() to learn more about
+ // initialization.
bool Start(FilterFactory* filter_factory,
const std::string& url_media_source,
PipelineCallback* init_complete_callback);
@@ -211,6 +231,19 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
PlatformThreadId thread_id() const { return thread_.thread_id(); }
private:
+ enum State {
+ kCreated,
+ kInitDataSource,
+ kInitDemuxer,
+ kInitAudioDecoder,
+ kInitAudioRenderer,
+ kInitVideoDecoder,
+ kInitVideoRenderer,
+ kStarted,
+ kStopped,
+ kError,
+ };
+
// Implementation of MessageLoop::DestructionObserver. StartTask registers
// this class as a destruction observer on the thread's message loop.
// It is used to destroy the list of FilterHosts
@@ -222,21 +255,32 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
virtual ~PipelineThread();
// Simple method used to make sure the pipeline is running normally.
- bool PipelineOk() { return PIPELINE_OK == pipeline_->error_; }
+ bool IsPipelineOk() { return PIPELINE_OK == pipeline_->error_; }
+
+ // Helper method to tell whether we are in the state of initializing.
+ bool IsPipelineInitializing() {
+ return state_ == kInitDataSource ||
+ state_ == kInitDemuxer ||
+ state_ == kInitAudioDecoder ||
+ state_ == kInitAudioRenderer ||
+ state_ == kInitVideoDecoder ||
+ state_ == kInitVideoRenderer;
+ }
// The following "task" methods correspond to the public methods, but these
// methods are run as the result of posting a task to the PipelineThread's
- // message loop. For example, the Start method posts a task to call the
- // StartTask message on the pipeline thread.
- void StartTask(FilterFactory* filter_factory,
- const std::string& url,
- PipelineCallback* init_complete_callback);
+ // message loop.
+
+ // StartTask() is a special task that performs initialization in multiple
+ // passes. It is executed as a result of calling Start() or
+ // InitializationComplete() that advances initialization to the next state. It
+ // works as a hub of state transition for initialization.
+ void StartTask();
void StopTask();
void SetPlaybackRateTask(float rate);
void SeekTask(base::TimeDelta time, PipelineCallback* seek_callback);
void SetVolumeTask(float volume);
void SetTimeTask();
- void InitializationCompleteTask(FilterHostImpl* FilterHost);
// Internal methods used in the implementation of the pipeline thread. All
// of these methods are only called on the pipeline thread.
@@ -244,36 +288,34 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
// Calls the Stop method on every filter in the pipeline
void StopFilters();
- // The following template funcions make use of the fact that media filter
+ // The following template functions make use of the fact that media filter
// derived interfaces are self-describing in the sense that they all contain
// the static method filter_type() which returns a FilterType enum that
// uniquely identifies the filter's interface. In addition, filters that are
// specific to audio or video also support a static method major_mime_type()
// which returns a string of "audio/" or "video/".
-
+ //
// Uses the FilterFactory to create a new filter of the Filter class, and
- // initiaializes it using the Source object. The source may be another filter
+ // initializes it using the Source object. The source may be another filter
// or it could be a string in the case of a DataSource.
//
- // The CreateFilter method actually does much more than simply creating the
+ // The CreateFilter() method actually does much more than simply creating the
// filter. It creates the FilterHostImpl object, creates the filter using
- // the filter factory, calls the MediaFilter::SetHost method on the filter,
+ // the filter factory, calls the MediaFilter::SetHost() method on the filter,
// and then calls the filter's type-specific Initialize(source) method to
- // initialize the filter. It then runs the thread's message loop and waits
- // until one of the following occurs:
- // 1. The filter calls FilterHost::InitializationComplete()
- // 2. A filter calls FilterHost::Error()
- // 3. The client calls Pipeline::Stop()
+ // initialize the filter. If the required filter cannot be created,
+ // PIPELINE_ERROR_REQUIRED_FILTER_MISSING is raised, initialization is halted
+ // and this object will remain in the "Error" state.
//
// Callers can optionally use the returned Filter for further processing,
// but since the call already placed the filter in the list of filter hosts,
// callers can ignore the return value. In any case, if this function can
- // not create and initailze the speified Filter, then this method will return
- // with |pipeline_->error_| != PIPELINE_OK.
+ // not create and initializes the specified Filter, then this method will
+ // return with |pipeline_->error_| != PIPELINE_OK.
template <class Filter, class Source>
- scoped_refptr<Filter> CreateFilter(FilterFactory* filter_factory,
- Source source,
- const MediaFormat& source_media_format);
+ void CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat& source_media_format);
// Creates a Filter and initilizes it with the given |source|. If a Filter
// could not be created or an error occurred, this metod returns NULL and the
@@ -281,28 +323,30 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
// the Source could be a filter or a DemuxerStream, but it must support the
// GetMediaFormat() method.
template <class Filter, class Source>
- scoped_refptr<Filter> CreateFilter(FilterFactory* filter_factory,
- Source* source) {
- return CreateFilter<Filter, Source*>(filter_factory,
- source,
- source->media_format());
+ void CreateFilter(FilterFactory* filter_factory, Source* source) {
+ CreateFilter<Filter, Source*>(filter_factory,
+ source,
+ source->media_format());
}
- // Creates a DataSource (the first filter in a pipeline), and initializes it
- // with the specified URL.
- scoped_refptr<DataSource> CreateDataSource(FilterFactory* filter_factory,
- const std::string& url);
-
- // If the |demuxer| contains a stream that matches Decoder::major_media_type()
- // this method creates and initializes the specified Decoder and Renderer.
- // Callers should examine the |pipeline_->error_| member to see if there was
- // an error duing the call. The lack of the specified stream does not
- // constitute an error, and no Decoder or Renderer will be created if the
- // data stream does not exist in the |demuxer|. If a stream is rendered, then
- // this method will call |pipeline_|->InsertRenderedMimeType() to add the
- // mime type to the set of rendered major mime types for the pipeline.
+ // Creates a DataSource (the first filter in a pipeline).
+ void CreateDataSource();
+
+ // Creates a Demuxer.
+ void CreateDemuxer();
+
+ // Creates a decoder of type Decoder. Returns true if the asynchronous action
+ // of creating decoder has started. Returns false if this method did nothing
+ // because the corresponding audio/video stream does not exist.
+ template <class Decoder>
+ bool CreateDecoder();
+
+ // Creates a renderer of type Renderer and connects it with Decoder. Returns
+ // true if the asynchronous action of creating renderer has started. Returns
+ // false if this method did nothing because the corresponding audio/video
+ // stream does not exist.
template <class Decoder, class Renderer>
- void Render(FilterFactory* filter_factory, Demuxer* demuxer);
+ bool CreateRenderer();
// Examine the list of existing filters to find one that supports the
// specified Filter interface. If one exists, the |filter_out| will contain
@@ -321,13 +365,17 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
// loop's queue.
bool time_update_callback_scheduled_;
- // During initialization of a filter, this member points to the FilterHostImpl
- // that is being initialized.
- FilterHostImpl* host_initializing_;
+ // Member that tracks the current state.
+ State state_;
+
+ // Filter factory as passed in by Start().
+ scoped_refptr<FilterFactory> filter_factory_;
+
+ // URL for the data source as passed in by Start().
+ std::string url_;
- // This lock is held through the entire StartTask method to prevent the
- // Stop method from quitting the nested message loop of the StartTask method.
- Lock initialization_lock_;
+ // Initialization callback as passed in by Start().
+ scoped_ptr<PipelineCallback> init_callback_;
// Vector of FilterHostImpl objects that contian the filters for the pipeline.
typedef std::vector<FilterHostImpl*> FilterHostVector;
diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc
index f28ee1c..56aa246 100644
--- a/media/base/pipeline_impl_unittest.cc
+++ b/media/base/pipeline_impl_unittest.cc
@@ -200,6 +200,21 @@ TEST_F(PipelineImplTest, Seek) {
EXPECT_TRUE(expected == filters_->video_renderer()->seek_time());
}
+// Try to execute Start()/Stop() on the Pipeline many times and very fast. This
+// test is trying to simulate the situation where the pipeline can get dead
+// locked very easily by quickly calling Start()/Stop().
+TEST_F(PipelineImplTest, StressTestPipelineStartStop) {
+ media::old_mocks::MockFilterConfig config;
+ const int kTimes = 1000;
+ for (int i = 0; i < kTimes; ++i) {
+ scoped_refptr<media::old_mocks::MockFilterFactory> factory =
+ new media::old_mocks::MockFilterFactory(&config);
+ media::PipelineImpl pipeline;
+ pipeline.Start(factory.get(), "", NULL);
+ pipeline.Stop();
+ }
+}
+
// TODO(ralphl): Add a unit test that makes sure that the mock audio filter
// is actually called on a SetVolume() call to the pipeline. I almost checked
// in code that broke this, but all unit tests were passing.