diff options
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r-- | media/base/pipeline_impl.cc | 475 |
1 files changed, 248 insertions, 227 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 56a71a6..68d509c 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -31,6 +31,24 @@ bool SupportsSetMessageLoop() { } } +// Small helper function to help us name filter threads for debugging. +// +// TODO(scherkus): figure out a cleaner way to derive the filter thread name. +template <class Filter> +const char* GetThreadName() { + DCHECK(SupportsSetMessageLoop<Filter>()); + switch (Filter::filter_type()) { + case FILTER_DEMUXER: + return "DemuxerThread"; + case FILTER_AUDIO_DECODER: + return "AudioDecoderThread"; + case FILTER_VIDEO_DECODER: + return "VideoDecoderThread"; + default: + return "FilterThread"; + } +} + // Helper function used with NewRunnableMethod to implement a (very) crude // blocking counter. // @@ -46,62 +64,61 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { } // namespace -PipelineImpl::PipelineImpl() { +PipelineImpl::PipelineImpl(MessageLoop* message_loop) + : message_loop_(message_loop) { ResetState(); } PipelineImpl::~PipelineImpl() { - Stop(); + DCHECK(!pipeline_internal_) + << "Stop() must complete before destroying object"; } -// Creates the PipelineThread and calls it's start method. +// Creates the PipelineInternal and calls it's start method. bool PipelineImpl::Start(FilterFactory* factory, const std::string& url, - PipelineCallback* init_complete_callback) { - DCHECK(!pipeline_thread_); + PipelineCallback* start_callback) { + DCHECK(!pipeline_internal_); DCHECK(factory); - DCHECK(!initialized_); - DCHECK(!IsPipelineThread()); - if (!pipeline_thread_ && factory) { - pipeline_thread_ = new PipelineThread(this); - if (pipeline_thread_) { - // TODO(ralphl): Does the callback get copied by these fancy templates? - // if so, then do I want to always delete it here??? - if (pipeline_thread_->Start(factory, url, init_complete_callback)) { - return true; - } - pipeline_thread_ = NULL; // Releases reference to destroy thread - } + if (pipeline_internal_ || !factory) { + return false; } - delete init_complete_callback; - return false; -} -// Stop the PipelineThread and return to a state identical to that of a newly -// created PipelineImpl object. -void PipelineImpl::Stop() { - DCHECK(!IsPipelineThread()); + // Create and start the PipelineInternal. + pipeline_internal_ = new PipelineInternal(this, message_loop_); + if (!pipeline_internal_) { + NOTREACHED() << "Could not create PipelineInternal"; + return false; + } + pipeline_internal_->Start(factory, url, start_callback); + return true; +} - if (pipeline_thread_) { - pipeline_thread_->Stop(); +// Stop the PipelineInternal who will NULL our reference to it and reset our +// state to a newly created PipelineImpl object. +void PipelineImpl::Stop(PipelineCallback* stop_callback) { + if (pipeline_internal_) { + pipeline_internal_->Stop(stop_callback); } - ResetState(); } void PipelineImpl::Seek(base::TimeDelta time, PipelineCallback* seek_callback) { - DCHECK(!IsPipelineThread()); - if (IsPipelineOk()) { - pipeline_thread_->Seek(time, seek_callback); + pipeline_internal_->Seek(time, seek_callback); } else { NOTREACHED(); } } +bool PipelineImpl::IsRunning() const { + AutoLock auto_lock(const_cast<Lock&>(lock_)); + return pipeline_internal_ != NULL; +} + bool PipelineImpl::IsInitialized() const { AutoLock auto_lock(lock_); - return initialized_; + return pipeline_internal_ && pipeline_internal_->IsInitialized(); } bool PipelineImpl::IsRendered(const std::string& major_mime_type) const { @@ -117,10 +134,8 @@ float PipelineImpl::GetPlaybackRate() const { } void PipelineImpl::SetPlaybackRate(float rate) { - DCHECK(!IsPipelineThread()); - if (IsPipelineOk() && rate >= 0.0f) { - pipeline_thread_->SetPlaybackRate(rate); + pipeline_internal_->SetPlaybackRate(rate); } else { // It's OK for a client to call SetPlaybackRate(0.0f) if we're stopped. DCHECK(rate == 0.0f && playback_rate_ == 0.0f); @@ -133,10 +148,8 @@ float PipelineImpl::GetVolume() const { } void PipelineImpl::SetVolume(float volume) { - DCHECK(!IsPipelineThread()); - if (IsPipelineOk() && volume >= 0.0f && volume <= 1.0f) { - pipeline_thread_->SetVolume(volume); + pipeline_internal_->SetVolume(volume); } else { NOTREACHED(); } @@ -182,8 +195,7 @@ PipelineError PipelineImpl::GetError() const { void PipelineImpl::ResetState() { AutoLock auto_lock(lock_); - pipeline_thread_ = NULL; - initialized_ = false; + pipeline_internal_ = NULL; duration_ = base::TimeDelta(); buffered_time_ = base::TimeDelta(); buffered_bytes_ = 0; @@ -198,12 +210,7 @@ void PipelineImpl::ResetState() { } bool PipelineImpl::IsPipelineOk() const { - return pipeline_thread_ && initialized_ && PIPELINE_OK == error_; -} - -bool PipelineImpl::IsPipelineThread() const { - return pipeline_thread_ && - PlatformThread::CurrentId() == pipeline_thread_->thread_id(); + return pipeline_internal_ && PIPELINE_OK == error_; } void PipelineImpl::SetDuration(base::TimeDelta duration) { @@ -263,100 +270,86 @@ void PipelineImpl::InsertRenderedMimeType(const std::string& major_mime_type) { //----------------------------------------------------------------------------- -PipelineThread::PipelineThread(PipelineImpl* pipeline) +PipelineInternal::PipelineInternal(PipelineImpl* pipeline, + MessageLoop* message_loop) : pipeline_(pipeline), - thread_("PipelineThread"), + message_loop_(message_loop), state_(kCreated) { } -PipelineThread::~PipelineThread() { - Stop(); - DCHECK(state_ == kStopped || state_ == kError); +PipelineInternal::~PipelineInternal() { + DCHECK(state_ == kCreated || state_ == kStopped); } -// This method is called on the client's thread. It starts the pipeline's -// dedicated thread and posts a task to call the StartTask() method on that -// thread. -bool PipelineThread::Start(FilterFactory* filter_factory, - const std::string& url, - PipelineCallback* init_complete_callback) { - DCHECK_EQ(kCreated, state_); - if (thread_.Start()) { - filter_factory_ = filter_factory; - url_ = url; - init_callback_.reset(init_complete_callback); - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StartTask)); - return true; - } - return false; +// Called on client's thread. +void PipelineInternal::Start(FilterFactory* filter_factory, + const std::string& url, + PipelineCallback* start_callback) { + DCHECK(filter_factory); + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::StartTask, filter_factory, url, + start_callback)); } -// Called on the client's thread. If the thread has been started, then posts -// a task to call the StopTask() method, then waits until the thread has -// stopped. -void PipelineThread::Stop() { - if (thread_.IsRunning()) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StopTask)); - thread_.Stop(); - } - DCHECK(filter_hosts_.empty()); - DCHECK(filter_threads_.empty()); +// Called on client's thread. +void PipelineInternal::Stop(PipelineCallback* stop_callback) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::StopTask, stop_callback)); } // Called on client's thread. -void PipelineThread::Seek(base::TimeDelta time, +void PipelineInternal::Seek(base::TimeDelta time, PipelineCallback* seek_callback) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::SeekTask, time, seek_callback)); + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::SeekTask, time, + seek_callback)); } // Called on client's thread. -void PipelineThread::SetPlaybackRate(float rate) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate)); +void PipelineInternal::SetPlaybackRate(float rate) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::SetPlaybackRateTask, rate)); } // Called on client's thread. -void PipelineThread::SetVolume(float volume) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); +void PipelineInternal::SetVolume(float volume) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::SetVolumeTask, volume)); } // Called from any thread. -void PipelineThread::InitializationComplete(FilterHostImpl* host) { +void PipelineInternal::InitializationComplete(FilterHostImpl* host) { if (IsPipelineOk()) { - // Continue the start task by proceeding to the next stage. - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StartTask)); + // Continue the initialize task by proceeding to the next stage. + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::InitializeTask)); } } // Called from any thread. Updates the pipeline time. -void PipelineThread::SetTime(base::TimeDelta time) { - pipeline()->SetTime(time); +void PipelineInternal::SetTime(base::TimeDelta time) { + // TODO(scherkus): why not post a task? + pipeline_->SetTime(time); } -// Called from any thread. Sets the pipeline |error_| member and schedules a -// task to stop all the filters in the pipeline. Note that the thread will -// continue to run until the client calls Pipeline::Stop(), but nothing will -// be processed since filters will not be able to post tasks. -void PipelineThread::Error(PipelineError error) { - // If this method returns false, then an error has already happened, so no - // reason to run the StopTask again. It's going to happen. - if (pipeline()->InternalSetError(error)) { - message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &PipelineThread::StopTask)); - } +// Called from any thread. Sets the pipeline |error_| member and destroys all +// filters. +void PipelineInternal::Error(PipelineError error) { + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineInternal::ErrorTask, error)); } -// Called as a result of destruction of the thread. -// -// TODO(scherkus): this can block the client due to synchronous Stop() API call. -void PipelineThread::WillDestroyCurrentMessageLoop() { - STLDeleteElements(&filter_hosts_); - STLDeleteElements(&filter_threads_); +void PipelineInternal::StartTask(FilterFactory* filter_factory, + const std::string& url, + PipelineCallback* start_callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + DCHECK_EQ(kCreated, state_); + filter_factory_ = filter_factory; + url_ = url; + start_callback_.reset(start_callback); + + // Kick off initialization. + InitializeTask(); } // Main initialization method called on the pipeline thread. This code attempts @@ -370,18 +363,17 @@ void PipelineThread::WillDestroyCurrentMessageLoop() { // then connects the VideoDecoder to a VideoRenderer. // // When all required filters have been created and have called their -// FilterHost's InitializationComplete method, the pipeline's |initialized_| -// member is set to true, and, if the client provided an -// |init_complete_callback_|, it is called with "true". +// FilterHost's InitializationComplete() method, the pipeline will update its +// state to kStarted and |init_callback_|, will be executed. // // If initialization fails, the client's callback will still be called, but // the bool parameter passed to it will be false. // -// TODO(hclam): StartTask is now starting the pipeline asynchronously. It +// TODO(hclam): InitializeTask() is now starting the pipeline asynchronously. It // works like a big state change table. If we no longer need to start filters // in order, we need to get rid of all the state change. -void PipelineThread::StartTask() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::InitializeTask() { + DCHECK_EQ(MessageLoop::current(), message_loop_); // If we have received the stop signal, return immediately. if (state_ == kStopped) @@ -391,7 +383,6 @@ void PipelineThread::StartTask() { // Just created, create data source. if (state_ == kCreated) { - message_loop()->AddDestructionObserver(this); state_ = kInitDataSource; CreateDataSource(); return; @@ -446,99 +437,77 @@ void PipelineThread::StartTask() { } state_ = kStarted; - pipeline_->initialized_ = true; filter_factory_ = NULL; - if (init_callback_.get()) { - init_callback_->Run(true); - init_callback_.reset(); + if (start_callback_.get()) { + start_callback_->Run(true); + start_callback_.reset(); } } } // This method is called as a result of the client calling Pipeline::Stop() or // as the result of an error condition. If there is no error, then set the -// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the +// pipeline's |error_| member to PIPELINE_STOPPING. We stop the filters in the // reverse order. // // TODO(scherkus): beware! this can get posted multiple times since we post // Stop() tasks even if we've already stopped. Perhaps this should no-op for // additional calls, however most of this logic will be changing. -void PipelineThread::StopTask() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::StopTask(PipelineCallback* stop_callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + stop_callback_.reset(stop_callback); - if (IsPipelineInitializing()) { - // If IsPipelineOk() is true, the pipeline was simply stopped during - // initialization. Otherwise it is a failure. - state_ = IsPipelineOk() ? kStopped : kError; - filter_factory_ = NULL; - if (init_callback_.get()) { - init_callback_->Run(false); - init_callback_.reset(); - } - } else { - state_ = kStopped; + // If we've already stopped, return immediately. + if (state_ == kStopped) { + return; } - if (IsPipelineOk()) { - pipeline_->error_ = PIPELINE_STOPPING; - } + // Carry out setting the error, notifying the client and destroying filters. + ErrorTask(PIPELINE_STOPPING); - // Stop every filter. - for (FilterHostVector::iterator iter = filter_hosts_.begin(); - iter != filter_hosts_.end(); - ++iter) { - (*iter)->Stop(); - } + // We no longer need to examine our previous state, set it to stopped. + state_ = kStopped; - // Figure out how many threads we have to stop. - // - // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. - FilterThreadVector running_threads; - for (FilterThreadVector::iterator iter = filter_threads_.begin(); - iter != filter_threads_.end(); - ++iter) { - if ((*iter)->IsRunning()) { - running_threads.push_back(*iter); - } + // Reset the pipeline and set our reference to NULL so we don't accidentally + // modify the pipeline. Once remaining tasks execute we will be destroyed. + pipeline_->ResetState(); + pipeline_ = NULL; + + // Notify the client that stopping has finished. + if (stop_callback_.get()) { + stop_callback_->Run(true); + stop_callback_.reset(); } +} - // Crude blocking counter implementation. - Lock lock; - ConditionVariable wait_for_zero(&lock); - int count = running_threads.size(); +void PipelineInternal::ErrorTask(PipelineError error) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!"; - // Post a task to every filter's thread to ensure that they've completed their - // stopping logic before stopping the threads themselves. - // - // TODO(scherkus): again, Stop() should either be synchronous or we should - // receive a signal from filters that they have indeed stopped. - for (FilterThreadVector::iterator iter = running_threads.begin(); - iter != running_threads.end(); - ++iter) { - (*iter)->message_loop()->PostTask(FROM_HERE, - NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + // Suppress executing additional error logic. + if (state_ == kError) { + return; } - // Wait on our "blocking counter". - { - AutoLock auto_lock(lock); - while (count > 0) { - wait_for_zero.Wait(); - } - } + // Update our error code first in case we execute the start callback. + pipeline_->error_ = error; - // Stop every running filter thread. - // - // TODO(scherkus): can we watchdog this section to detect wedged threads? - for (FilterThreadVector::iterator iter = running_threads.begin(); - iter != running_threads.end(); - ++iter) { - (*iter)->Stop(); + // Notify the client that starting did not complete, if necessary. + if (IsPipelineInitializing() && start_callback_.get()) { + start_callback_->Run(false); } + start_callback_.reset(); + filter_factory_ = NULL; + + // We no longer need to examine our previous state, set it to stopped. + state_ = kError; + + // Destroy every filter and reset the pipeline as well. + DestroyFilters(); } -void PipelineThread::SetPlaybackRateTask(float rate) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::SetPlaybackRateTask(float rate) { + DCHECK_EQ(MessageLoop::current(), message_loop_); pipeline_->InternalSetPlaybackRate(rate); for (FilterHostVector::iterator iter = filter_hosts_.begin(); @@ -548,9 +517,10 @@ void PipelineThread::SetPlaybackRateTask(float rate) { } } -void PipelineThread::SeekTask(base::TimeDelta time, - PipelineCallback* seek_callback) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::SeekTask(base::TimeDelta time, + PipelineCallback* seek_callback) { + DCHECK_EQ(MessageLoop::current(), message_loop_); + seek_callback_.reset(seek_callback); for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); @@ -566,14 +536,14 @@ void PipelineThread::SeekTask(base::TimeDelta time, // immediately here or we set time and do callback when we have new // frames/packets. SetTime(time); - if (seek_callback) { - seek_callback->Run(true); - delete seek_callback; + if (seek_callback_.get()) { + seek_callback_->Run(true); + seek_callback_.reset(); } } -void PipelineThread::SetVolumeTask(float volume) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::SetVolumeTask(float volume) { + DCHECK_EQ(MessageLoop::current(), message_loop_); pipeline_->volume_ = volume; scoped_refptr<AudioRenderer> audio_renderer; @@ -584,44 +554,46 @@ void PipelineThread::SetVolumeTask(float volume) { } template <class Filter, class Source> -void PipelineThread::CreateFilter(FilterFactory* filter_factory, - Source source, - const MediaFormat& media_format) { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::CreateFilter(FilterFactory* filter_factory, + Source source, + const MediaFormat& media_format) { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); + // Create the filter. scoped_refptr<Filter> filter = filter_factory->Create<Filter>(media_format); if (!filter) { Error(PIPELINE_ERROR_REQUIRED_FILTER_MISSING); - } else { - scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); - // Create a dedicated thread for this filter. - if (SupportsSetMessageLoop<Filter>()) { - // TODO(scherkus): figure out a way to name these threads so it matches - // the filter type. - scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); - if (!thread.get() || !thread->Start()) { - NOTREACHED() << "Could not start filter thread"; - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - } else { - filter->set_message_loop(thread->message_loop()); - filter_threads_.push_back(thread.release()); - } - } + return; + } - // Creating a thread could have failed, verify we're still OK. - if (IsPipelineOk()) { - filter_hosts_.push_back(host.get()); - filter->set_host(host.release()); - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); - } + // Create a dedicated thread for this filter if applicable. + if (SupportsSetMessageLoop<Filter>()) { + scoped_ptr<base::Thread> thread(new base::Thread(GetThreadName<Filter>())); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + return; } + + filter->set_message_loop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } + + // Create the filter's host. + DCHECK(IsPipelineOk()); + scoped_ptr<FilterHostImpl> host(new FilterHostImpl(this, filter.get())); + filter->set_host(host.get()); + filter_hosts_.push_back(host.release()); + + // Now initialize the filter. + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); } } -void PipelineThread::CreateDataSource() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::CreateDataSource() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); MediaFormat url_format; @@ -630,8 +602,8 @@ void PipelineThread::CreateDataSource() { CreateFilter<DataSource>(filter_factory_, url_, url_format); } -void PipelineThread::CreateDemuxer() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::CreateDemuxer() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); scoped_refptr<DataSource> data_source; @@ -641,8 +613,8 @@ void PipelineThread::CreateDemuxer() { } template <class Decoder> -bool PipelineThread::CreateDecoder() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +bool PipelineInternal::CreateDecoder() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); scoped_refptr<Demuxer> demuxer; @@ -664,8 +636,8 @@ bool PipelineThread::CreateDecoder() { } template <class Decoder, class Renderer> -bool PipelineThread::CreateRenderer() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +bool PipelineInternal::CreateRenderer() { + DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineOk()); scoped_refptr<Decoder> decoder; @@ -681,8 +653,8 @@ bool PipelineThread::CreateRenderer() { } template <class Filter> -void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); +void PipelineInternal::GetFilter(scoped_refptr<Filter>* filter_out) const { + DCHECK_EQ(MessageLoop::current(), message_loop_); *filter_out = NULL; for (FilterHostVector::const_iterator iter = filter_hosts_.begin(); @@ -692,4 +664,53 @@ void PipelineThread::GetFilter(scoped_refptr<Filter>* filter_out) const { } } +void PipelineInternal::DestroyFilters() { + // Stop every filter. + for (FilterHostVector::iterator iter = filter_hosts_.begin(); + iter != filter_hosts_.end(); + ++iter) { + (*iter)->Stop(); + } + + // Crude blocking counter implementation. + Lock lock; + ConditionVariable wait_for_zero(&lock); + int count = filter_threads_.size(); + + // Post a task to every filter's thread to ensure that they've completed their + // stopping logic before stopping the threads themselves. + // + // TODO(scherkus): again, Stop() should either be synchronous or we should + // receive a signal from filters that they have indeed stopped. + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + (*iter)->message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + } + + // Wait on our "blocking counter". + { + AutoLock auto_lock(lock); + while (count > 0) { + wait_for_zero.Wait(); + } + } + + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + (*iter)->Stop(); + } + + // Reset the pipeline, which will decrement a reference to this object. + // We will get destroyed as soon as the remaining tasks finish executing. + // To be safe, we'll set our pipeline reference to NULL. + STLDeleteElements(&filter_hosts_); + STLDeleteElements(&filter_threads_); +} + } // namespace media |