summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-03-31 01:18:38 +0000
committerscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-03-31 01:18:38 +0000
commit8ed85f1f705ad8cf647a1e4cb17f00749b370c4e (patch)
tree8ac3c62fbd44396dcc90ccbb012131ab02ea64c9
parent245b3cd59ab5f548431b73ddb71a3e9a38a58d95 (diff)
downloadchromium_src-8ed85f1f705ad8cf647a1e4cb17f00749b370c4e.zip
chromium_src-8ed85f1f705ad8cf647a1e4cb17f00749b370c4e.tar.gz
chromium_src-8ed85f1f705ad8cf647a1e4cb17f00749b370c4e.tar.bz2
FFmpegDemuxer now uses a thread to handle demuxing.
Since FFmpegDemuxer no longer uses the pipeline thread to demuxer, I had to add a WaitableEvent to TestReader to make sure the read completed (or not!). Downside is that the test for when a read is never completed now waits 500ms before timing out. Oh well :( Review URL: http://codereview.chromium.org/55047 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12831 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--media/base/pipeline.h1
-rw-r--r--media/filters/ffmpeg_demuxer.cc75
-rw-r--r--media/filters/ffmpeg_demuxer.h30
-rw-r--r--media/filters/ffmpeg_demuxer_unittest.cc21
4 files changed, 84 insertions, 43 deletions
diff --git a/media/base/pipeline.h b/media/base/pipeline.h
index f5b91ff..3b18b81 100644
--- a/media/base/pipeline.h
+++ b/media/base/pipeline.h
@@ -40,6 +40,7 @@ enum PipelineError {
DEMUXER_ERROR_COULD_NOT_OPEN,
DEMUXER_ERROR_COULD_NOT_PARSE,
DEMUXER_ERROR_NO_SUPPORTED_STREAMS,
+ DEMUXER_ERROR_COULD_NOT_CREATE_THREAD,
};
// Base class for Pipeline class which allows for read-only access to members.
diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc
index 5208ed2..c0af41a 100644
--- a/media/filters/ffmpeg_demuxer.cc
+++ b/media/filters/ffmpeg_demuxer.cc
@@ -139,7 +139,7 @@ void FFmpegDemuxerStream::Read(Callback1<Buffer*>::Type* read_callback) {
read_queue_.push_back(read_callback);
}
if (FulfillPendingReads()) {
- demuxer_->ScheduleDemux();
+ demuxer_->SignalDemux();
}
}
@@ -169,21 +169,25 @@ bool FFmpegDemuxerStream::FulfillPendingReads() {
// FFmpegDemuxer
//
FFmpegDemuxer::FFmpegDemuxer()
- : demuxing_(false),
- format_context_(NULL) {
+ : format_context_(NULL),
+ thread_(NULL),
+ wait_for_demux_(false, false),
+ shutdown_(false) {
}
FFmpegDemuxer::~FFmpegDemuxer() {
+ if (thread_) {
+ shutdown_ = true;
+ SignalDemux();
+ PlatformThread::Join(thread_);
+ }
if (format_context_) {
av_free(format_context_);
}
}
-void FFmpegDemuxer::ScheduleDemux() {
- if (!demuxing_) {
- demuxing_ = true;
- host_->PostTask(NewRunnableMethod(this, &FFmpegDemuxer::Demux));
- }
+void FFmpegDemuxer::SignalDemux() {
+ wait_for_demux_.Signal();
}
void FFmpegDemuxer::Stop() {
@@ -241,8 +245,13 @@ bool FFmpegDemuxer::Initialize(DataSource* data_source) {
return false;
}
- // We have at least one supported stream, set the duration and notify we're
- // done initializing.
+ // We have some streams to demux so create our thread.
+ if (!PlatformThread::Create(0, this, &thread_)) {
+ host_->Error(DEMUXER_ERROR_COULD_NOT_CREATE_THREAD);
+ return false;
+ }
+
+ // Good to go: set the duration and notify we're done initializing.
host_->SetDuration(max_duration);
host_->InitializationComplete();
return true;
@@ -258,30 +267,34 @@ scoped_refptr<DemuxerStream> FFmpegDemuxer::GetStream(int stream) {
return streams_[stream].get();
}
-void FFmpegDemuxer::Demux() {
- DCHECK(demuxing_);
-
- // Loop until we've satisfied every stream.
- while (StreamsHavePendingReads()) {
- // Allocate and read an AVPacket from the media.
- scoped_ptr<AVPacket> packet(new AVPacket());
- int result = av_read_frame(format_context_, packet.get());
- if (result < 0) {
- // TODO(scherkus): handle end of stream by marking Buffer with the end of
- // stream flag.
- NOTIMPLEMENTED();
- break;
+void FFmpegDemuxer::ThreadMain() {
+ PlatformThread::SetName("DemuxerThread");
+ while (!shutdown_) {
+ // Loop until we've satisfied every stream.
+ while (StreamsHavePendingReads()) {
+ // Allocate and read an AVPacket from the media.
+ scoped_ptr<AVPacket> packet(new AVPacket());
+ int result = av_read_frame(format_context_, packet.get());
+ if (result < 0) {
+ // TODO(scherkus): handle end of stream by marking Buffer with the end
+ // of stream flag.
+ NOTIMPLEMENTED();
+ break;
+ }
+
+ // Queue the packet with the appropriate stream.
+ // TODO(scherkus): should we post this back to the pipeline thread? I'm
+ // worried about downstream filters (i.e., decoders) executing on this
+ // thread.
+ DCHECK(packet->stream_index >= 0);
+ DCHECK(packet->stream_index < static_cast<int>(streams_.size()));
+ FFmpegDemuxerStream* demuxer_stream = streams_[packet->stream_index];
+ demuxer_stream->EnqueuePacket(packet.release());
}
- // Queue the packet with the appropriate stream.
- DCHECK(packet->stream_index >= 0);
- DCHECK(packet->stream_index < static_cast<int>(streams_.size()));
- FFmpegDemuxerStream* demuxer_stream = streams_[packet->stream_index];
- demuxer_stream->EnqueuePacket(packet.release());
+ // Wait until we're signaled to either shutdown or satisfy more reads.
+ wait_for_demux_.Wait();
}
-
- // Finished demuxing.
- demuxing_ = false;
}
bool FFmpegDemuxer::StreamsHavePendingReads() {
diff --git a/media/filters/ffmpeg_demuxer.h b/media/filters/ffmpeg_demuxer.h
index bf03c2e..bcccf55 100644
--- a/media/filters/ffmpeg_demuxer.h
+++ b/media/filters/ffmpeg_demuxer.h
@@ -22,6 +22,8 @@
#include <vector>
#include "base/lock.h"
+#include "base/thread.h"
+#include "base/waitable_event.h"
#include "media/base/buffers.h"
#include "media/base/factory.h"
#include "media/base/filters.h"
@@ -48,6 +50,8 @@ class FFmpegDemuxerStream : public DemuxerStream {
virtual ~FFmpegDemuxerStream();
// Returns true is this stream has pending reads, false otherwise.
+ //
+ // Safe to call on any thread.
bool HasPendingReads();
// Enqueues and takes ownership over the given AVPacket.
@@ -89,15 +93,15 @@ class FFmpegDemuxerStream : public DemuxerStream {
DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxerStream);
};
-class FFmpegDemuxer : public Demuxer {
+class FFmpegDemuxer : public Demuxer, public PlatformThread::Delegate {
public:
// FilterFactory provider.
static FilterFactory* CreateFilterFactory() {
return new FilterFactoryImpl0<FFmpegDemuxer>();
}
- // Called by FFmpegDemuxerStreams to schedule a Demux() task.
- void ScheduleDemux();
+ // Called by FFmpegDemuxerStreams to signal the demux event.
+ void SignalDemux();
// MediaFilter implementation.
virtual void Stop();
@@ -107,21 +111,20 @@ class FFmpegDemuxer : public Demuxer {
virtual size_t GetNumberOfStreams();
virtual scoped_refptr<DemuxerStream> GetStream(int stream_id);
+ // PlatformThread::Delegate implementation.
+ virtual void ThreadMain();
+
private:
// Only allow a factory to create this class.
friend class FilterFactoryImpl0<FFmpegDemuxer>;
FFmpegDemuxer();
virtual ~FFmpegDemuxer();
- // Demuxing task scheduled by streams.
- void Demux();
-
// Returns true if any of the streams have pending reads.
+ //
+ // Safe to call on any thread.
bool StreamsHavePendingReads();
- // Flag to prevent multiple Demux() tasks from being scheduled.
- bool demuxing_;
-
// FFmpeg context handle.
AVFormatContext* format_context_;
@@ -129,6 +132,15 @@ class FFmpegDemuxer : public Demuxer {
typedef std::vector< scoped_refptr<FFmpegDemuxerStream> > StreamVector;
StreamVector streams_;
+ // Thread handle.
+ PlatformThreadHandle thread_;
+
+ // Event to signal demux.
+ base::WaitableEvent wait_for_demux_;
+
+ // Used to signal |thread_| to terminate.
+ bool shutdown_;
+
DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxer);
};
diff --git a/media/filters/ffmpeg_demuxer_unittest.cc b/media/filters/ffmpeg_demuxer_unittest.cc
index 21c0c45..e5da633 100644
--- a/media/filters/ffmpeg_demuxer_unittest.cc
+++ b/media/filters/ffmpeg_demuxer_unittest.cc
@@ -115,7 +115,12 @@ void InitializeFFmpegMocks() {
// Ref counted object so we can create callbacks to call DemuxerStream::Read().
class TestReader : public base::RefCountedThreadSafe<TestReader> {
public:
- TestReader() : called_(false), expecting_call_(false) {}
+ TestReader()
+ : called_(false),
+ expecting_call_(false),
+ wait_for_read_(false, false) {
+ }
+
virtual ~TestReader() {}
void Reset() {
@@ -123,6 +128,7 @@ class TestReader : public base::RefCountedThreadSafe<TestReader> {
expecting_call_ = false;
called_ = false;
buffer_ = NULL;
+ wait_for_read_.Reset();
}
void Read(DemuxerStream* stream) {
@@ -138,6 +144,11 @@ class TestReader : public base::RefCountedThreadSafe<TestReader> {
expecting_call_ = false;
called_ = true;
buffer_ = buffer;
+ wait_for_read_.Signal();
+ }
+
+ bool WaitForRead() {
+ return wait_for_read_.TimedWait(base::TimeDelta::FromMilliseconds(500));
}
// Mock getters/setters.
@@ -149,6 +160,7 @@ class TestReader : public base::RefCountedThreadSafe<TestReader> {
scoped_refptr<Buffer> buffer_;
bool called_;
bool expecting_call_;
+ base::WaitableEvent wait_for_read_;
};
} // namespace
@@ -353,8 +365,9 @@ TEST(FFmpegDemuxerTest, Read) {
scoped_refptr<TestReader> reader(new TestReader());
reader->Read(audio_stream);
pipeline.RunAllTasks();
+ EXPECT_TRUE(reader->WaitForRead());
EXPECT_TRUE(reader->called());
- EXPECT_TRUE(reader->buffer());
+ ASSERT_TRUE(reader->buffer());
EXPECT_EQ(audio_data, reader->buffer()->GetData());
EXPECT_EQ(kDataSize, reader->buffer()->GetDataSize());
@@ -367,8 +380,9 @@ TEST(FFmpegDemuxerTest, Read) {
reader->Reset();
reader->Read(video_stream);
pipeline.RunAllTasks();
+ EXPECT_TRUE(reader->WaitForRead());
EXPECT_TRUE(reader->called());
- EXPECT_TRUE(reader->buffer());
+ ASSERT_TRUE(reader->buffer());
EXPECT_EQ(video_data, reader->buffer()->GetData());
EXPECT_EQ(kDataSize, reader->buffer()->GetDataSize());
@@ -379,6 +393,7 @@ TEST(FFmpegDemuxerTest, Read) {
reader->Reset();
reader->Read(audio_stream);
pipeline.RunAllTasks();
+ EXPECT_FALSE(reader->WaitForRead());
EXPECT_FALSE(reader->called());
EXPECT_FALSE(reader->buffer());