diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-06-25 00:54:18 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-06-25 00:54:18 +0000 |
commit | d0f914b1965c44d92beb46480135390b73ae3e10 (patch) | |
tree | 04d39187483252e360ee35cc94b5ff92ee685b78 /media/base/pipeline_impl.cc | |
parent | bedb4a2725ebe735e4f8f1294b931e38dfb88ddc (diff) | |
download | chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.zip chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.gz chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.bz2 |
Refactor media pipeline and filters to use injected message loops.
Message loops are provided via MediaFilter::SetMessageLoop(). For now FFmpegDemuxer is the only filter taking advantage of injected message loops, and its unit tests have been updated as well.
TEST=FFmpegDemuxer tests should continue to run
BUG=none
Review URL: http://codereview.chromium.org/145014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19209 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r-- | media/base/pipeline_impl.cc | 115 |
1 files changed, 110 insertions, 5 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 49ffb98..8a43382 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -1,8 +1,12 @@ -// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// +// TODO(scherkus): clean up PipelineImpl... too many crazy function names, +// potential deadlocks, nested message loops, etc... #include "base/compiler_specific.h" +#include "base/condition_variable.h" #include "base/stl_util-inl.h" #include "media/base/filter_host_impl.h" #include "media/base/media_format.h" @@ -10,6 +14,36 @@ namespace media { +namespace { + +// Small helper function to help us transition over to injected message loops. +// +// TODO(scherkus): have every filter support injected message loops. +template <class Filter> +bool SupportsSetMessageLoop() { + switch (Filter::filter_type()) { + case FILTER_DEMUXER: + return true; + default: + return false; + } +} + +// Helper function used with NewRunnableMethod to implement a (very) crude +// blocking counter. +// +// TODO(scherkus): remove this as soon as Stop() is made asynchronous. +void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { + AutoLock auto_lock(*lock); + --(*count); + CHECK(*count >= 0); + if (*count == 0) { + cond_var->Signal(); + } +} + +} // namespace + PipelineImpl::PipelineImpl() { ResetState(); } @@ -287,6 +321,8 @@ bool PipelineThread::Start(FilterFactory* filter_factory, // method. This method waits for that Lock to be released so that we know // that the thread is not executing a nested message loop. This way we know // that that Thread::Stop call will quit the appropriate message loop. +// +// TODO(scherkus): this can potentially deadlock, hack away our lock usage!! void PipelineThread::Stop() { if (thread_.IsRunning()) { PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); @@ -294,6 +330,7 @@ void PipelineThread::Stop() { thread_.Stop(); } DCHECK(filter_hosts_.empty()); + DCHECK(filter_threads_.empty()); } // Called on client's thread. @@ -418,16 +455,66 @@ void PipelineThread::StartTask(FilterFactory* filter_factory, // as the result of an error condition. If there is no error, then set the // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the // reverse order. +// +// TODO(scherkus): beware! this can get posted multiple times! it shouldn't! void PipelineThread::StopTask() { if (PipelineOk()) { pipeline_->error_ = PIPELINE_STOPPING; } + // Stop every filter. for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); ++iter) { (*iter)->Stop(); } + + // Figure out how many threads we have to stop. + // + // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. + FilterThreadVector running_threads; + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + if ((*iter)->IsRunning()) { + running_threads.push_back(*iter); + } + } + + // Crude blocking counter implementation. + Lock lock; + ConditionVariable wait_for_zero(&lock); + int count = running_threads.size(); + + // Post a task to every filter's thread to ensure that they've completed their + // stopping logic before stopping the threads themselves. + // + // TODO(scherkus): again, Stop() should either be synchronous or we should + // receive a signal from filters that they have indeed stopped. + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + } + + // Wait on our "blocking counter". + { + AutoLock auto_lock(lock); + while (count > 0) { + wait_for_zero.Wait(); + } + } + + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->Stop(); + } + if (host_initializing_) { host_initializing_ = NULL; message_loop()->Quit(); @@ -546,10 +633,25 @@ scoped_refptr<Filter> PipelineThread::CreateFilter( if (NULL == host_initializing_) { Error(PIPELINE_ERROR_OUT_OF_MEMORY); } else { - filter_hosts_.push_back(host_initializing_); - filter->SetFilterHost(host_initializing_); - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + // Create a dedicated thread for this filter. + if (SupportsSetMessageLoop<Filter>()) { + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } else { + filter->SetMessageLoop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } + } + + // Creating a thread could have failed, verify we're still OK. + if (PipelineOk()) { + filter_hosts_.push_back(host_initializing_); + filter->SetFilterHost(host_initializing_); + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } } } } @@ -588,8 +690,11 @@ scoped_refptr<DataSource> PipelineThread::CreateDataSource( } // Called as a result of destruction of the thread. +// +// TODO(scherkus): this can block the client due to synchronous Stop() API call. void PipelineThread::WillDestroyCurrentMessageLoop() { STLDeleteElements(&filter_hosts_); + STLDeleteElements(&filter_threads_); } } // namespace media |