summaryrefslogtreecommitdiffstats
path: root/media/base/pipeline_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r--media/base/pipeline_impl.cc475
1 files changed, 248 insertions, 227 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 56a71a6..68d509c 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -31,6 +31,24 @@ bool SupportsSetMessageLoop() {
}
}
+// Small helper function to help us name filter threads for debugging.
+//
+// TODO(scherkus): figure out a cleaner way to derive the filter thread name.
+template <class Filter>
+const char* GetThreadName() {
+ DCHECK(SupportsSetMessageLoop<Filter>());
+ switch (Filter::filter_type()) {
+ case FILTER_DEMUXER:
+ return "DemuxerThread";
+ case FILTER_AUDIO_DECODER:
+ return "AudioDecoderThread";
+ case FILTER_VIDEO_DECODER:
+ return "VideoDecoderThread";
+ default:
+ return "FilterThread";
+ }
+}
+
// Helper function used with NewRunnableMethod to implement a (very) crude
// blocking counter.
//
@@ -46,62 +64,61 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
} // namespace
-PipelineImpl::PipelineImpl() {
+PipelineImpl::PipelineImpl(MessageLoop* message_loop)
+ : message_loop_(message_loop) {
ResetState();
}
PipelineImpl::~PipelineImpl() {
- Stop();
+ DCHECK(!pipeline_internal_)
+ << "Stop() must complete before destroying object";
}
-// Creates the PipelineThread and calls it's start method.
+// Creates the PipelineInternal and calls it's start method.
bool PipelineImpl::Start(FilterFactory* factory,
const std::string& url,
- PipelineCallback* init_complete_callback) {
- DCHECK(!pipeline_thread_);
+ PipelineCallback* start_callback) {
+ DCHECK(!pipeline_internal_);
DCHECK(factory);
- DCHECK(!initialized_);
- DCHECK(!IsPipelineThread());
- if (!pipeline_thread_ && factory) {
- pipeline_thread_ = new PipelineThread(this);
- if (pipeline_thread_) {
- // TODO(ralphl): Does the callback get copied by these fancy templates?
- // if so, then do I want to always delete it here???
- if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
- return true;
- }
- pipeline_thread_ = NULL; // Releases reference to destroy thread
- }
+ if (pipeline_internal_ || !factory) {
+ return false;
}
- delete init_complete_callback;
- return false;
-}
-// Stop the PipelineThread and return to a state identical to that of a newly
-// created PipelineImpl object.
-void PipelineImpl::Stop() {
- DCHECK(!IsPipelineThread());
+ // Create and start the PipelineInternal.
+ pipeline_internal_ = new PipelineInternal(this, message_loop_);
+ if (!pipeline_internal_) {
+ NOTREACHED() << "Could not create PipelineInternal";
+ return false;
+ }
+ pipeline_internal_->Start(factory, url, start_callback);
+ return true;
+}
- if (pipeline_thread_) {
- pipeline_thread_->Stop();
+// Stop the PipelineInternal who will NULL our reference to it and reset our
+// state to a newly created PipelineImpl object.
+void PipelineImpl::Stop(PipelineCallback* stop_callback) {
+ if (pipeline_internal_) {
+ pipeline_internal_->Stop(stop_callback);
}
- ResetState();
}
void PipelineImpl::Seek(base::TimeDelta time,
PipelineCallback* seek_callback) {
- DCHECK(!IsPipelineThread());
-
if (IsPipelineOk()) {
- pipeline_thread_->Seek(time, seek_callback);
+ pipeline_internal_->Seek(time, seek_callback);
} else {
NOTREACHED();
}
}
+bool PipelineImpl::IsRunning() const {
+ AutoLock auto_lock(const_cast<Lock&>(lock_));
+ return pipeline_internal_ != NULL;
+}
+
bool PipelineImpl::IsInitialized() const {
AutoLock auto_lock(lock_);
- return initialized_;
+ return pipeline_internal_ && pipeline_internal_->IsInitialized();
}
bool PipelineImpl::IsRendered(const std::string& major_mime_type) const {
@@ -117,10 +134,8 @@ float PipelineImpl::GetPlaybackRate() const {
}
void PipelineImpl::SetPlaybackRate(float rate) {
- DCHECK(!IsPipelineThread());
-
if (IsPipelineOk() && rate >= 0.0f) {
- pipeline_thread_->SetPlaybackRate(rate);
+ pipeline_internal_->SetPlaybackRate(rate);
} else {
// It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped.
DCHECK(rate == 0.0f && playback_rate_ == 0.0f);
@@ -133,10 +148,8 @@ float PipelineImpl::GetVolume() const {
}
void PipelineImpl::SetVolume(float volume) {
- DCHECK(!IsPipelineThread());
-
if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) {
- pipeline_thread_->SetVolume(volume);
+ pipeline_internal_->SetVolume(volume);
} else {
NOTREACHED();
}
@@ -182,8 +195,7 @@ PipelineError PipelineImpl::GetError() const {
void PipelineImpl::ResetState() {
AutoLock auto_lock(lock_);
- pipeline_thread_ = NULL;
- initialized_ = false;
+ pipeline_internal_ = NULL;
duration_ = base::TimeDelta();
buffered_time_ = base::TimeDelta();
buffered_bytes_ = 0;
@@ -198,12 +210,7 @@ void PipelineImpl::ResetState() {
}
bool PipelineImpl::IsPipelineOk() const {
- return pipeline_thread_ && initialized_ && PIPELINE_OK == error_;
-}
-
-bool PipelineImpl::IsPipelineThread() const {
- return pipeline_thread_ &&
- PlatformThread::CurrentId() == pipeline_thread_->thread_id();
+ return pipeline_internal_ && PIPELINE_OK == error_;
}
void PipelineImpl::SetDuration(base::TimeDelta duration) {
@@ -263,100 +270,86 @@ void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) {
//-----------------------------------------------------------------------------
-PipelineThread::PipelineThread(PipelineImpl* pipeline)
+PipelineInternal::PipelineInternal(PipelineImpl* pipeline,
+ MessageLoop* message_loop)
: pipeline_(pipeline),
- thread_("PipelineThread"),
+ message_loop_(message_loop),
state_(kCreated) {
}
-PipelineThread::~PipelineThread() {
- Stop();
- DCHECK(state_ == kStopped || state_ == kError);
+PipelineInternal::~PipelineInternal() {
+ DCHECK(state_ == kCreated || state_ == kStopped);
}
-// This method is called on the client's thread. It starts the pipeline's
-// dedicated thread and posts a task to call the StartTask() method on that
-// thread.
-bool PipelineThread::Start(FilterFactory* filter_factory,
- const std::string& url,
- PipelineCallback* init_complete_callback) {
- DCHECK_EQ(kCreated, state_);
- if (thread_.Start()) {
- filter_factory_ = filter_factory;
- url_ = url;
- init_callback_.reset(init_complete_callback);
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StartTask));
- return true;
- }
- return false;
+// Called on client's thread.
+void PipelineInternal::Start(FilterFactory* filter_factory,
+ const std::string& url,
+ PipelineCallback* start_callback) {
+ DCHECK(filter_factory);
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url,
+ start_callback));
}
-// Called on the client's thread. If the thread has been started, then posts
-// a task to call the StopTask() method, then waits until the thread has
-// stopped.
-void PipelineThread::Stop() {
- if (thread_.IsRunning()) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StopTask));
- thread_.Stop();
- }
- DCHECK(filter_hosts_.empty());
- DCHECK(filter_threads_.empty());
+// Called on client's thread.
+void PipelineInternal::Stop(PipelineCallback* stop_callback) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback));
}
// Called on client's thread.
-void PipelineThread::Seek(base::TimeDelta time,
+void PipelineInternal::Seek(base::TimeDelta time,
PipelineCallback* seek_callback) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback));
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::SeekTask, time,
+ seek_callback));
}
// Called on client's thread.
-void PipelineThread::SetPlaybackRate(float rate) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
+void PipelineInternal::SetPlaybackRate(float rate) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate));
}
// Called on client's thread.
-void PipelineThread::SetVolume(float volume) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
+void PipelineInternal::SetVolume(float volume) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume));
}
// Called from any thread.
-void PipelineThread::InitializationComplete(FilterHostImpl* host) {
+void PipelineInternal::InitializationComplete(FilterHostImpl* host) {
if (IsPipelineOk()) {
- // Continue the start task by proceeding to the next stage.
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StartTask));
+ // Continue the initialize task by proceeding to the next stage.
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::InitializeTask));
}
}
// Called from any thread. Updates the pipeline time.
-void PipelineThread::SetTime(base::TimeDelta time) {
- pipeline()->SetTime(time);
+void PipelineInternal::SetTime(base::TimeDelta time) {
+ // TODO(scherkus): why not post a task?
+ pipeline_->SetTime(time);
}
-// Called from any thread. Sets the pipeline |error_| member and schedules a
-// task to stop all the filters in the pipeline. Note that the thread will
-// continue to run until the client calls Pipeline::Stop(), but nothing will
-// be processed since filters will not be able to post tasks.
-void PipelineThread::Error(PipelineError error) {
- // If this method returns false, then an error has already happened, so no
- // reason to run the StopTask again. It's going to happen.
- if (pipeline()->InternalSetError(error)) {
- message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &PipelineThread::StopTask));
- }
+// Called from any thread. Sets the pipeline |error_| member and destroys all
+// filters.
+void PipelineInternal::Error(PipelineError error) {
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &PipelineInternal::ErrorTask, error));
}
-// Called as a result of destruction of the thread.
-//
-// TODO(scherkus): this can block the client due to synchronous Stop() API call.
-void PipelineThread::WillDestroyCurrentMessageLoop() {
- STLDeleteElements(&filter_hosts_);
- STLDeleteElements(&filter_threads_);
+void PipelineInternal::StartTask(FilterFactory* filter_factory,
+ const std::string& url,
+ PipelineCallback* start_callback) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK_EQ(kCreated, state_);
+ filter_factory_ = filter_factory;
+ url_ = url;
+ start_callback_.reset(start_callback);
+
+ // Kick off initialization.
+ InitializeTask();
}
// Main initialization method called on the pipeline thread. This code attempts
@@ -370,18 +363,17 @@ void PipelineThread::WillDestroyCurrentMessageLoop() {
// then connects the VideoDecoder to a VideoRenderer.
//
// When all required filters have been created and have called their
-// FilterHost's InitializationComplete method, the pipeline's |initialized_|
-// member is set to true, and, if the client provided an
-// |init_complete_callback_|, it is called with "true".
+// FilterHost's InitializationComplete() method, the pipeline will update its
+// state to kStarted and |init_callback_|, will be executed.
//
// If initialization fails, the client's callback will still be called, but
// the bool parameter passed to it will be false.
//
-// TODO(hclam): StartTask is now starting the pipeline asynchronously. It
+// TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It
// works like a big state change table. If we no longer need to start filters
// in order, we need to get rid of all the state change.
-void PipelineThread::StartTask() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::InitializeTask() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
// If we have received the stop signal, return immediately.
if (state_ == kStopped)
@@ -391,7 +383,6 @@ void PipelineThread::StartTask() {
// Just created, create data source.
if (state_ == kCreated) {
- message_loop()->AddDestructionObserver(this);
state_ = kInitDataSource;
CreateDataSource();
return;
@@ -446,99 +437,77 @@ void PipelineThread::StartTask() {
}
state_ = kStarted;
- pipeline_->initialized_ = true;
filter_factory_ = NULL;
- if (init_callback_.get()) {
- init_callback_->Run(true);
- init_callback_.reset();
+ if (start_callback_.get()) {
+ start_callback_->Run(true);
+ start_callback_.reset();
}
}
}
// This method is called as a result of the client calling Pipeline::Stop() or
// as the result of an error condition. If there is no error, then set the
-// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
+// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the
// reverse order.
//
// TODO(scherkus): beware! this can get posted multiple times since we post
// Stop() tasks even if we've already stopped. Perhaps this should no-op for
// additional calls, however most of this logic will be changing.
-void PipelineThread::StopTask() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::StopTask(PipelineCallback* stop_callback) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ stop_callback_.reset(stop_callback);
- if (IsPipelineInitializing()) {
- // If IsPipelineOk() is true, the pipeline was simply stopped during
- // initialization. Otherwise it is a failure.
- state_ = IsPipelineOk() ? kStopped : kError;
- filter_factory_ = NULL;
- if (init_callback_.get()) {
- init_callback_->Run(false);
- init_callback_.reset();
- }
- } else {
- state_ = kStopped;
+ // If we've already stopped, return immediately.
+ if (state_ == kStopped) {
+ return;
}
- if (IsPipelineOk()) {
- pipeline_->error_ = PIPELINE_STOPPING;
- }
+ // Carry out setting the error, notifying the client and destroying filters.
+ ErrorTask(PIPELINE_STOPPING);
- // Stop every filter.
- for (FilterHostVector::iterator iter = filter_hosts_.begin();
- iter != filter_hosts_.end();
- ++iter) {
- (*iter)->Stop();
- }
+ // We no longer need to examine our previous state, set it to stopped.
+ state_ = kStopped;
- // Figure out how many threads we have to stop.
- //
- // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
- FilterThreadVector running_threads;
- for (FilterThreadVector::iterator iter = filter_threads_.begin();
- iter != filter_threads_.end();
- ++iter) {
- if ((*iter)->IsRunning()) {
- running_threads.push_back(*iter);
- }
+ // Reset the pipeline and set our reference to NULL so we don't accidentally
+ // modify the pipeline. Once remaining tasks execute we will be destroyed.
+ pipeline_->ResetState();
+ pipeline_ = NULL;
+
+ // Notify the client that stopping has finished.
+ if (stop_callback_.get()) {
+ stop_callback_->Run(true);
+ stop_callback_.reset();
}
+}
- // Crude blocking counter implementation.
- Lock lock;
- ConditionVariable wait_for_zero(&lock);
- int count = running_threads.size();
+void PipelineInternal::ErrorTask(PipelineError error) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
- // Post a task to every filter's thread to ensure that they've completed their
- // stopping logic before stopping the threads themselves.
- //
- // TODO(scherkus): again, Stop() should either be synchronous or we should
- // receive a signal from filters that they have indeed stopped.
- for (FilterThreadVector::iterator iter = running_threads.begin();
- iter != running_threads.end();
- ++iter) {
- (*iter)->message_loop()->PostTask(FROM_HERE,
- NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ // Suppress executing additional error logic.
+ if (state_ == kError) {
+ return;
}
- // Wait on our "blocking counter".
- {
- AutoLock auto_lock(lock);
- while (count > 0) {
- wait_for_zero.Wait();
- }
- }
+ // Update our error code first in case we execute the start callback.
+ pipeline_->error_ = error;
- // Stop every running filter thread.
- //
- // TODO(scherkus): can we watchdog this section to detect wedged threads?
- for (FilterThreadVector::iterator iter = running_threads.begin();
- iter != running_threads.end();
- ++iter) {
- (*iter)->Stop();
+ // Notify the client that starting did not complete, if necessary.
+ if (IsPipelineInitializing() && start_callback_.get()) {
+ start_callback_->Run(false);
}
+ start_callback_.reset();
+ filter_factory_ = NULL;
+
+ // We no longer need to examine our previous state, set it to stopped.
+ state_ = kError;
+
+ // Destroy every filter and reset the pipeline as well.
+ DestroyFilters();
}
-void PipelineThread::SetPlaybackRateTask(float rate) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::SetPlaybackRateTask(float rate) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
pipeline_->InternalSetPlaybackRate(rate);
for (FilterHostVector::iterator iter = filter_hosts_.begin();
@@ -548,9 +517,10 @@ void PipelineThread::SetPlaybackRateTask(float rate) {
}
}
-void PipelineThread::SeekTask(base::TimeDelta time,
- PipelineCallback* seek_callback) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::SeekTask(base::TimeDelta time,
+ PipelineCallback* seek_callback) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ seek_callback_.reset(seek_callback);
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
@@ -566,14 +536,14 @@ void PipelineThread::SeekTask(base::TimeDelta time,
// immediately here or we set time and do callback when we have new
// frames/packets.
SetTime(time);
- if (seek_callback) {
- seek_callback->Run(true);
- delete seek_callback;
+ if (seek_callback_.get()) {
+ seek_callback_->Run(true);
+ seek_callback_.reset();
}
}
-void PipelineThread::SetVolumeTask(float volume) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::SetVolumeTask(float volume) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
pipeline_->volume_ = volume;
scoped_refptr<AudioRenderer> audio_renderer;
@@ -584,44 +554,46 @@ void PipelineThread::SetVolumeTask(float volume) {
}
template <class Filter, class Source>
-void PipelineThread::CreateFilter(FilterFactory* filter_factory,
- Source source,
- const MediaFormat& media_format) {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat& media_format) {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
+ // Create the filter.
scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format);
if (!filter) {
Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING);
- } else {
- scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
- // Create a dedicated thread for this filter.
- if (SupportsSetMessageLoop<Filter>()) {
- // TODO(scherkus): figure out a way to name these threads so it matches
- // the filter type.
- scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
- if (!thread.get() || !thread->Start()) {
- NOTREACHED() << "Could not start filter thread";
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- } else {
- filter->set_message_loop(thread->message_loop());
- filter_threads_.push_back(thread.release());
- }
- }
+ return;
+ }
- // Creating a thread could have failed, verify we're still OK.
- if (IsPipelineOk()) {
- filter_hosts_.push_back(host.get());
- filter->set_host(host.release());
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
- }
+ // Create a dedicated thread for this filter if applicable.
+ if (SupportsSetMessageLoop<Filter>()) {
+ scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>()));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ return;
}
+
+ filter->set_message_loop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
+ }
+
+ // Create the filter's host.
+ DCHECK(IsPipelineOk());
+ scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get()));
+ filter->set_host(host.get());
+ filter_hosts_.push_back(host.release());
+
+ // Now initialize the filter.
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
}
}
-void PipelineThread::CreateDataSource() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::CreateDataSource() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
MediaFormat url_format;
@@ -630,8 +602,8 @@ void PipelineThread::CreateDataSource() {
CreateFilter<DataSource>(filter_factory_, url_, url_format);
}
-void PipelineThread::CreateDemuxer() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::CreateDemuxer() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
scoped_refptr<DataSource> data_source;
@@ -641,8 +613,8 @@ void PipelineThread::CreateDemuxer() {
}
template <class Decoder>
-bool PipelineThread::CreateDecoder() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+bool PipelineInternal::CreateDecoder() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
scoped_refptr<Demuxer> demuxer;
@@ -664,8 +636,8 @@ bool PipelineThread::CreateDecoder() {
}
template <class Decoder, class Renderer>
-bool PipelineThread::CreateRenderer() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+bool PipelineInternal::CreateRenderer() {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineOk());
scoped_refptr<Decoder> decoder;
@@ -681,8 +653,8 @@ bool PipelineThread::CreateRenderer() {
}
template <class Filter>
-void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const {
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
*filter_out = NULL;
for (FilterHostVector::const_iterator iter = filter_hosts_.begin();
@@ -692,4 +664,53 @@ void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const {
}
}
+void PipelineInternal::DestroyFilters() {
+ // Stop every filter.
+ for (FilterHostVector::iterator iter = filter_hosts_.begin();
+ iter != filter_hosts_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
+ // Crude blocking counter implementation.
+ Lock lock;
+ ConditionVariable wait_for_zero(&lock);
+ int count = filter_threads_.size();
+
+ // Post a task to every filter's thread to ensure that they've completed their
+ // stopping logic before stopping the threads themselves.
+ //
+ // TODO(scherkus): again, Stop() should either be synchronous or we should
+ // receive a signal from filters that they have indeed stopped.
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->message_loop()->PostTask(FROM_HERE,
+ NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ }
+
+ // Wait on our "blocking counter".
+ {
+ AutoLock auto_lock(lock);
+ while (count > 0) {
+ wait_for_zero.Wait();
+ }
+ }
+
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
+ // Reset the pipeline, which will decrement a reference to this object.
+ // We will get destroyed as soon as the remaining tasks finish executing.
+ // To be safe, we'll set our pipeline reference to NULL.
+ STLDeleteElements(&filter_hosts_);
+ STLDeleteElements(&filter_threads_);
+}
+
} // namespace media