summaryrefslogtreecommitdiffstats
path: root/media/base
diff options
context:
space:
mode:
authorscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-06-25 00:54:18 +0000
committerscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-06-25 00:54:18 +0000
commitd0f914b1965c44d92beb46480135390b73ae3e10 (patch)
tree04d39187483252e360ee35cc94b5ff92ee685b78 /media/base
parentbedb4a2725ebe735e4f8f1294b931e38dfb88ddc (diff)
downloadchromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.zip
chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.gz
chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.bz2
Refactor media pipeline and filters to use injected message loops.
Message loops are provided via MediaFilter::SetMessageLoop(). For now FFmpegDemuxer is the only filter taking advantage of injected message loops, and its unit tests have been updated as well. TEST=FFmpegDemuxer tests should continue to run BUG=none Review URL: http://codereview.chromium.org/145014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19209 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base')
-rw-r--r--media/base/filters.h28
-rw-r--r--media/base/mock_reader.h14
-rw-r--r--media/base/pipeline_impl.cc115
-rw-r--r--media/base/pipeline_impl.h3
4 files changed, 135 insertions, 25 deletions
diff --git a/media/base/filters.h b/media/base/filters.h
index a98cd2d..7f59138 100644
--- a/media/base/filters.h
+++ b/media/base/filters.h
@@ -27,8 +27,8 @@
#include <string>
#include "base/logging.h"
+#include "base/message_loop.h"
#include "base/ref_counted.h"
-#include "base/task.h"
#include "base/time.h"
#include "media/base/media_format.h"
@@ -56,18 +56,28 @@ enum FilterType {
class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> {
public:
- MediaFilter() : host_(NULL) {}
+ MediaFilter() : host_(NULL), message_loop_(NULL) {}
// Sets the protected member |host_|. This is the first method called by
// the FilterHost after a filter is created. The host holds a strong
- // reference to the filter. The refernce held by the host is guaranteed
+ // reference to the filter. The reference held by the host is guaranteed
// to be released before the host object is destroyed by the pipeline.
virtual void SetFilterHost(FilterHost* host) {
- DCHECK(NULL == host_);
- DCHECK(NULL != host);
+ DCHECK(host);
+ DCHECK(!host_);
host_ = host;
}
+ // Sets the protected member |message_loop_|, which is used by filters for
+ // processing asynchronous tasks and maintaining synchronized access to
+ // internal data members. The message loop should be running and exceed the
+ // lifetime of the filter.
+ virtual void SetMessageLoop(MessageLoop* message_loop) {
+ DCHECK(message_loop);
+ DCHECK(!message_loop_);
+ message_loop_ = message_loop;
+ }
+
// The pipeline is being stopped either as a result of an error or because
// the client called Stop().
virtual void Stop() = 0;
@@ -76,15 +86,19 @@ class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> {
// method if they need to respond to this call.
virtual void SetPlaybackRate(float playback_rate) {}
- // The pipeline is being seeked to the specified time. Filters may implement
+ // The pipeline is seeking to the specified time. Filters may implement
// this method if they need to respond to this call.
virtual void Seek(base::TimeDelta time) {}
protected:
- FilterHost* host_;
+ // Only allow scoped_refptr<> to delete filters.
friend class base::RefCountedThreadSafe<MediaFilter>;
virtual ~MediaFilter() {}
+ // TODO(scherkus): make these private with public/protected accessors.
+ FilterHost* host_;
+ MessageLoop* message_loop_;
+
private:
DISALLOW_COPY_AND_ASSIGN(MediaFilter);
};
diff --git a/media/base/mock_reader.h b/media/base/mock_reader.h
index ad8db88..3640981 100644
--- a/media/base/mock_reader.h
+++ b/media/base/mock_reader.h
@@ -21,8 +21,7 @@ class MockReader :
public:
MockReader()
: called_(false),
- expecting_call_(false),
- wait_for_read_(false, false) {
+ expecting_call_(false) {
}
virtual ~MockReader() {
@@ -34,7 +33,6 @@ class MockReader :
expecting_call_ = false;
called_ = false;
buffer_ = NULL;
- wait_for_read_.Reset();
}
// Executes an asynchronous read on the given filter.
@@ -45,12 +43,6 @@ class MockReader :
filter->Read(NewCallback(this, &MockReader::OnReadComplete));
}
- // Waits 500ms for the read callback to be completed. Returns true if the
- // read was completed, false otherwise.
- bool WaitForRead() {
- return wait_for_read_.TimedWait(base::TimeDelta::FromMilliseconds(500));
- }
-
// Mock accessors.
BufferType* buffer() { return buffer_; }
bool called() { return called_; }
@@ -63,7 +55,6 @@ class MockReader :
expecting_call_ = false;
called_ = true;
buffer_ = buffer;
- wait_for_read_.Signal();
}
// Reference to the buffer provided in the callback.
@@ -75,9 +66,6 @@ class MockReader :
// Whether or not this reader was expecting a callback.
bool expecting_call_;
- // Used by tests to wait for the callback to be executed.
- base::WaitableEvent wait_for_read_;
-
DISALLOW_COPY_AND_ASSIGN(MockReader);
};
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 49ffb98..8a43382 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -1,8 +1,12 @@
-// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved.
+// Copyright (c) 2008-2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//
+// TODO(scherkus): clean up PipelineImpl... too many crazy function names,
+// potential deadlocks, nested message loops, etc...
#include "base/compiler_specific.h"
+#include "base/condition_variable.h"
#include "base/stl_util-inl.h"
#include "media/base/filter_host_impl.h"
#include "media/base/media_format.h"
@@ -10,6 +14,36 @@
namespace media {
+namespace {
+
+// Small helper function to help us transition over to injected message loops.
+//
+// TODO(scherkus): have every filter support injected message loops.
+template <class Filter>
+bool SupportsSetMessageLoop() {
+ switch (Filter::filter_type()) {
+ case FILTER_DEMUXER:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Helper function used with NewRunnableMethod to implement a (very) crude
+// blocking counter.
+//
+// TODO(scherkus): remove this as soon as Stop() is made asynchronous.
+void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
+ AutoLock auto_lock(*lock);
+ --(*count);
+ CHECK(*count >= 0);
+ if (*count == 0) {
+ cond_var->Signal();
+ }
+}
+
+} // namespace
+
PipelineImpl::PipelineImpl() {
ResetState();
}
@@ -287,6 +321,8 @@ bool PipelineThread::Start(FilterFactory* filter_factory,
// method. This method waits for that Lock to be released so that we know
// that the thread is not executing a nested message loop. This way we know
// that that Thread::Stop call will quit the appropriate message loop.
+//
+// TODO(scherkus): this can potentially deadlock, hack away our lock usage!!
void PipelineThread::Stop() {
if (thread_.IsRunning()) {
PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
@@ -294,6 +330,7 @@ void PipelineThread::Stop() {
thread_.Stop();
}
DCHECK(filter_hosts_.empty());
+ DCHECK(filter_threads_.empty());
}
// Called on client's thread.
@@ -418,16 +455,66 @@ void PipelineThread::StartTask(FilterFactory* filter_factory,
// as the result of an error condition. If there is no error, then set the
// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
// reverse order.
+//
+// TODO(scherkus): beware! this can get posted multiple times! it shouldn't!
void PipelineThread::StopTask() {
if (PipelineOk()) {
pipeline_->error_ = PIPELINE_STOPPING;
}
+ // Stop every filter.
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
++iter) {
(*iter)->Stop();
}
+
+ // Figure out how many threads we have to stop.
+ //
+ // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
+ FilterThreadVector running_threads;
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ if ((*iter)->IsRunning()) {
+ running_threads.push_back(*iter);
+ }
+ }
+
+ // Crude blocking counter implementation.
+ Lock lock;
+ ConditionVariable wait_for_zero(&lock);
+ int count = running_threads.size();
+
+ // Post a task to every filter's thread to ensure that they've completed their
+ // stopping logic before stopping the threads themselves.
+ //
+ // TODO(scherkus): again, Stop() should either be synchronous or we should
+ // receive a signal from filters that they have indeed stopped.
+ for (FilterThreadVector::iterator iter = running_threads.begin();
+ iter != running_threads.end();
+ ++iter) {
+ (*iter)->message_loop()->PostTask(FROM_HERE,
+ NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ }
+
+ // Wait on our "blocking counter".
+ {
+ AutoLock auto_lock(lock);
+ while (count > 0) {
+ wait_for_zero.Wait();
+ }
+ }
+
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = running_threads.begin();
+ iter != running_threads.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
if (host_initializing_) {
host_initializing_ = NULL;
message_loop()->Quit();
@@ -546,10 +633,25 @@ scoped_refptr<Filter> PipelineThread::CreateFilter(
if (NULL == host_initializing_) {
Error(PIPELINE_ERROR_OUT_OF_MEMORY);
} else {
- filter_hosts_.push_back(host_initializing_);
- filter->SetFilterHost(host_initializing_);
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ // Create a dedicated thread for this filter.
+ if (SupportsSetMessageLoop<Filter>()) {
+ scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ } else {
+ filter->SetMessageLoop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
+ }
+ }
+
+ // Creating a thread could have failed, verify we're still OK.
+ if (PipelineOk()) {
+ filter_hosts_.push_back(host_initializing_);
+ filter->SetFilterHost(host_initializing_);
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
}
}
}
@@ -588,8 +690,11 @@ scoped_refptr<DataSource> PipelineThread::CreateDataSource(
}
// Called as a result of destruction of the thread.
+//
+// TODO(scherkus): this can block the client due to synchronous Stop() API call.
void PipelineThread::WillDestroyCurrentMessageLoop() {
STLDeleteElements(&filter_hosts_);
+ STLDeleteElements(&filter_threads_);
}
} // namespace media
diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h
index ad57444..07d9c05 100644
--- a/media/base/pipeline_impl.h
+++ b/media/base/pipeline_impl.h
@@ -333,6 +333,9 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
typedef std::vector<FilterHostImpl*> FilterHostVector;
FilterHostVector filter_hosts_;
+ typedef std::vector<base::Thread*> FilterThreadVector;
+ FilterThreadVector filter_threads_;
+
DISALLOW_COPY_AND_ASSIGN(PipelineThread);
};