diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-31 01:18:38 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-03-31 01:18:38 +0000 |
commit | 8ed85f1f705ad8cf647a1e4cb17f00749b370c4e (patch) | |
tree | 8ac3c62fbd44396dcc90ccbb012131ab02ea64c9 | |
parent | 245b3cd59ab5f548431b73ddb71a3e9a38a58d95 (diff) | |
download | chromium_src-8ed85f1f705ad8cf647a1e4cb17f00749b370c4e.zip chromium_src-8ed85f1f705ad8cf647a1e4cb17f00749b370c4e.tar.gz chromium_src-8ed85f1f705ad8cf647a1e4cb17f00749b370c4e.tar.bz2 |
FFmpegDemuxer now uses a thread to handle demuxing.
Since FFmpegDemuxer no longer uses the pipeline thread to demuxer, I had to add a WaitableEvent to TestReader to make sure the read completed (or not!).
Downside is that the test for when a read is never completed now waits 500ms before timing out. Oh well :(
Review URL: http://codereview.chromium.org/55047
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@12831 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | media/base/pipeline.h | 1 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer.cc | 75 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer.h | 30 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer_unittest.cc | 21 |
4 files changed, 84 insertions, 43 deletions
diff --git a/media/base/pipeline.h b/media/base/pipeline.h index f5b91ff..3b18b81 100644 --- a/media/base/pipeline.h +++ b/media/base/pipeline.h @@ -40,6 +40,7 @@ enum PipelineError { DEMUXER_ERROR_COULD_NOT_OPEN, DEMUXER_ERROR_COULD_NOT_PARSE, DEMUXER_ERROR_NO_SUPPORTED_STREAMS, + DEMUXER_ERROR_COULD_NOT_CREATE_THREAD, }; // Base class for Pipeline class which allows for read-only access to members. diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc index 5208ed2..c0af41a 100644 --- a/media/filters/ffmpeg_demuxer.cc +++ b/media/filters/ffmpeg_demuxer.cc @@ -139,7 +139,7 @@ void FFmpegDemuxerStream::Read(Callback1<Buffer*>::Type* read_callback) { read_queue_.push_back(read_callback); } if (FulfillPendingReads()) { - demuxer_->ScheduleDemux(); + demuxer_->SignalDemux(); } } @@ -169,21 +169,25 @@ bool FFmpegDemuxerStream::FulfillPendingReads() { // FFmpegDemuxer // FFmpegDemuxer::FFmpegDemuxer() - : demuxing_(false), - format_context_(NULL) { + : format_context_(NULL), + thread_(NULL), + wait_for_demux_(false, false), + shutdown_(false) { } FFmpegDemuxer::~FFmpegDemuxer() { + if (thread_) { + shutdown_ = true; + SignalDemux(); + PlatformThread::Join(thread_); + } if (format_context_) { av_free(format_context_); } } -void FFmpegDemuxer::ScheduleDemux() { - if (!demuxing_) { - demuxing_ = true; - host_->PostTask(NewRunnableMethod(this, &FFmpegDemuxer::Demux)); - } +void FFmpegDemuxer::SignalDemux() { + wait_for_demux_.Signal(); } void FFmpegDemuxer::Stop() { @@ -241,8 +245,13 @@ bool FFmpegDemuxer::Initialize(DataSource* data_source) { return false; } - // We have at least one supported stream, set the duration and notify we're - // done initializing. + // We have some streams to demux so create our thread. + if (!PlatformThread::Create(0, this, &thread_)) { + host_->Error(DEMUXER_ERROR_COULD_NOT_CREATE_THREAD); + return false; + } + + // Good to go: set the duration and notify we're done initializing. host_->SetDuration(max_duration); host_->InitializationComplete(); return true; @@ -258,30 +267,34 @@ scoped_refptr<DemuxerStream> FFmpegDemuxer::GetStream(int stream) { return streams_[stream].get(); } -void FFmpegDemuxer::Demux() { - DCHECK(demuxing_); - - // Loop until we've satisfied every stream. - while (StreamsHavePendingReads()) { - // Allocate and read an AVPacket from the media. - scoped_ptr<AVPacket> packet(new AVPacket()); - int result = av_read_frame(format_context_, packet.get()); - if (result < 0) { - // TODO(scherkus): handle end of stream by marking Buffer with the end of - // stream flag. - NOTIMPLEMENTED(); - break; +void FFmpegDemuxer::ThreadMain() { + PlatformThread::SetName("DemuxerThread"); + while (!shutdown_) { + // Loop until we've satisfied every stream. + while (StreamsHavePendingReads()) { + // Allocate and read an AVPacket from the media. + scoped_ptr<AVPacket> packet(new AVPacket()); + int result = av_read_frame(format_context_, packet.get()); + if (result < 0) { + // TODO(scherkus): handle end of stream by marking Buffer with the end + // of stream flag. + NOTIMPLEMENTED(); + break; + } + + // Queue the packet with the appropriate stream. + // TODO(scherkus): should we post this back to the pipeline thread? I'm + // worried about downstream filters (i.e., decoders) executing on this + // thread. + DCHECK(packet->stream_index >= 0); + DCHECK(packet->stream_index < static_cast<int>(streams_.size())); + FFmpegDemuxerStream* demuxer_stream = streams_[packet->stream_index]; + demuxer_stream->EnqueuePacket(packet.release()); } - // Queue the packet with the appropriate stream. - DCHECK(packet->stream_index >= 0); - DCHECK(packet->stream_index < static_cast<int>(streams_.size())); - FFmpegDemuxerStream* demuxer_stream = streams_[packet->stream_index]; - demuxer_stream->EnqueuePacket(packet.release()); + // Wait until we're signaled to either shutdown or satisfy more reads. + wait_for_demux_.Wait(); } - - // Finished demuxing. - demuxing_ = false; } bool FFmpegDemuxer::StreamsHavePendingReads() { diff --git a/media/filters/ffmpeg_demuxer.h b/media/filters/ffmpeg_demuxer.h index bf03c2e..bcccf55 100644 --- a/media/filters/ffmpeg_demuxer.h +++ b/media/filters/ffmpeg_demuxer.h @@ -22,6 +22,8 @@ #include <vector> #include "base/lock.h" +#include "base/thread.h" +#include "base/waitable_event.h" #include "media/base/buffers.h" #include "media/base/factory.h" #include "media/base/filters.h" @@ -48,6 +50,8 @@ class FFmpegDemuxerStream : public DemuxerStream { virtual ~FFmpegDemuxerStream(); // Returns true is this stream has pending reads, false otherwise. + // + // Safe to call on any thread. bool HasPendingReads(); // Enqueues and takes ownership over the given AVPacket. @@ -89,15 +93,15 @@ class FFmpegDemuxerStream : public DemuxerStream { DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxerStream); }; -class FFmpegDemuxer : public Demuxer { +class FFmpegDemuxer : public Demuxer, public PlatformThread::Delegate { public: // FilterFactory provider. static FilterFactory* CreateFilterFactory() { return new FilterFactoryImpl0<FFmpegDemuxer>(); } - // Called by FFmpegDemuxerStreams to schedule a Demux() task. - void ScheduleDemux(); + // Called by FFmpegDemuxerStreams to signal the demux event. + void SignalDemux(); // MediaFilter implementation. virtual void Stop(); @@ -107,21 +111,20 @@ class FFmpegDemuxer : public Demuxer { virtual size_t GetNumberOfStreams(); virtual scoped_refptr<DemuxerStream> GetStream(int stream_id); + // PlatformThread::Delegate implementation. + virtual void ThreadMain(); + private: // Only allow a factory to create this class. friend class FilterFactoryImpl0<FFmpegDemuxer>; FFmpegDemuxer(); virtual ~FFmpegDemuxer(); - // Demuxing task scheduled by streams. - void Demux(); - // Returns true if any of the streams have pending reads. + // + // Safe to call on any thread. bool StreamsHavePendingReads(); - // Flag to prevent multiple Demux() tasks from being scheduled. - bool demuxing_; - // FFmpeg context handle. AVFormatContext* format_context_; @@ -129,6 +132,15 @@ class FFmpegDemuxer : public Demuxer { typedef std::vector< scoped_refptr<FFmpegDemuxerStream> > StreamVector; StreamVector streams_; + // Thread handle. + PlatformThreadHandle thread_; + + // Event to signal demux. + base::WaitableEvent wait_for_demux_; + + // Used to signal |thread_| to terminate. + bool shutdown_; + DISALLOW_COPY_AND_ASSIGN(FFmpegDemuxer); }; diff --git a/media/filters/ffmpeg_demuxer_unittest.cc b/media/filters/ffmpeg_demuxer_unittest.cc index 21c0c45..e5da633 100644 --- a/media/filters/ffmpeg_demuxer_unittest.cc +++ b/media/filters/ffmpeg_demuxer_unittest.cc @@ -115,7 +115,12 @@ void InitializeFFmpegMocks() { // Ref counted object so we can create callbacks to call DemuxerStream::Read(). class TestReader : public base::RefCountedThreadSafe<TestReader> { public: - TestReader() : called_(false), expecting_call_(false) {} + TestReader() + : called_(false), + expecting_call_(false), + wait_for_read_(false, false) { + } + virtual ~TestReader() {} void Reset() { @@ -123,6 +128,7 @@ class TestReader : public base::RefCountedThreadSafe<TestReader> { expecting_call_ = false; called_ = false; buffer_ = NULL; + wait_for_read_.Reset(); } void Read(DemuxerStream* stream) { @@ -138,6 +144,11 @@ class TestReader : public base::RefCountedThreadSafe<TestReader> { expecting_call_ = false; called_ = true; buffer_ = buffer; + wait_for_read_.Signal(); + } + + bool WaitForRead() { + return wait_for_read_.TimedWait(base::TimeDelta::FromMilliseconds(500)); } // Mock getters/setters. @@ -149,6 +160,7 @@ class TestReader : public base::RefCountedThreadSafe<TestReader> { scoped_refptr<Buffer> buffer_; bool called_; bool expecting_call_; + base::WaitableEvent wait_for_read_; }; } // namespace @@ -353,8 +365,9 @@ TEST(FFmpegDemuxerTest, Read) { scoped_refptr<TestReader> reader(new TestReader()); reader->Read(audio_stream); pipeline.RunAllTasks(); + EXPECT_TRUE(reader->WaitForRead()); EXPECT_TRUE(reader->called()); - EXPECT_TRUE(reader->buffer()); + ASSERT_TRUE(reader->buffer()); EXPECT_EQ(audio_data, reader->buffer()->GetData()); EXPECT_EQ(kDataSize, reader->buffer()->GetDataSize()); @@ -367,8 +380,9 @@ TEST(FFmpegDemuxerTest, Read) { reader->Reset(); reader->Read(video_stream); pipeline.RunAllTasks(); + EXPECT_TRUE(reader->WaitForRead()); EXPECT_TRUE(reader->called()); - EXPECT_TRUE(reader->buffer()); + ASSERT_TRUE(reader->buffer()); EXPECT_EQ(video_data, reader->buffer()->GetData()); EXPECT_EQ(kDataSize, reader->buffer()->GetDataSize()); @@ -379,6 +393,7 @@ TEST(FFmpegDemuxerTest, Read) { reader->Reset(); reader->Read(audio_stream); pipeline.RunAllTasks(); + EXPECT_FALSE(reader->WaitForRead()); EXPECT_FALSE(reader->called()); EXPECT_FALSE(reader->buffer()); |