summaryrefslogtreecommitdiffstats
path: root/media/base/pipeline_impl.cc
diff options
context:
space:
mode:
authorscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-06-25 00:54:18 +0000
committerscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-06-25 00:54:18 +0000
commitd0f914b1965c44d92beb46480135390b73ae3e10 (patch)
tree04d39187483252e360ee35cc94b5ff92ee685b78 /media/base/pipeline_impl.cc
parentbedb4a2725ebe735e4f8f1294b931e38dfb88ddc (diff)
downloadchromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.zip
chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.gz
chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.bz2
Refactor media pipeline and filters to use injected message loops.
Message loops are provided via MediaFilter::SetMessageLoop(). For now FFmpegDemuxer is the only filter taking advantage of injected message loops, and its unit tests have been updated as well. TEST=FFmpegDemuxer tests should continue to run BUG=none Review URL: http://codereview.chromium.org/145014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19209 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r--media/base/pipeline_impl.cc115
1 files changed, 110 insertions, 5 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 49ffb98..8a43382 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -1,8 +1,12 @@
-// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved.
+// Copyright (c) 2008-2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//
+// TODO(scherkus): clean up PipelineImpl... too many crazy function names,
+// potential deadlocks, nested message loops, etc...
#include "base/compiler_specific.h"
+#include "base/condition_variable.h"
#include "base/stl_util-inl.h"
#include "media/base/filter_host_impl.h"
#include "media/base/media_format.h"
@@ -10,6 +14,36 @@
namespace media {
+namespace {
+
+// Small helper function to help us transition over to injected message loops.
+//
+// TODO(scherkus): have every filter support injected message loops.
+template <class Filter>
+bool SupportsSetMessageLoop() {
+ switch (Filter::filter_type()) {
+ case FILTER_DEMUXER:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Helper function used with NewRunnableMethod to implement a (very) crude
+// blocking counter.
+//
+// TODO(scherkus): remove this as soon as Stop() is made asynchronous.
+void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
+ AutoLock auto_lock(*lock);
+ --(*count);
+ CHECK(*count >= 0);
+ if (*count == 0) {
+ cond_var->Signal();
+ }
+}
+
+} // namespace
+
PipelineImpl::PipelineImpl() {
ResetState();
}
@@ -287,6 +321,8 @@ bool PipelineThread::Start(FilterFactory* filter_factory,
// method. This method waits for that Lock to be released so that we know
// that the thread is not executing a nested message loop. This way we know
// that that Thread::Stop call will quit the appropriate message loop.
+//
+// TODO(scherkus): this can potentially deadlock, hack away our lock usage!!
void PipelineThread::Stop() {
if (thread_.IsRunning()) {
PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
@@ -294,6 +330,7 @@ void PipelineThread::Stop() {
thread_.Stop();
}
DCHECK(filter_hosts_.empty());
+ DCHECK(filter_threads_.empty());
}
// Called on client's thread.
@@ -418,16 +455,66 @@ void PipelineThread::StartTask(FilterFactory* filter_factory,
// as the result of an error condition. If there is no error, then set the
// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
// reverse order.
+//
+// TODO(scherkus): beware! this can get posted multiple times! it shouldn't!
void PipelineThread::StopTask() {
if (PipelineOk()) {
pipeline_->error_ = PIPELINE_STOPPING;
}
+ // Stop every filter.
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
++iter) {
(*iter)->Stop();
}
+
+ // Figure out how many threads we have to stop.
+ //
+ // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
+ FilterThreadVector running_threads;
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ if ((*iter)->IsRunning()) {
+ running_threads.push_back(*iter);
+ }
+ }
+
+ // Crude blocking counter implementation.
+ Lock lock;
+ ConditionVariable wait_for_zero(&lock);
+ int count = running_threads.size();
+
+ // Post a task to every filter's thread to ensure that they've completed their
+ // stopping logic before stopping the threads themselves.
+ //
+ // TODO(scherkus): again, Stop() should either be synchronous or we should
+ // receive a signal from filters that they have indeed stopped.
+ for (FilterThreadVector::iterator iter = running_threads.begin();
+ iter != running_threads.end();
+ ++iter) {
+ (*iter)->message_loop()->PostTask(FROM_HERE,
+ NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ }
+
+ // Wait on our "blocking counter".
+ {
+ AutoLock auto_lock(lock);
+ while (count > 0) {
+ wait_for_zero.Wait();
+ }
+ }
+
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = running_threads.begin();
+ iter != running_threads.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
if (host_initializing_) {
host_initializing_ = NULL;
message_loop()->Quit();
@@ -546,10 +633,25 @@ scoped_refptr<Filter> PipelineThread::CreateFilter(
if (NULL == host_initializing_) {
Error(PIPELINE_ERROR_OUT_OF_MEMORY);
} else {
- filter_hosts_.push_back(host_initializing_);
- filter->SetFilterHost(host_initializing_);
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ // Create a dedicated thread for this filter.
+ if (SupportsSetMessageLoop<Filter>()) {
+ scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ } else {
+ filter->SetMessageLoop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
+ }
+ }
+
+ // Creating a thread could have failed, verify we're still OK.
+ if (PipelineOk()) {
+ filter_hosts_.push_back(host_initializing_);
+ filter->SetFilterHost(host_initializing_);
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
}
}
}
@@ -588,8 +690,11 @@ scoped_refptr<DataSource> PipelineThread::CreateDataSource(
}
// Called as a result of destruction of the thread.
+//
+// TODO(scherkus): this can block the client due to synchronous Stop() API call.
void PipelineThread::WillDestroyCurrentMessageLoop() {
STLDeleteElements(&filter_hosts_);
+ STLDeleteElements(&filter_threads_);
}
} // namespace media