summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-06-25 00:54:18 +0000
committerscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-06-25 00:54:18 +0000
commitd0f914b1965c44d92beb46480135390b73ae3e10 (patch)
tree04d39187483252e360ee35cc94b5ff92ee685b78
parentbedb4a2725ebe735e4f8f1294b931e38dfb88ddc (diff)
downloadchromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.zip
chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.gz
chromium_src-d0f914b1965c44d92beb46480135390b73ae3e10.tar.bz2
Refactor media pipeline and filters to use injected message loops.
Message loops are provided via MediaFilter::SetMessageLoop(). For now FFmpegDemuxer is the only filter taking advantage of injected message loops, and its unit tests have been updated as well. TEST=FFmpegDemuxer tests should continue to run BUG=none Review URL: http://codereview.chromium.org/145014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@19209 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--media/base/filters.h28
-rw-r--r--media/base/mock_reader.h14
-rw-r--r--media/base/pipeline_impl.cc115
-rw-r--r--media/base/pipeline_impl.h3
-rw-r--r--media/filters/ffmpeg_demuxer.cc167
-rw-r--r--media/filters/ffmpeg_demuxer.h41
-rw-r--r--media/filters/ffmpeg_demuxer_unittest.cc71
7 files changed, 266 insertions, 173 deletions
diff --git a/media/base/filters.h b/media/base/filters.h
index a98cd2d..7f59138 100644
--- a/media/base/filters.h
+++ b/media/base/filters.h
@@ -27,8 +27,8 @@
#include <string>
#include "base/logging.h"
+#include "base/message_loop.h"
#include "base/ref_counted.h"
-#include "base/task.h"
#include "base/time.h"
#include "media/base/media_format.h"
@@ -56,18 +56,28 @@ enum FilterType {
class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> {
public:
- MediaFilter() : host_(NULL) {}
+ MediaFilter() : host_(NULL), message_loop_(NULL) {}
// Sets the protected member |host_|. This is the first method called by
// the FilterHost after a filter is created. The host holds a strong
- // reference to the filter. The refernce held by the host is guaranteed
+ // reference to the filter. The reference held by the host is guaranteed
// to be released before the host object is destroyed by the pipeline.
virtual void SetFilterHost(FilterHost* host) {
- DCHECK(NULL == host_);
- DCHECK(NULL != host);
+ DCHECK(host);
+ DCHECK(!host_);
host_ = host;
}
+ // Sets the protected member |message_loop_|, which is used by filters for
+ // processing asynchronous tasks and maintaining synchronized access to
+ // internal data members. The message loop should be running and exceed the
+ // lifetime of the filter.
+ virtual void SetMessageLoop(MessageLoop* message_loop) {
+ DCHECK(message_loop);
+ DCHECK(!message_loop_);
+ message_loop_ = message_loop;
+ }
+
// The pipeline is being stopped either as a result of an error or because
// the client called Stop().
virtual void Stop() = 0;
@@ -76,15 +86,19 @@ class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> {
// method if they need to respond to this call.
virtual void SetPlaybackRate(float playback_rate) {}
- // The pipeline is being seeked to the specified time. Filters may implement
+ // The pipeline is seeking to the specified time. Filters may implement
// this method if they need to respond to this call.
virtual void Seek(base::TimeDelta time) {}
protected:
- FilterHost* host_;
+ // Only allow scoped_refptr<> to delete filters.
friend class base::RefCountedThreadSafe<MediaFilter>;
virtual ~MediaFilter() {}
+ // TODO(scherkus): make these private with public/protected accessors.
+ FilterHost* host_;
+ MessageLoop* message_loop_;
+
private:
DISALLOW_COPY_AND_ASSIGN(MediaFilter);
};
diff --git a/media/base/mock_reader.h b/media/base/mock_reader.h
index ad8db88..3640981 100644
--- a/media/base/mock_reader.h
+++ b/media/base/mock_reader.h
@@ -21,8 +21,7 @@ class MockReader :
public:
MockReader()
: called_(false),
- expecting_call_(false),
- wait_for_read_(false, false) {
+ expecting_call_(false) {
}
virtual ~MockReader() {
@@ -34,7 +33,6 @@ class MockReader :
expecting_call_ = false;
called_ = false;
buffer_ = NULL;
- wait_for_read_.Reset();
}
// Executes an asynchronous read on the given filter.
@@ -45,12 +43,6 @@ class MockReader :
filter->Read(NewCallback(this, &MockReader::OnReadComplete));
}
- // Waits 500ms for the read callback to be completed. Returns true if the
- // read was completed, false otherwise.
- bool WaitForRead() {
- return wait_for_read_.TimedWait(base::TimeDelta::FromMilliseconds(500));
- }
-
// Mock accessors.
BufferType* buffer() { return buffer_; }
bool called() { return called_; }
@@ -63,7 +55,6 @@ class MockReader :
expecting_call_ = false;
called_ = true;
buffer_ = buffer;
- wait_for_read_.Signal();
}
// Reference to the buffer provided in the callback.
@@ -75,9 +66,6 @@ class MockReader :
// Whether or not this reader was expecting a callback.
bool expecting_call_;
- // Used by tests to wait for the callback to be executed.
- base::WaitableEvent wait_for_read_;
-
DISALLOW_COPY_AND_ASSIGN(MockReader);
};
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 49ffb98..8a43382 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -1,8 +1,12 @@
-// Copyright (c) 2006-2009 The Chromium Authors. All rights reserved.
+// Copyright (c) 2008-2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+//
+// TODO(scherkus): clean up PipelineImpl... too many crazy function names,
+// potential deadlocks, nested message loops, etc...
#include "base/compiler_specific.h"
+#include "base/condition_variable.h"
#include "base/stl_util-inl.h"
#include "media/base/filter_host_impl.h"
#include "media/base/media_format.h"
@@ -10,6 +14,36 @@
namespace media {
+namespace {
+
+// Small helper function to help us transition over to injected message loops.
+//
+// TODO(scherkus): have every filter support injected message loops.
+template <class Filter>
+bool SupportsSetMessageLoop() {
+ switch (Filter::filter_type()) {
+ case FILTER_DEMUXER:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Helper function used with NewRunnableMethod to implement a (very) crude
+// blocking counter.
+//
+// TODO(scherkus): remove this as soon as Stop() is made asynchronous.
+void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) {
+ AutoLock auto_lock(*lock);
+ --(*count);
+ CHECK(*count >= 0);
+ if (*count == 0) {
+ cond_var->Signal();
+ }
+}
+
+} // namespace
+
PipelineImpl::PipelineImpl() {
ResetState();
}
@@ -287,6 +321,8 @@ bool PipelineThread::Start(FilterFactory* filter_factory,
// method. This method waits for that Lock to be released so that we know
// that the thread is not executing a nested message loop. This way we know
// that that Thread::Stop call will quit the appropriate message loop.
+//
+// TODO(scherkus): this can potentially deadlock, hack away our lock usage!!
void PipelineThread::Stop() {
if (thread_.IsRunning()) {
PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
@@ -294,6 +330,7 @@ void PipelineThread::Stop() {
thread_.Stop();
}
DCHECK(filter_hosts_.empty());
+ DCHECK(filter_threads_.empty());
}
// Called on client's thread.
@@ -418,16 +455,66 @@ void PipelineThread::StartTask(FilterFactory* filter_factory,
// as the result of an error condition. If there is no error, then set the
// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
// reverse order.
+//
+// TODO(scherkus): beware! this can get posted multiple times! it shouldn't!
void PipelineThread::StopTask() {
if (PipelineOk()) {
pipeline_->error_ = PIPELINE_STOPPING;
}
+ // Stop every filter.
for (FilterHostVector::iterator iter = filter_hosts_.begin();
iter != filter_hosts_.end();
++iter) {
(*iter)->Stop();
}
+
+ // Figure out how many threads we have to stop.
+ //
+ // TODO(scherkus): remove the workaround for the "multiple StopTask()" issue.
+ FilterThreadVector running_threads;
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ if ((*iter)->IsRunning()) {
+ running_threads.push_back(*iter);
+ }
+ }
+
+ // Crude blocking counter implementation.
+ Lock lock;
+ ConditionVariable wait_for_zero(&lock);
+ int count = running_threads.size();
+
+ // Post a task to every filter's thread to ensure that they've completed their
+ // stopping logic before stopping the threads themselves.
+ //
+ // TODO(scherkus): again, Stop() should either be synchronous or we should
+ // receive a signal from filters that they have indeed stopped.
+ for (FilterThreadVector::iterator iter = running_threads.begin();
+ iter != running_threads.end();
+ ++iter) {
+ (*iter)->message_loop()->PostTask(FROM_HERE,
+ NewRunnableFunction(&DecrementCounter, &lock, &wait_for_zero, &count));
+ }
+
+ // Wait on our "blocking counter".
+ {
+ AutoLock auto_lock(lock);
+ while (count > 0) {
+ wait_for_zero.Wait();
+ }
+ }
+
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = running_threads.begin();
+ iter != running_threads.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
if (host_initializing_) {
host_initializing_ = NULL;
message_loop()->Quit();
@@ -546,10 +633,25 @@ scoped_refptr<Filter> PipelineThread::CreateFilter(
if (NULL == host_initializing_) {
Error(PIPELINE_ERROR_OUT_OF_MEMORY);
} else {
- filter_hosts_.push_back(host_initializing_);
- filter->SetFilterHost(host_initializing_);
- if (!filter->Initialize(source)) {
- Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ // Create a dedicated thread for this filter.
+ if (SupportsSetMessageLoop<Filter>()) {
+ scoped_ptr<base::Thread> thread(new base::Thread("FilterThread"));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ } else {
+ filter->SetMessageLoop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
+ }
+ }
+
+ // Creating a thread could have failed, verify we're still OK.
+ if (PipelineOk()) {
+ filter_hosts_.push_back(host_initializing_);
+ filter->SetFilterHost(host_initializing_);
+ if (!filter->Initialize(source)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
}
}
}
@@ -588,8 +690,11 @@ scoped_refptr<DataSource> PipelineThread::CreateDataSource(
}
// Called as a result of destruction of the thread.
+//
+// TODO(scherkus): this can block the client due to synchronous Stop() API call.
void PipelineThread::WillDestroyCurrentMessageLoop() {
STLDeleteElements(&filter_hosts_);
+ STLDeleteElements(&filter_threads_);
}
} // namespace media
diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h
index ad57444..07d9c05 100644
--- a/media/base/pipeline_impl.h
+++ b/media/base/pipeline_impl.h
@@ -333,6 +333,9 @@ class PipelineThread : public base::RefCountedThreadSafe<PipelineThread>,
typedef std::vector<FilterHostImpl*> FilterHostVector;
FilterHostVector filter_hosts_;
+ typedef std::vector<base::Thread*> FilterThreadVector;
+ FilterThreadVector filter_threads_;
+
DISALLOW_COPY_AND_ASSIGN(PipelineThread);
};
diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc
index 2e8a4c6..fb39f10 100644
--- a/media/filters/ffmpeg_demuxer.cc
+++ b/media/filters/ffmpeg_demuxer.cc
@@ -95,7 +95,6 @@ FFmpegDemuxerStream::FFmpegDemuxerStream(FFmpegDemuxer* demuxer,
}
FFmpegDemuxerStream::~FFmpegDemuxerStream() {
- AutoLock auto_lock(lock_);
DCHECK(stopped_);
DCHECK(read_queue_.empty());
DCHECK(buffer_queue_.empty());
@@ -111,47 +110,44 @@ void* FFmpegDemuxerStream::QueryInterface(const char* id) {
}
bool FFmpegDemuxerStream::HasPendingReads() {
- AutoLock auto_lock(lock_);
+ DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_);
+ DCHECK(!stopped_ || read_queue_.empty())
+ << "Read queue should have been emptied if demuxing stream is stopped";
return !read_queue_.empty();
}
base::TimeDelta FFmpegDemuxerStream::EnqueuePacket(AVPacket* packet) {
+ DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_);
base::TimeDelta timestamp = ConvertTimestamp(packet->pts);
base::TimeDelta duration = ConvertTimestamp(packet->duration);
+ if (stopped_) {
+ NOTREACHED() << "Attempted to enqueue packet on a stopped stream";
+ return timestamp;
+ }
+
+ // Enqueue the callback and attempt to satisfy a read immediately.
scoped_refptr<Buffer> buffer =
new AVPacketBuffer(packet, timestamp, duration);
- DCHECK(buffer);
- {
- AutoLock auto_lock(lock_);
- if (stopped_) {
- NOTREACHED() << "Attempted to enqueue packet on a stopped stream.";
- return timestamp;
- }
- buffer_queue_.push_back(buffer);
+ if (!buffer) {
+ NOTREACHED() << "Unable to allocate AVPacketBuffer";
+ return timestamp;
}
- FulfillPendingReads();
+ buffer_queue_.push_back(buffer);
+ FulfillPendingRead();
return timestamp;
}
void FFmpegDemuxerStream::FlushBuffers() {
- AutoLock auto_lock(lock_);
+ DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_);
buffer_queue_.clear();
discontinuous_ = true;
}
void FFmpegDemuxerStream::Stop() {
- // Since |buffer_queue_| can be very large, we swap queues and delete outside
- // of the lock.
- BufferQueue tmp_buffer_queue;
- ReadQueue tmp_read_queue;
- {
- AutoLock auto_lock(lock_);
- buffer_queue_.swap(tmp_buffer_queue);
- read_queue_.swap(tmp_read_queue);
- stopped_ = true;
- }
- tmp_buffer_queue.clear();
- STLDeleteElements(&tmp_read_queue);
+ DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_);
+ buffer_queue_.clear();
+ STLDeleteElements(&read_queue_);
+ stopped_ = true;
}
const MediaFormat& FFmpegDemuxerStream::media_format() {
@@ -160,59 +156,53 @@ const MediaFormat& FFmpegDemuxerStream::media_format() {
void FFmpegDemuxerStream::Read(Callback1<Buffer*>::Type* read_callback) {
DCHECK(read_callback);
- {
- AutoLock auto_lock(lock_);
- // Don't accept any additional reads if we've been told to stop.
- //
- // TODO(scherkus): it would be cleaner if we replied with an error message.
- if (stopped_) {
- delete read_callback;
- return;
- }
+ demuxer_->message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &FFmpegDemuxerStream::ReadTask, read_callback));
+}
+
+void FFmpegDemuxerStream::ReadTask(Callback1<Buffer*>::Type* read_callback) {
+ DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_);
- // Otherwise enqueue the request.
- read_queue_.push_back(read_callback);
+ // Don't accept any additional reads if we've been told to stop.
+ //
+ // TODO(scherkus): it would be cleaner if we replied with an error message.
+ if (stopped_) {
+ delete read_callback;
+ return;
}
- // See if we can immediately fulfill this read and whether we need to demux.
- bool post_demux_task = FulfillPendingReads();
+ // Enqueue the callback and attempt to satisfy it immediately.
+ read_queue_.push_back(read_callback);
+ FulfillPendingRead();
- // Since Read() may be executed on any thread, it's possible that StopTask()
- // finishes executing on the demuxer thread and the message loop goes away.
- //
- // To prevent that we'll grab the lock and post a task at the same time, which
- // will keep the message loop alive.
- AutoLock auto_lock(lock_);
- if (post_demux_task && !stopped_) {
+ // There are still pending reads, demux some more.
+ if (HasPendingReads()) {
demuxer_->PostDemuxTask();
}
}
-bool FFmpegDemuxerStream::FulfillPendingReads() {
- bool pending_reads = false;
- while (true) {
- scoped_refptr<Buffer> buffer;
- scoped_ptr<Callback1<Buffer*>::Type> read_callback;
- {
- AutoLock auto_lock(lock_);
- pending_reads = !read_queue_.empty();
- if (buffer_queue_.empty() || read_queue_.empty()) {
- break;
- }
- buffer = buffer_queue_.front();
- read_callback.reset(read_queue_.front());
- buffer_queue_.pop_front();
- read_queue_.pop_front();
-
- // Handle discontinuities due to FlushBuffers() being called.
- if (discontinuous_) {
- buffer->SetDiscontinuous(true);
- discontinuous_ = false;
- }
- }
- read_callback->Run(buffer);
+void FFmpegDemuxerStream::FulfillPendingRead() {
+ DCHECK_EQ(PlatformThread::CurrentId(), demuxer_->thread_id_);
+ if (buffer_queue_.empty() || read_queue_.empty()) {
+ return;
}
- return pending_reads;
+
+ // Dequeue a buffer and pending read pair.
+ scoped_refptr<Buffer> buffer = buffer_queue_.front();
+ scoped_ptr<Callback1<Buffer*>::Type> read_callback(read_queue_.front());
+ buffer_queue_.pop_front();
+ read_queue_.pop_front();
+
+ // Handle discontinuities due to FlushBuffers() being called.
+ //
+ // TODO(scherkus): get rid of |discontinuous_| and use buffer flags.
+ if (discontinuous_) {
+ buffer->SetDiscontinuous(true);
+ discontinuous_ = false;
+ }
+
+ // Execute the callback.
+ read_callback->Run(buffer);
}
base::TimeDelta FFmpegDemuxerStream::ConvertTimestamp(int64 timestamp) {
@@ -225,11 +215,10 @@ base::TimeDelta FFmpegDemuxerStream::ConvertTimestamp(int64 timestamp) {
// FFmpegDemuxer
//
FFmpegDemuxer::FFmpegDemuxer()
- : thread_("DemuxerThread") {
+ : thread_id_(NULL) {
}
FFmpegDemuxer::~FFmpegDemuxer() {
- DCHECK(!thread_.IsRunning());
DCHECK(!format_context_.get());
// TODO(scherkus): I believe we need to use av_close_input_file() here
// instead of scoped_ptr_malloc calling av_free().
@@ -239,18 +228,14 @@ FFmpegDemuxer::~FFmpegDemuxer() {
}
void FFmpegDemuxer::PostDemuxTask() {
- thread_.message_loop()->PostTask(FROM_HERE,
+ message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &FFmpegDemuxer::DemuxTask));
}
void FFmpegDemuxer::Stop() {
- // Stop our thread and post a task notifying the streams to stop as well.
- if (thread_.IsRunning()) {
- thread_.message_loop()->PostTask(FROM_HERE,
- NewRunnableMethod(this, &FFmpegDemuxer::StopTask));
- thread_.Stop();
- }
- format_context_.reset();
+ // Post a task to notify the streams to stop as well.
+ message_loop_->PostTask(FROM_HERE,
+ NewRunnableMethod(this, &FFmpegDemuxer::StopTask));
}
void FFmpegDemuxer::Seek(base::TimeDelta time) {
@@ -258,18 +243,12 @@ void FFmpegDemuxer::Seek(base::TimeDelta time) {
// operation is completed and filters behind the demuxer is good to issue
// more reads, but we are posting a task here, which makes the seek operation
// asynchronous, should change how seek works to make it fully asynchronous.
- thread_.message_loop()->PostTask(FROM_HERE,
+ message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &FFmpegDemuxer::SeekTask, time));
}
bool FFmpegDemuxer::Initialize(DataSource* data_source) {
- // Start our internal demuxing thread.
- if (!thread_.Start()) {
- host_->Error(DEMUXER_ERROR_COULD_NOT_CREATE_THREAD);
- return false;
- }
-
- thread_.message_loop()->PostTask(FROM_HERE,
+ message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &FFmpegDemuxer::InititalizeTask, data_source));
return true;
}
@@ -294,6 +273,10 @@ void FFmpegDemuxer::InititalizeTask(DataSource* data_source) {
//
// Refer to media/filters/ffmpeg_glue.h for details.
+ // Grab the thread id for debugging.
+ DCHECK(!thread_id_);
+ thread_id_ = PlatformThread::CurrentId();
+
// Add our data source and get our unique key.
std::string key = FFmpegGlue::get()->AddDataSource(data_source);
@@ -353,6 +336,8 @@ void FFmpegDemuxer::InititalizeTask(DataSource* data_source) {
}
void FFmpegDemuxer::SeekTask(base::TimeDelta time) {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+
// Tell streams to flush buffers due to seeking.
StreamVector::iterator iter;
for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
@@ -373,6 +358,8 @@ void FFmpegDemuxer::SeekTask(base::TimeDelta time) {
}
void FFmpegDemuxer::DemuxTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
+
// Make sure we have work to do before demuxing.
if (!StreamsHavePendingReads()) {
return;
@@ -428,14 +415,18 @@ void FFmpegDemuxer::DemuxTask() {
}
void FFmpegDemuxer::StopTask() {
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
StreamVector::iterator iter;
for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
(*iter)->Stop();
}
+
+ // Free our AVFormatContext.
+ format_context_.reset();
}
bool FFmpegDemuxer::StreamsHavePendingReads() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
StreamVector::iterator iter;
for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
if ((*iter)->HasPendingReads()) {
@@ -446,7 +437,7 @@ bool FFmpegDemuxer::StreamsHavePendingReads() {
}
void FFmpegDemuxer::StreamHasEnded() {
- DCHECK_EQ(PlatformThread::CurrentId(), thread_.thread_id());
+ DCHECK_EQ(PlatformThread::CurrentId(), thread_id_);
StreamVector::iterator iter;
for (iter = streams_.begin(); iter != streams_.end(); ++iter) {
AVPacket* packet = new AVPacket();
diff --git a/media/filters/ffmpeg_demuxer.h b/media/filters/ffmpeg_demuxer.h
index 2ffbf50..bb5c285 100644
--- a/media/filters/ffmpeg_demuxer.h
+++ b/media/filters/ffmpeg_demuxer.h
@@ -16,8 +16,8 @@
// excessive memory consumption.
//
// When stopped, FFmpegDemuxer and FFmpegDemuxerStream release all callbacks
-// and buffered packets and shuts down its internal thread. Reads from a
-// stopped FFmpegDemuxerStream will not be replied to.
+// and buffered packets. Reads from a stopped FFmpegDemuxerStream will not be
+// replied to.
#ifndef MEDIA_FILTERS_FFMPEG_DEMUXER_H_
#define MEDIA_FILTERS_FFMPEG_DEMUXER_H_
@@ -25,9 +25,6 @@
#include <deque>
#include <vector>
-#include "base/lock.h"
-#include "base/thread.h"
-#include "base/waitable_event.h"
#include "media/base/buffers.h"
#include "media/base/factory.h"
#include "media/base/filters.h"
@@ -59,20 +56,20 @@ class FFmpegDemuxerStream : public DemuxerStream, public AVStreamProvider {
// Returns true is this stream has pending reads, false otherwise.
//
// Safe to call on any thread.
- bool HasPendingReads();
+ virtual bool HasPendingReads();
// Enqueues and takes ownership over the given AVPacket, returns the timestamp
// of the enqueued packet.
- base::TimeDelta EnqueuePacket(AVPacket* packet);
+ virtual base::TimeDelta EnqueuePacket(AVPacket* packet);
// Signals to empty the buffer queue and mark next packet as discontinuous.
- void FlushBuffers();
+ virtual void FlushBuffers();
// Empties the queues and ignores any additional calls to Read().
- void Stop();
+ virtual void Stop();
// Returns the duration of this stream.
- base::TimeDelta duration() { return duration_; }
+ virtual base::TimeDelta duration() { return duration_; }
// DemuxerStream implementation.
virtual const MediaFormat& media_format();
@@ -85,8 +82,12 @@ class FFmpegDemuxerStream : public DemuxerStream, public AVStreamProvider {
virtual void* QueryInterface(const char* interface_id);
private:
- // Returns true if there are still pending reads.
- bool FulfillPendingReads();
+ // Carries out enqueuing a pending read on the demuxer thread.
+ void ReadTask(Callback1<Buffer*>::Type* read_callback);
+
+ // Attempts to fulfill a single pending read by dequeueing a buffer and read
+ // callback pair and executing the callback.
+ void FulfillPendingRead();
// Converts an FFmpeg stream timestamp into a base::TimeDelta.
base::TimeDelta ConvertTimestamp(int64 timestamp);
@@ -98,9 +99,7 @@ class FFmpegDemuxerStream : public DemuxerStream, public AVStreamProvider {
bool discontinuous_;
bool stopped_;
- Lock lock_;
-
- typedef std::deque< scoped_refptr<Buffer> > BufferQueue;
+ typedef std::deque<scoped_refptr<Buffer> > BufferQueue;
BufferQueue buffer_queue_;
typedef std::deque<Callback1<Buffer*>::Type*> ReadQueue;
@@ -116,8 +115,8 @@ class FFmpegDemuxer : public Demuxer {
return new FilterFactoryImpl0<FFmpegDemuxer>();
}
- // Called by FFmpegDemuxerStreams to post a demuxing task.
- void PostDemuxTask();
+ // Posts a task to perform additional demuxing.
+ virtual void PostDemuxTask();
// MediaFilter implementation.
virtual void Stop();
@@ -129,8 +128,8 @@ class FFmpegDemuxer : public Demuxer {
virtual scoped_refptr<DemuxerStream> GetStream(int stream_id);
private:
- // Accesses |thread_| to create a helper method used by every test.
- friend class FFmpegDemuxerTest;
+ // Allow FFmpegDemuxerStream to post tasks to our message loop.
+ friend class FFmpegDemuxerStream;
// Only allow a factory to create this class.
friend class FilterFactoryImpl0<FFmpegDemuxer>;
@@ -182,8 +181,8 @@ class FFmpegDemuxer : public Demuxer {
StreamVector streams_;
StreamVector packet_streams_;
- // Thread handle.
- base::Thread thread_;
+ // Used for debugging.
+ PlatformThreadId thread_id_;
DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxer);
};
diff --git a/media/filters/ffmpeg_demuxer_unittest.cc b/media/filters/ffmpeg_demuxer_unittest.cc
index faa08ef..cdacf12 100644
--- a/media/filters/ffmpeg_demuxer_unittest.cc
+++ b/media/filters/ffmpeg_demuxer_unittest.cc
@@ -4,9 +4,6 @@
#include <deque>
-#include "base/singleton.h"
-#include "base/tuple.h"
-#include "media/base/filter_host.h"
#include "media/base/filters.h"
#include "media/base/mock_ffmpeg.h"
#include "media/base/mock_filter_host.h"
@@ -60,8 +57,7 @@ class FFmpegDemuxerTest : public testing::Test {
static const uint8 kVideoData[];
static const uint8* kNullData;
- FFmpegDemuxerTest()
- : wait_for_demuxer_(false, false) {
+ FFmpegDemuxerTest() {
// Create an FFmpegDemuxer.
factory_ = FFmpegDemuxer::CreateFilterFactory();
MediaFormat media_format;
@@ -70,6 +66,9 @@ class FFmpegDemuxerTest : public testing::Test {
demuxer_ = factory_->Create<FFmpegDemuxer>(media_format);
DCHECK(demuxer_);
+ // Provide a message loop.
+ demuxer_->SetMessageLoop(&message_loop_);
+
// Prepare a filter host and data source for the demuxer.
pipeline_.reset(new MockPipeline());
filter_host_.reset(new MockFilterHost<Demuxer>(pipeline_.get(), demuxer_));
@@ -113,6 +112,9 @@ class FFmpegDemuxerTest : public testing::Test {
// Call Stop() to shut down internal threads.
demuxer_->Stop();
+ // Finish up any remaining tasks.
+ message_loop_.RunAllPending();
+
// Reset MockFFmpeg.
MockFFmpeg::set(NULL);
}
@@ -130,25 +132,19 @@ class FFmpegDemuxerTest : public testing::Test {
void InitializeDemuxer() {
InitializeDemuxerMocks();
EXPECT_TRUE(demuxer_->Initialize(data_source_.get()));
+ message_loop_.RunAllPending();
EXPECT_TRUE(filter_host_->WaitForInitialized());
EXPECT_TRUE(filter_host_->IsInitialized());
EXPECT_EQ(PIPELINE_OK, pipeline_->GetError());
}
- // To eliminate flakiness, this method will wait for the demuxer's message
- // loop to finish any currently executing and queued tasks.
- void WaitForDemuxerThread() {
- demuxer_->thread_.message_loop()->PostTask(FROM_HERE,
- NewRunnableFunction(&FFmpegDemuxerTest::Notify, &wait_for_demuxer_));
- wait_for_demuxer_.Wait();
- }
-
// Fixture members.
scoped_refptr<FilterFactory> factory_;
scoped_refptr<FFmpegDemuxer> demuxer_;
scoped_ptr<MockPipeline> pipeline_;
scoped_ptr<MockFilterHost<Demuxer> > filter_host_;
scoped_refptr<StrictMock<MockDataSource> > data_source_;
+ MessageLoop message_loop_;
// FFmpeg fixtures.
AVFormatContext format_context_;
@@ -157,14 +153,6 @@ class FFmpegDemuxerTest : public testing::Test {
MockFFmpeg mock_ffmpeg_;
private:
- // Used with NewRunnableFunction() -- we don't use NewRunnableMethod() since
- // it would force this class to be refcounted causing double deletions.
- static void Notify(base::WaitableEvent* event) {
- event->Signal();
- }
-
- base::WaitableEvent wait_for_demuxer_;
-
DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxerTest);
};
@@ -203,6 +191,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_OpenFails) {
.WillOnce(Return(-1));
EXPECT_TRUE(demuxer_->Initialize(data_source_.get()));
+ message_loop_.RunAllPending();
EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_COULD_NOT_OPEN));
EXPECT_FALSE(filter_host_->IsInitialized());
}
@@ -216,6 +205,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_ParseFails) {
EXPECT_CALL(*MockFFmpeg::get(), AVFree(&format_context_));
EXPECT_TRUE(demuxer_->Initialize(data_source_.get()));
+ message_loop_.RunAllPending();
EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_COULD_NOT_PARSE));
EXPECT_FALSE(filter_host_->IsInitialized());
}
@@ -229,6 +219,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_NoStreams) {
format_context_.nb_streams = 0;
EXPECT_TRUE(demuxer_->Initialize(data_source_.get()));
+ message_loop_.RunAllPending();
EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_NO_SUPPORTED_STREAMS));
EXPECT_FALSE(filter_host_->IsInitialized());
}
@@ -243,6 +234,7 @@ TEST_F(FFmpegDemuxerTest, Initialize_DataStreamOnly) {
format_context_.nb_streams = 1;
EXPECT_TRUE(demuxer_->Initialize(data_source_.get()));
+ message_loop_.RunAllPending();
EXPECT_TRUE(filter_host_->WaitForError(DEMUXER_ERROR_NO_SUPPORTED_STREAMS));
EXPECT_FALSE(filter_host_->IsInitialized());
}
@@ -346,7 +338,7 @@ TEST_F(FFmpegDemuxerTest, Read) {
// Attempt a read from the audio stream and run the message loop until done.
scoped_refptr<DemuxerStreamReader> reader(new DemuxerStreamReader());
reader->Read(audio);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_FALSE(reader->buffer()->IsDiscontinuous());
@@ -358,12 +350,12 @@ TEST_F(FFmpegDemuxerTest, Read) {
// Manually release the last reference to the buffer.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(2);
// Attempt a read from the video stream and run the message loop until done.
reader->Read(video);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_FALSE(reader->buffer()->IsDiscontinuous());
@@ -375,7 +367,7 @@ TEST_F(FFmpegDemuxerTest, Read) {
// Manually release the last reference to the buffer and verify it was freed.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(4);
// We should now expect an end of stream buffer in both the audio and video
@@ -383,7 +375,7 @@ TEST_F(FFmpegDemuxerTest, Read) {
// Attempt a read from the audio stream and run the message loop until done.
reader->Read(audio);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_TRUE(reader->buffer()->IsEndOfStream());
@@ -392,12 +384,12 @@ TEST_F(FFmpegDemuxerTest, Read) {
// Manually release buffer, which should release any remaining AVPackets.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(5);
// Attempt a read from the audio stream and run the message loop until done.
reader->Read(video);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_TRUE(reader->buffer()->IsEndOfStream());
@@ -406,7 +398,7 @@ TEST_F(FFmpegDemuxerTest, Read) {
// Manually release buffer, which should release any remaining AVPackets.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(6);
}
@@ -481,7 +473,7 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Read a video packet and release it.
scoped_refptr<DemuxerStreamReader> reader(new DemuxerStreamReader());
reader->Read(video);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_FALSE(reader->buffer()->IsDiscontinuous());
@@ -490,12 +482,12 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Release the video packet and verify the other packets are still queued.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(1);
// Now issue a simple forward seek, which should discard queued packets.
demuxer_->Seek(base::TimeDelta::FromMicroseconds(kExpectedTimestamp));
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(2);
// The next read from each stream should now be discontinuous, but subsequent
@@ -503,7 +495,7 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Audio read #1, should be discontinuous.
reader->Read(audio);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_TRUE(reader->buffer()->IsDiscontinuous());
@@ -513,7 +505,7 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Audio read #2, should not be discontinuous.
reader->Reset();
reader->Read(audio);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_FALSE(reader->buffer()->IsDiscontinuous());
@@ -523,7 +515,7 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Video read #1, should be discontinuous.
reader->Reset();
reader->Read(video);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_TRUE(reader->buffer()->IsDiscontinuous());
@@ -533,7 +525,7 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Video read #2, should not be discontinuous.
reader->Reset();
reader->Read(video);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_FALSE(reader->buffer()->IsDiscontinuous());
@@ -542,7 +534,7 @@ TEST_F(FFmpegDemuxerTest, Seek) {
// Manually release the last reference to the buffer and verify it was freed.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(3);
}
@@ -586,7 +578,7 @@ TEST_F(FFmpegDemuxerTest, MP3Hack) {
// contents should match.
scoped_refptr<DemuxerStreamReader> reader = new DemuxerStreamReader();
reader->Read(audio);
- EXPECT_TRUE(reader->WaitForRead());
+ message_loop_.RunAllPending();
EXPECT_TRUE(reader->called());
ASSERT_TRUE(reader->buffer());
EXPECT_FALSE(reader->buffer()->IsDiscontinuous());
@@ -599,7 +591,7 @@ TEST_F(FFmpegDemuxerTest, MP3Hack) {
// Manually release the last reference to the buffer and verify it was freed.
reader->Reset();
- WaitForDemuxerThread();
+ message_loop_.RunAllPending();
MockFFmpeg::get()->CheckPoint(2);
}
@@ -651,6 +643,7 @@ TEST_F(FFmpegDemuxerTest, Stop) {
// Attempt the read...
audio->Read(callback.release());
+ message_loop_.RunAllPending();
// ...and verify that |callback| was deleted.
MockFFmpeg::get()->CheckPoint(1);