diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-06-25 00:54:18 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-06-25 00:54:18 +0000 |
commit | d0f914b1965c44d92beb46480135390b73ae3e10 (patch) | |
tree | 04d39187483252e360ee35cc94b5ff92ee685b78 /media/base | |
parent | bedb4a2725ebe735e4f8f1294b931e38dfb88ddc (diff) | |
download | chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.zip chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.gz chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.bz2 |
Refactor media pipeline and filters to use injected message loops.
Message loops are provided via MediaFilter::SetMessageLoop(). For now FFmpegDemuxer is the only filter taking advantage of injected message loops, and its unit tests have been updated as well.
TEST=FFmpegDemuxer tests should continue to run
BUG=none
Review URL: http://codereview.chromium.org/145014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19209 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base')
-rw-r--r-- | media/base/filters.h | 28 | ||||
-rw-r--r-- | media/base/mock_reader.h | 14 | ||||
-rw-r--r-- | media/base/pipeline_impl.cc | 115 | ||||
-rw-r--r-- | media/base/pipeline_impl.h | 3 |
4 files changed, 135 insertions, 25 deletions
diff --git a/media/base/filters.h b/media/base/filters.h index a98cd2d..7f59138 100644 --- a/media/base/filters.h +++ b/media/base/filters.h @@ -27,8 +27,8 @@ #include <string> #include "base/logging.h" +#include "base/message_loop.h" #include "base/ref_counted.h" -#include "base/task.h" #include "base/time.h" #include "media/base/media_format.h" @@ -56,18 +56,28 @@ enum FilterType { class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> { public: - MediaFilter() : host_(NULL) {} + MediaFilter() : host_(NULL), message_loop_(NULL) {} // Sets the protected member |host_|. This is the first method called by // the FilterHost after a filter is created. The host holds a strong - // reference to the filter. The refernce held by the host is guaranteed + // reference to the filter. The reference held by the host is guaranteed // to be released before the host object is destroyed by the pipeline. virtual void SetFilterHost(FilterHost* host) { - DCHECK(NULL == host_); - DCHECK(NULL != host); + DCHECK(host); + DCHECK(!host_); host_ = host; } + // Sets the protected member |message_loop_|, which is used by filters for + // processing asynchronous tasks and maintaining synchronized access to + // internal data members. The message loop should be running and exceed the + // lifetime of the filter. + virtual void SetMessageLoop(MessageLoop* message_loop) { + DCHECK(message_loop); + DCHECK(!message_loop_); + message_loop_ = message_loop; + } + // The pipeline is being stopped either as a result of an error or because // the client called Stop(). virtual void Stop() = 0; @@ -76,15 +86,19 @@ class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> { // method if they need to respond to this call. virtual void SetPlaybackRate(float playback_rate) {} - // The pipeline is being seeked to the specified time. Filters may implement + // The pipeline is seeking to the specified time. Filters may implement // this method if they need to respond to this call. virtual void Seek(base::TimeDelta time) {} protected: - FilterHost* host_; + // Only allow scoped_refptr<> to delete filters. friend class base::RefCountedThreadSafe<MediaFilter>; virtual ~MediaFilter() {} + // TODO(scherkus): make these private with public/protected accessors. + FilterHost* host_; + MessageLoop* message_loop_; + private: DISALLOW_COPY_AND_ASSIGN(MediaFilter); }; diff --git a/media/base/mock_reader.h b/media/base/mock_reader.h index ad8db88..3640981 100644 --- a/media/base/mock_reader.h +++ b/media/base/mock_reader.h @@ -21,8 +21,7 @@ class MockReader : public: MockReader() : called_(false), - expecting_call_(false), - wait_for_read_(false, false) { + expecting_call_(false) { } virtual ~MockReader() { @@ -34,7 +33,6 @@ class MockReader : expecting_call_ = false; called_ = false; buffer_ = NULL; - wait_for_read_.Reset(); } // Executes an asynchronous read on the given filter. @@ -45,12 +43,6 @@ class MockReader : filter->Read(NewCallback(this, &MockReader::OnReadComplete)); } - // Waits 500ms for the read callback to be completed. Returns true if the - // read was completed, false otherwise. - bool WaitForRead() { - return wait_for_read_.TimedWait(base::TimeDelta::FromMilliseconds(500)); - } - // Mock accessors. BufferType* buffer() { return buffer_; } bool called() { return called_; } @@ -63,7 +55,6 @@ class MockReader : expecting_call_ = false; called_ = true; buffer_ = buffer; - wait_for_read_.Signal(); } // Reference to the buffer provided in the callback. @@ -75,9 +66,6 @@ class MockReader : // Whether or not this reader was expecting a callback. bool expecting_call_; - // Used by tests to wait for the callback to be executed. - base::WaitableEvent wait_for_read_; - DISALLOW_COPY_AND_ASSIGN(MockReader); }; diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 49ffb98..8a43382 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -1,8 +1,12 @@ -// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// +// TODO(scherkus): clean up PipelineImpl... too many crazy function names, +// potential deadlocks, nested message loops, etc... #include "base/compiler_specific.h" +#include "base/condition_variable.h" #include "base/stl_util-inl.h" #include "media/base/filter_host_impl.h" #include "media/base/media_format.h" @@ -10,6 +14,36 @@ namespace media { +namespace { + +// Small helper function to help us transition over to injected message loops. +// +// TODO(scherkus): have every filter support injected message loops. +template <class Filter> +bool SupportsSetMessageLoop() { + switch (Filter::filter_type()) { + case FILTER_DEMUXER: + return true; + default: + return false; + } +} + +// Helper function used with NewRunnableMethod to implement a (very) crude +// blocking counter. +// +// TODO(scherkus): remove this as soon as Stop() is made asynchronous. +void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { + AutoLock auto_lock(*lock); + --(*count); + CHECK(*count >= 0); + if (*count == 0) { + cond_var->Signal(); + } +} + +} // namespace + PipelineImpl::PipelineImpl() { ResetState(); } @@ -287,6 +321,8 @@ bool PipelineThread::Start(FilterFactory* filter_factory, // method. This method waits for that Lock to be released so that we know // that the thread is not executing a nested message loop. This way we know // that that Thread::Stop call will quit the appropriate message loop. +// +// TODO(scherkus): this can potentially deadlock, hack away our lock usage!! void PipelineThread::Stop() { if (thread_.IsRunning()) { PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); @@ -294,6 +330,7 @@ void PipelineThread::Stop() { thread_.Stop(); } DCHECK(filter_hosts_.empty()); + DCHECK(filter_threads_.empty()); } // Called on client's thread. @@ -418,16 +455,66 @@ void PipelineThread::StartTask(FilterFactory* filter_factory, // as the result of an error condition. If there is no error, then set the // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the // reverse order. +// +// TODO(scherkus): beware! this can get posted multiple times! it shouldn't! void PipelineThread::StopTask() { if (PipelineOk()) { pipeline_->error_ = PIPELINE_STOPPING; } + // Stop every filter. for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); ++iter) { (*iter)->Stop(); } + + // Figure out how many threads we have to stop. + // + // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. + FilterThreadVector running_threads; + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + if ((*iter)->IsRunning()) { + running_threads.push_back(*iter); + } + } + + // Crude blocking counter implementation. + Lock lock; + ConditionVariable wait_for_zero(&lock); + int count = running_threads.size(); + + // Post a task to every filter's thread to ensure that they've completed their + // stopping logic before stopping the threads themselves. + // + // TODO(scherkus): again, Stop() should either be synchronous or we should + // receive a signal from filters that they have indeed stopped. + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + } + + // Wait on our "blocking counter". + { + AutoLock auto_lock(lock); + while (count > 0) { + wait_for_zero.Wait(); + } + } + + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->Stop(); + } + if (host_initializing_) { host_initializing_ = NULL; message_loop()->Quit(); @@ -546,10 +633,25 @@ scoped_refptr<Filter> PipelineThread::CreateFilter( if (NULL == host_initializing_) { Error(PIPELINE_ERROR_OUT_OF_MEMORY); } else { - filter_hosts_.push_back(host_initializing_); - filter->SetFilterHost(host_initializing_); - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + // Create a dedicated thread for this filter. + if (SupportsSetMessageLoop<Filter>()) { + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } else { + filter->SetMessageLoop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } + } + + // Creating a thread could have failed, verify we're still OK. + if (PipelineOk()) { + filter_hosts_.push_back(host_initializing_); + filter->SetFilterHost(host_initializing_); + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } } } } @@ -588,8 +690,11 @@ scoped_refptr<DataSource> PipelineThread::CreateDataSource( } // Called as a result of destruction of the thread. +// +// TODO(scherkus): this can block the client due to synchronous Stop() API call. void PipelineThread::WillDestroyCurrentMessageLoop() { STLDeleteElements(&filter_hosts_); + STLDeleteElements(&filter_threads_); } } // namespace media diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index ad57444..07d9c05 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -333,6 +333,9 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, typedef std::vector<FilterHostImpl*> FilterHostVector; FilterHostVector filter_hosts_; + typedef std::vector<base::Thread*> FilterThreadVector; + FilterThreadVector filter_threads_; + DISALLOW_COPY_AND_ASSIGN(PipelineThread); }; |