diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-06-25 00:54:18 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-06-25 00:54:18 +0000 |
commit | d0f914b1965c44d92beb46480135390b73ae3e10 (patch) | |
tree | 04d39187483252e360ee35cc94b5ff92ee685b78 | |
parent | bedb4a2725ebe735e4f8f1294b931e38dfb88ddc (diff) | |
download | chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.zip chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.gz chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.bz2 |
Refactor media pipeline and filters to use injected message loops.
Message loops are provided via MediaFilter::SetMessageLoop(). For now FFmpegDemuxer is the only filter taking advantage of injected message loops, and its unit tests have been updated as well.
TEST=FFmpegDemuxer tests should continue to run
BUG=none
Review URL: http://codereview.chromium.org/145014
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19209 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | media/base/filters.h | 28 | ||||
-rw-r--r-- | media/base/mock_reader.h | 14 | ||||
-rw-r--r-- | media/base/pipeline_impl.cc | 115 | ||||
-rw-r--r-- | media/base/pipeline_impl.h | 3 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer.cc | 167 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer.h | 41 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer_unittest.cc | 71 |
7 files changed, 266 insertions, 173 deletions
diff --git a/media/base/filters.h b/media/base/filters.h index a98cd2d..7f59138 100644 --- a/media/base/filters.h +++ b/media/base/filters.h @@ -27,8 +27,8 @@ #include <string> #include "base/logging.h" +#include "base/message_loop.h" #include "base/ref_counted.h" -#include "base/task.h" #include "base/time.h" #include "media/base/media_format.h" @@ -56,18 +56,28 @@ enum FilterType { class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> { public: - MediaFilter() : host_(NULL) {} + MediaFilter() : host_(NULL), message_loop_(NULL) {} // Sets the protected member |host_|. This is the first method called by // the FilterHost after a filter is created. The host holds a strong - // reference to the filter. The refernce held by the host is guaranteed + // reference to the filter. The reference held by the host is guaranteed // to be released before the host object is destroyed by the pipeline. virtual void SetFilterHost(FilterHost* host) { - DCHECK(NULL == host_); - DCHECK(NULL != host); + DCHECK(host); + DCHECK(!host_); host_ = host; } + // Sets the protected member |message_loop_|, which is used by filters for + // processing asynchronous tasks and maintaining synchronized access to + // internal data members. The message loop should be running and exceed the + // lifetime of the filter. + virtual void SetMessageLoop(MessageLoop* message_loop) { + DCHECK(message_loop); + DCHECK(!message_loop_); + message_loop_ = message_loop; + } + // The pipeline is being stopped either as a result of an error or because // the client called Stop(). virtual void Stop() = 0; @@ -76,15 +86,19 @@ class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> { // method if they need to respond to this call. virtual void SetPlaybackRate(float playback_rate) {} - // The pipeline is being seeked to the specified time. Filters may implement + // The pipeline is seeking to the specified time. Filters may implement // this method if they need to respond to this call. virtual void Seek(base::TimeDelta time) {} protected: - FilterHost* host_; + // Only allow scoped_refptr<> to delete filters. friend class base::RefCountedThreadSafe<MediaFilter>; virtual ~MediaFilter() {} + // TODO(scherkus): make these private with public/protected accessors. + FilterHost* host_; + MessageLoop* message_loop_; + private: DISALLOW_COPY_AND_ASSIGN(MediaFilter); }; diff --git a/media/base/mock_reader.h b/media/base/mock_reader.h index ad8db88..3640981 100644 --- a/media/base/mock_reader.h +++ b/media/base/mock_reader.h @@ -21,8 +21,7 @@ class MockReader : public: MockReader() : called_(false), - expecting_call_(false), - wait_for_read_(false, false) { + expecting_call_(false) { } virtual ~MockReader() { @@ -34,7 +33,6 @@ class MockReader : expecting_call_ = false; called_ = false; buffer_ = NULL; - wait_for_read_.Reset(); } // Executes an asynchronous read on the given filter. @@ -45,12 +43,6 @@ class MockReader : filter->Read(NewCallback(this, &MockReader::OnReadComplete)); } - // Waits 500ms for the read callback to be completed. Returns true if the - // read was completed, false otherwise. - bool WaitForRead() { - return wait_for_read_.TimedWait(base::TimeDelta::FromMilliseconds(500)); - } - // Mock accessors. BufferType* buffer() { return buffer_; } bool called() { return called_; } @@ -63,7 +55,6 @@ class MockReader : expecting_call_ = false; called_ = true; buffer_ = buffer; - wait_for_read_.Signal(); } // Reference to the buffer provided in the callback. @@ -75,9 +66,6 @@ class MockReader : // Whether or not this reader was expecting a callback. bool expecting_call_; - // Used by tests to wait for the callback to be executed. - base::WaitableEvent wait_for_read_; - DISALLOW_COPY_AND_ASSIGN(MockReader); }; diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 49ffb98..8a43382 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -1,8 +1,12 @@ -// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2008-2009 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// +// TODO(scherkus): clean up PipelineImpl... too many crazy function names, +// potential deadlocks, nested message loops, etc... #include "base/compiler_specific.h" +#include "base/condition_variable.h" #include "base/stl_util-inl.h" #include "media/base/filter_host_impl.h" #include "media/base/media_format.h" @@ -10,6 +14,36 @@ namespace media { +namespace { + +// Small helper function to help us transition over to injected message loops. +// +// TODO(scherkus): have every filter support injected message loops. +template <class Filter> +bool SupportsSetMessageLoop() { + switch (Filter::filter_type()) { + case FILTER_DEMUXER: + return true; + default: + return false; + } +} + +// Helper function used with NewRunnableMethod to implement a (very) crude +// blocking counter. +// +// TODO(scherkus): remove this as soon as Stop() is made asynchronous. +void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { + AutoLock auto_lock(*lock); + --(*count); + CHECK(*count >= 0); + if (*count == 0) { + cond_var->Signal(); + } +} + +} // namespace + PipelineImpl::PipelineImpl() { ResetState(); } @@ -287,6 +321,8 @@ bool PipelineThread::Start(FilterFactory* filter_factory, // method. This method waits for that Lock to be released so that we know // that the thread is not executing a nested message loop. This way we know // that that Thread::Stop call will quit the appropriate message loop. +// +// TODO(scherkus): this can potentially deadlock, hack away our lock usage!! void PipelineThread::Stop() { if (thread_.IsRunning()) { PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); @@ -294,6 +330,7 @@ void PipelineThread::Stop() { thread_.Stop(); } DCHECK(filter_hosts_.empty()); + DCHECK(filter_threads_.empty()); } // Called on client's thread. @@ -418,16 +455,66 @@ void PipelineThread::StartTask(FilterFactory* filter_factory, // as the result of an error condition. If there is no error, then set the // pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the // reverse order. +// +// TODO(scherkus): beware! this can get posted multiple times! it shouldn't! void PipelineThread::StopTask() { if (PipelineOk()) { pipeline_->error_ = PIPELINE_STOPPING; } + // Stop every filter. for (FilterHostVector::iterator iter = filter_hosts_.begin(); iter != filter_hosts_.end(); ++iter) { (*iter)->Stop(); } + + // Figure out how many threads we have to stop. + // + // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue. + FilterThreadVector running_threads; + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + if ((*iter)->IsRunning()) { + running_threads.push_back(*iter); + } + } + + // Crude blocking counter implementation. + Lock lock; + ConditionVariable wait_for_zero(&lock); + int count = running_threads.size(); + + // Post a task to every filter's thread to ensure that they've completed their + // stopping logic before stopping the threads themselves. + // + // TODO(scherkus): again, Stop() should either be synchronous or we should + // receive a signal from filters that they have indeed stopped. + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->message_loop()->PostTask(FROM_HERE, + NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count)); + } + + // Wait on our "blocking counter". + { + AutoLock auto_lock(lock); + while (count > 0) { + wait_for_zero.Wait(); + } + } + + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = running_threads.begin(); + iter != running_threads.end(); + ++iter) { + (*iter)->Stop(); + } + if (host_initializing_) { host_initializing_ = NULL; message_loop()->Quit(); @@ -546,10 +633,25 @@ scoped_refptr<Filter> PipelineThread::CreateFilter( if (NULL == host_initializing_) { Error(PIPELINE_ERROR_OUT_OF_MEMORY); } else { - filter_hosts_.push_back(host_initializing_); - filter->SetFilterHost(host_initializing_); - if (!filter->Initialize(source)) { - Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + // Create a dedicated thread for this filter. + if (SupportsSetMessageLoop<Filter>()) { + scoped_ptr<base::Thread> thread(new base::Thread("FilterThread")); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } else { + filter->SetMessageLoop(thread->message_loop()); + filter_threads_.push_back(thread.release()); + } + } + + // Creating a thread could have failed, verify we're still OK. + if (PipelineOk()) { + filter_hosts_.push_back(host_initializing_); + filter->SetFilterHost(host_initializing_); + if (!filter->Initialize(source)) { + Error(PIPELINE_ERROR_INITIALIZATION_FAILED); + } } } } @@ -588,8 +690,11 @@ scoped_refptr<DataSource> PipelineThread::CreateDataSource( } // Called as a result of destruction of the thread. +// +// TODO(scherkus): this can block the client due to synchronous Stop() API call. void PipelineThread::WillDestroyCurrentMessageLoop() { STLDeleteElements(&filter_hosts_); + STLDeleteElements(&filter_threads_); } } // namespace media diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index ad57444..07d9c05 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -333,6 +333,9 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>, typedef std::vector<FilterHostImpl*> FilterHostVector; FilterHostVector filter_hosts_; + typedef std::vector<base::Thread*> FilterThreadVector; + FilterThreadVector filter_threads_; + DISALLOW_COPY_AND_ASSIGN(PipelineThread); }; diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc index 2e8a4c6..fb39f10 100644 --- a/media/filters/ffmpeg_demuxer.cc +++ b/media/filters/ffmpeg_demuxer.cc @@ -95,7 +95,6 @@ FFmpegDemuxerStream::FFmpegDemuxerStream(FFmpegDemuxer* demuxer, } FFmpegDemuxerStream::~FFmpegDemuxerStream() { - AutoLock auto_lock(lock_); DCHECK(stopped_); DCHECK(read_queue_.empty()); DCHECK(buffer_queue_.empty()); @@ -111,47 +110,44 @@ void* FFmpegDemuxerStream::QueryInterface(const char* id) { } bool FFmpegDemuxerStream::HasPendingReads() { - AutoLock auto_lock(lock_); + DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_); + DCHECK(!stopped_ || read_queue_.empty()) + << "Read queue should have been emptied if demuxing stream is stopped"; return !read_queue_.empty(); } base::TimeDelta FFmpegDemuxerStream::EnqueuePacket(AVPacket* packet) { + DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_); base::TimeDelta timestamp = ConvertTimestamp(packet->pts); base::TimeDelta duration = ConvertTimestamp(packet->duration); + if (stopped_) { + NOTREACHED() << "Attempted to enqueue packet on a stopped stream"; + return timestamp; + } + + // Enqueue the callback and attempt to satisfy a read immediately. scoped_refptr<Buffer> buffer = new AVPacketBuffer(packet, timestamp, duration); - DCHECK(buffer); - { - AutoLock auto_lock(lock_); - if (stopped_) { - NOTREACHED() << "Attempted to enqueue packet on a stopped stream."; - return timestamp; - } - buffer_queue_.push_back(buffer); + if (!buffer) { + NOTREACHED() << "Unable to allocate AVPacketBuffer"; + return timestamp; } - FulfillPendingReads(); + buffer_queue_.push_back(buffer); + FulfillPendingRead(); return timestamp; } void FFmpegDemuxerStream::FlushBuffers() { - AutoLock auto_lock(lock_); + DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_); buffer_queue_.clear(); discontinuous_ = true; } void FFmpegDemuxerStream::Stop() { - // Since |buffer_queue_| can be very large, we swap queues and delete outside - // of the lock. - BufferQueue tmp_buffer_queue; - ReadQueue tmp_read_queue; - { - AutoLock auto_lock(lock_); - buffer_queue_.swap(tmp_buffer_queue); - read_queue_.swap(tmp_read_queue); - stopped_ = true; - } - tmp_buffer_queue.clear(); - STLDeleteElements(&tmp_read_queue); + DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_); + buffer_queue_.clear(); + STLDeleteElements(&read_queue_); + stopped_ = true; } const MediaFormat& FFmpegDemuxerStream::media_format() { @@ -160,59 +156,53 @@ const MediaFormat& FFmpegDemuxerStream::media_format() { void FFmpegDemuxerStream::Read(Callback1<Buffer*>::Type* read_callback) { DCHECK(read_callback); - { - AutoLock auto_lock(lock_); - // Don't accept any additional reads if we've been told to stop. - // - // TODO(scherkus): it would be cleaner if we replied with an error message. - if (stopped_) { - delete read_callback; - return; - } + demuxer_->message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &FFmpegDemuxerStream::ReadTask, read_callback)); +} + +void FFmpegDemuxerStream::ReadTask(Callback1<Buffer*>::Type* read_callback) { + DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_); - // Otherwise enqueue the request. - read_queue_.push_back(read_callback); + // Don't accept any additional reads if we've been told to stop. + // + // TODO(scherkus): it would be cleaner if we replied with an error message. + if (stopped_) { + delete read_callback; + return; } - // See if we can immediately fulfill this read and whether we need to demux. - bool post_demux_task = FulfillPendingReads(); + // Enqueue the callback and attempt to satisfy it immediately. + read_queue_.push_back(read_callback); + FulfillPendingRead(); - // Since Read() may be executed on any thread, it's possible that StopTask() - // finishes executing on the demuxer thread and the message loop goes away. - // - // To prevent that we'll grab the lock and post a task at the same time, which - // will keep the message loop alive. - AutoLock auto_lock(lock_); - if (post_demux_task && !stopped_) { + // There are still pending reads, demux some more. + if (HasPendingReads()) { demuxer_->PostDemuxTask(); } } -bool FFmpegDemuxerStream::FulfillPendingReads() { - bool pending_reads = false; - while (true) { - scoped_refptr<Buffer> buffer; - scoped_ptr<Callback1<Buffer*>::Type> read_callback; - { - AutoLock auto_lock(lock_); - pending_reads = !read_queue_.empty(); - if (buffer_queue_.empty() || read_queue_.empty()) { - break; - } - buffer = buffer_queue_.front(); - read_callback.reset(read_queue_.front()); - buffer_queue_.pop_front(); - read_queue_.pop_front(); - - // Handle discontinuities due to FlushBuffers() being called. - if (discontinuous_) { - buffer->SetDiscontinuous(true); - discontinuous_ = false; - } - } - read_callback->Run(buffer); +void FFmpegDemuxerStream::FulfillPendingRead() { + DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_); + if (buffer_queue_.empty() || read_queue_.empty()) { + return; } - return pending_reads; + + // Dequeue a buffer and pending read pair. + scoped_refptr<Buffer> buffer = buffer_queue_.front(); + scoped_ptr<Callback1<Buffer*>::Type> read_callback(read_queue_.front()); + buffer_queue_.pop_front(); + read_queue_.pop_front(); + + // Handle discontinuities due to FlushBuffers() being called. + // + // TODO(scherkus): get rid of |discontinuous_| and use buffer flags. + if (discontinuous_) { + buffer->SetDiscontinuous(true); + discontinuous_ = false; + } + + // Execute the callback. + read_callback->Run(buffer); } base::TimeDelta FFmpegDemuxerStream::ConvertTimestamp(int64 timestamp) { @@ -225,11 +215,10 @@ base::TimeDelta FFmpegDemuxerStream::ConvertTimestamp(int64 timestamp) { // FFmpegDemuxer // FFmpegDemuxer::FFmpegDemuxer() - : thread_("DemuxerThread") { + : thread_id_(NULL) { } FFmpegDemuxer::~FFmpegDemuxer() { - DCHECK(!thread_.IsRunning()); DCHECK(!format_context_.get()); // TODO(scherkus): I believe we need to use av_close_input_file() here // instead of scoped_ptr_malloc calling av_free(). @@ -239,18 +228,14 @@ FFmpegDemuxer::~FFmpegDemuxer() { } void FFmpegDemuxer::PostDemuxTask() { - thread_.message_loop()->PostTask(FROM_HERE, + message_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &FFmpegDemuxer::DemuxTask)); } void FFmpegDemuxer::Stop() { - // Stop our thread and post a task notifying the streams to stop as well. - if (thread_.IsRunning()) { - thread_.message_loop()->PostTask(FROM_HERE, - NewRunnableMethod(this, &FFmpegDemuxer::StopTask)); - thread_.Stop(); - } - format_context_.reset(); + // Post a task to notify the streams to stop as well. + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &FFmpegDemuxer::StopTask)); } void FFmpegDemuxer::Seek(base::TimeDelta time) { @@ -258,18 +243,12 @@ void FFmpegDemuxer::Seek(base::TimeDelta time) { // operation is completed and filters behind the demuxer is good to issue // more reads, but we are posting a task here, which makes the seek operation // asynchronous, should change how seek works to make it fully asynchronous. - thread_.message_loop()->PostTask(FROM_HERE, + message_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &FFmpegDemuxer::SeekTask, time)); } bool FFmpegDemuxer::Initialize(DataSource* data_source) { - // Start our internal demuxing thread. - if (!thread_.Start()) { - host_->Error(DEMUXER_ERROR_COULD_NOT_CREATE_THREAD); - return false; - } - - thread_.message_loop()->PostTask(FROM_HERE, + message_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &FFmpegDemuxer::InititalizeTask, data_source)); return true; } @@ -294,6 +273,10 @@ void FFmpegDemuxer::InititalizeTask(DataSource* data_source) { // // Refer to media/filters/ffmpeg_glue.h for details. + // Grab the thread id for debugging. + DCHECK(!thread_id_); + thread_id_ = PlatformThread::CurrentId(); + // Add our data source and get our unique key. std::string key = FFmpegGlue::get()->AddDataSource(data_source); @@ -353,6 +336,8 @@ void FFmpegDemuxer::InititalizeTask(DataSource* data_source) { } void FFmpegDemuxer::SeekTask(base::TimeDelta time) { + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_); + // Tell streams to flush buffers due to seeking. StreamVector::iterator iter; for (iter = streams_.begin(); iter != streams_.end(); ++iter) { @@ -373,6 +358,8 @@ void FFmpegDemuxer::SeekTask(base::TimeDelta time) { } void FFmpegDemuxer::DemuxTask() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_); + // Make sure we have work to do before demuxing. if (!StreamsHavePendingReads()) { return; @@ -428,14 +415,18 @@ void FFmpegDemuxer::DemuxTask() { } void FFmpegDemuxer::StopTask() { + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_); StreamVector::iterator iter; for (iter = streams_.begin(); iter != streams_.end(); ++iter) { (*iter)->Stop(); } + + // Free our AVFormatContext. + format_context_.reset(); } bool FFmpegDemuxer::StreamsHavePendingReads() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_); StreamVector::iterator iter; for (iter = streams_.begin(); iter != streams_.end(); ++iter) { if ((*iter)->HasPendingReads()) { @@ -446,7 +437,7 @@ bool FFmpegDemuxer::StreamsHavePendingReads() { } void FFmpegDemuxer::StreamHasEnded() { - DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id()); + DCHECK_EQ(PlatformThread::CurrentId(), thread_id_); StreamVector::iterator iter; for (iter = streams_.begin(); iter != streams_.end(); ++iter) { AVPacket* packet = new AVPacket(); diff --git a/media/filters/ffmpeg_demuxer.h b/media/filters/ffmpeg_demuxer.h index 2ffbf50..bb5c285 100644 --- a/media/filters/ffmpeg_demuxer.h +++ b/media/filters/ffmpeg_demuxer.h @@ -16,8 +16,8 @@ // excessive memory consumption. // // When stopped, FFmpegDemuxer and FFmpegDemuxerStream release all callbacks -// and buffered packets and shuts down its internal thread. Reads from a -// stopped FFmpegDemuxerStream will not be replied to. +// and buffered packets. Reads from a stopped FFmpegDemuxerStream will not be +// replied to. #ifndef MEDIA_FILTERS_FFMPEG_DEMUXER_H_ #define MEDIA_FILTERS_FFMPEG_DEMUXER_H_ @@ -25,9 +25,6 @@ #include <deque> #include <vector> -#include "base/lock.h" -#include "base/thread.h" -#include "base/waitable_event.h" #include "media/base/buffers.h" #include "media/base/factory.h" #include "media/base/filters.h" @@ -59,20 +56,20 @@ class FFmpegDemuxerStream : public DemuxerStream, public AVStreamProvider { // Returns true is this stream has pending reads, false otherwise. // // Safe to call on any thread. - bool HasPendingReads(); + virtual bool HasPendingReads(); // Enqueues and takes ownership over the given AVPacket, returns the timestamp // of the enqueued packet. - base::TimeDelta EnqueuePacket(AVPacket* packet); + virtual base::TimeDelta EnqueuePacket(AVPacket* packet); // Signals to empty the buffer queue and mark next packet as discontinuous. - void FlushBuffers(); + virtual void FlushBuffers(); // Empties the queues and ignores any additional calls to Read(). - void Stop(); + virtual void Stop(); // Returns the duration of this stream. - base::TimeDelta duration() { return duration_; } + virtual base::TimeDelta duration() { return duration_; } // DemuxerStream implementation. virtual const MediaFormat& media_format(); @@ -85,8 +82,12 @@ class FFmpegDemuxerStream : public DemuxerStream, public AVStreamProvider { virtual void* QueryInterface(const char* interface_id); private: - // Returns true if there are still pending reads. - bool FulfillPendingReads(); + // Carries out enqueuing a pending read on the demuxer thread. + void ReadTask(Callback1<Buffer*>::Type* read_callback); + + // Attempts to fulfill a single pending read by dequeueing a buffer and read + // callback pair and executing the callback. + void FulfillPendingRead(); // Converts an FFmpeg stream timestamp into a base::TimeDelta. base::TimeDelta ConvertTimestamp(int64 timestamp); @@ -98,9 +99,7 @@ class FFmpegDemuxerStream : public DemuxerStream, public AVStreamProvider { bool discontinuous_; bool stopped_; - Lock lock_; - - typedef std::deque< scoped_refptr<Buffer> > BufferQueue; + typedef std::deque<scoped_refptr<Buffer> > BufferQueue; BufferQueue buffer_queue_; typedef std::deque<Callback1<Buffer*>::Type*> ReadQueue; @@ -116,8 +115,8 @@ class FFmpegDemuxer : public Demuxer { return new FilterFactoryImpl0<FFmpegDemuxer>(); } - // Called by FFmpegDemuxerStreams to post a demuxing task. - void PostDemuxTask(); + // Posts a task to perform additional demuxing. + virtual void PostDemuxTask(); // MediaFilter implementation. virtual void Stop(); @@ -129,8 +128,8 @@ class FFmpegDemuxer : public Demuxer { virtual scoped_refptr<DemuxerStream> GetStream(int stream_id); private: - // Accesses |thread_| to create a helper method used by every test. - friend class FFmpegDemuxerTest; + // Allow FFmpegDemuxerStream to post tasks to our message loop. + friend class FFmpegDemuxerStream; // Only allow a factory to create this class. friend class FilterFactoryImpl0<FFmpegDemuxer>; @@ -182,8 +181,8 @@ class FFmpegDemuxer : public Demuxer { StreamVector streams_; StreamVector packet_streams_; - // Thread handle. - base::Thread thread_; + // Used for debugging. + PlatformThreadId thread_id_; DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxer); }; diff --git a/media/filters/ffmpeg_demuxer_unittest.cc b/media/filters/ffmpeg_demuxer_unittest.cc index faa08ef..cdacf12 100644 --- a/media/filters/ffmpeg_demuxer_unittest.cc +++ b/media/filters/ffmpeg_demuxer_unittest.cc @@ -4,9 +4,6 @@ #include <deque> -#include "base/singleton.h" -#include "base/tuple.h" -#include "media/base/filter_host.h" #include "media/base/filters.h" #include "media/base/mock_ffmpeg.h" #include "media/base/mock_filter_host.h" @@ -60,8 +57,7 @@ class FFmpegDemuxerTest : public testing::Test { static const uint8 kVideoData[]; static const uint8* kNullData; - FFmpegDemuxerTest() - : wait_for_demuxer_(false, false) { + FFmpegDemuxerTest() { // Create an FFmpegDemuxer. factory_ = FFmpegDemuxer::CreateFilterFactory(); MediaFormat media_format; @@ -70,6 +66,9 @@ class FFmpegDemuxerTest : public testing::Test { demuxer_ = factory_->Create<FFmpegDemuxer>(media_format); DCHECK(demuxer_); + // Provide a message loop. + demuxer_->SetMessageLoop(&message_loop_); + // Prepare a filter host and data source for the demuxer. pipeline_.reset(new MockPipeline()); filter_host_.reset(new MockFilterHost<Demuxer>(pipeline_.get(), demuxer_)); @@ -113,6 +112,9 @@ class FFmpegDemuxerTest : public testing::Test { // Call Stop() to shut down internal threads. demuxer_->Stop(); + // Finish up any remaining tasks. + message_loop_.RunAllPending(); + // Reset MockFFmpeg. MockFFmpeg::set(NULL); } @@ -130,25 +132,19 @@ class FFmpegDemuxerTest : public testing::Test { void InitializeDemuxer() { InitializeDemuxerMocks(); EXPECT_TRUE(demuxer_->Initialize(data_source_.get())); + message_loop_.RunAllPending(); EXPECT_TRUE(filter_host_->WaitForInitialized()); EXPECT_TRUE(filter_host_->IsInitialized()); EXPECT_EQ(PIPELINE_OK, pipeline_->GetError()); } - // To eliminate flakiness, this method will wait for the demuxer's message - // loop to finish any currently executing and queued tasks. - void WaitForDemuxerThread() { - demuxer_->thread_.message_loop()->PostTask(FROM_HERE, - NewRunnableFunction(&FFmpegDemuxerTest::Notify, &wait_for_demuxer_)); - wait_for_demuxer_.Wait(); - } - // Fixture members. scoped_refptr<FilterFactory> factory_; scoped_refptr<FFmpegDemuxer> demuxer_; scoped_ptr<MockPipeline> pipeline_; scoped_ptr<MockFilterHost<Demuxer> > filter_host_; scoped_refptr<StrictMock<MockDataSource> > data_source_; + MessageLoop message_loop_; // FFmpeg fixtures. AVFormatContext format_context_; @@ -157,14 +153,6 @@ class FFmpegDemuxerTest : public testing::Test { MockFFmpeg mock_ffmpeg_; private: - // Used with NewRunnableFunction() -- we don't use NewRunnableMethod() since - // it would force this class to be refcounted causing double deletions. - static void Notify(base::WaitableEvent* event) { - event->Signal(); - } - - base::WaitableEvent wait_for_demuxer_; - DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxerTest); }; @@ -203,6 +191,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_OpenFails) { .WillOnce(Return(-1)); EXPECT_TRUE(demuxer_->Initialize(data_source_.get())); + message_loop_.RunAllPending(); EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_COULD_NOT_OPEN)); EXPECT_FALSE(filter_host_->IsInitialized()); } @@ -216,6 +205,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_ParseFails) { EXPECT_CALL(*MockFFmpeg::get(), AVFree(&format_context_)); EXPECT_TRUE(demuxer_->Initialize(data_source_.get())); + message_loop_.RunAllPending(); EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_COULD_NOT_PARSE)); EXPECT_FALSE(filter_host_->IsInitialized()); } @@ -229,6 +219,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_NoStreams) { format_context_.nb_streams = 0; EXPECT_TRUE(demuxer_->Initialize(data_source_.get())); + message_loop_.RunAllPending(); EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_NO_SUPPORTED_STREAMS)); EXPECT_FALSE(filter_host_->IsInitialized()); } @@ -243,6 +234,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_DataStreamOnly) { format_context_.nb_streams = 1; EXPECT_TRUE(demuxer_->Initialize(data_source_.get())); + message_loop_.RunAllPending(); EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_NO_SUPPORTED_STREAMS)); EXPECT_FALSE(filter_host_->IsInitialized()); } @@ -346,7 +338,7 @@ TEST_F(FFmpegDemuxerTest, Read) { // Attempt a read from the audio stream and run the message loop until done. scoped_refptr<DemuxerStreamReader> reader(new DemuxerStreamReader()); reader->Read(audio); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_FALSE(reader->buffer()->IsDiscontinuous()); @@ -358,12 +350,12 @@ TEST_F(FFmpegDemuxerTest, Read) { // Manually release the last reference to the buffer. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(2); // Attempt a read from the video stream and run the message loop until done. reader->Read(video); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_FALSE(reader->buffer()->IsDiscontinuous()); @@ -375,7 +367,7 @@ TEST_F(FFmpegDemuxerTest, Read) { // Manually release the last reference to the buffer and verify it was freed. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(4); // We should now expect an end of stream buffer in both the audio and video @@ -383,7 +375,7 @@ TEST_F(FFmpegDemuxerTest, Read) { // Attempt a read from the audio stream and run the message loop until done. reader->Read(audio); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_TRUE(reader->buffer()->IsEndOfStream()); @@ -392,12 +384,12 @@ TEST_F(FFmpegDemuxerTest, Read) { // Manually release buffer, which should release any remaining AVPackets. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(5); // Attempt a read from the audio stream and run the message loop until done. reader->Read(video); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_TRUE(reader->buffer()->IsEndOfStream()); @@ -406,7 +398,7 @@ TEST_F(FFmpegDemuxerTest, Read) { // Manually release buffer, which should release any remaining AVPackets. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(6); } @@ -481,7 +473,7 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Read a video packet and release it. scoped_refptr<DemuxerStreamReader> reader(new DemuxerStreamReader()); reader->Read(video); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_FALSE(reader->buffer()->IsDiscontinuous()); @@ -490,12 +482,12 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Release the video packet and verify the other packets are still queued. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(1); // Now issue a simple forward seek, which should discard queued packets. demuxer_->Seek(base::TimeDelta::FromMicroseconds(kExpectedTimestamp)); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(2); // The next read from each stream should now be discontinuous, but subsequent @@ -503,7 +495,7 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Audio read #1, should be discontinuous. reader->Read(audio); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_TRUE(reader->buffer()->IsDiscontinuous()); @@ -513,7 +505,7 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Audio read #2, should not be discontinuous. reader->Reset(); reader->Read(audio); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_FALSE(reader->buffer()->IsDiscontinuous()); @@ -523,7 +515,7 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Video read #1, should be discontinuous. reader->Reset(); reader->Read(video); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_TRUE(reader->buffer()->IsDiscontinuous()); @@ -533,7 +525,7 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Video read #2, should not be discontinuous. reader->Reset(); reader->Read(video); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_FALSE(reader->buffer()->IsDiscontinuous()); @@ -542,7 +534,7 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Manually release the last reference to the buffer and verify it was freed. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(3); } @@ -586,7 +578,7 @@ TEST_F(FFmpegDemuxerTest, MP3Hack) { // contents should match. scoped_refptr<DemuxerStreamReader> reader = new DemuxerStreamReader(); reader->Read(audio); - EXPECT_TRUE(reader->WaitForRead()); + message_loop_.RunAllPending(); EXPECT_TRUE(reader->called()); ASSERT_TRUE(reader->buffer()); EXPECT_FALSE(reader->buffer()->IsDiscontinuous()); @@ -599,7 +591,7 @@ TEST_F(FFmpegDemuxerTest, MP3Hack) { // Manually release the last reference to the buffer and verify it was freed. reader->Reset(); - WaitForDemuxerThread(); + message_loop_.RunAllPending(); MockFFmpeg::get()->CheckPoint(2); } @@ -651,6 +643,7 @@ TEST_F(FFmpegDemuxerTest, Stop) { // Attempt the read... audio->Read(callback.release()); + message_loop_.RunAllPending(); // ...and verify that |callback| was deleted. MockFFmpeg::get()->CheckPoint(1); |