diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-17 01:50:13 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-11-17 01:50:13 +0000 |
commit | fc2f1d08441c382a04a7c2d8865bcfa2a06a93b6 (patch) | |
tree | d86676e0830c0a8c56761659a64fcca9d4367a50 /media | |
parent | e5adb903876353423b7f6faeb021e1445752391b (diff) | |
download | chromium_src-fc2f1d08441c382a04a7c2d8865bcfa2a06a93b6.zip chromium_src-fc2f1d08441c382a04a7c2d8865bcfa2a06a93b6.tar.gz chromium_src-fc2f1d08441c382a04a7c2d8865bcfa2a06a93b6.tar.bz2 |
Move blocking FFmpegDemuxer operations to a dedicated thread.
Previously it was possible to block message loop execution if FFmpegDemuxer was waiting for IO. As a result we could have janky response to various media operations (e.g., seeking, shutdown, setting volume) on a laggy network connection. It also led to adding a bunch of hairy locking code in FFmpegDemuxer. No more!
BUG=160640
Review URL: https://codereview.chromium.org/11362251
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@168350 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r-- | media/filters/ffmpeg_demuxer.cc | 124 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer.h | 13 | ||||
-rw-r--r-- | media/filters/ffmpeg_demuxer_unittest.cc | 97 |
3 files changed, 159 insertions, 75 deletions
diff --git a/media/filters/ffmpeg_demuxer.cc b/media/filters/ffmpeg_demuxer.cc index ee04455..0a690e8 100644 --- a/media/filters/ffmpeg_demuxer.cc +++ b/media/filters/ffmpeg_demuxer.cc @@ -14,8 +14,10 @@ #include "base/message_loop.h" #include "base/stl_util.h" #include "base/string_util.h" +#include "base/task_runner_util.h" #include "base/time.h" #include "media/base/audio_decoder_config.h" +#include "media/base/bind_to_loop.h" #include "media/base/decoder_buffer.h" #include "media/base/limits.h" #include "media/base/media_switches.h" @@ -72,8 +74,7 @@ bool FFmpegDemuxerStream::HasPendingReads() { return !read_queue_.empty(); } -void FFmpegDemuxerStream::EnqueuePacket( - scoped_ptr_malloc<AVPacket, ScopedPtrAVFreePacket> packet) { +void FFmpegDemuxerStream::EnqueuePacket(ScopedAVPacket packet) { DCHECK(demuxer_->message_loop()->BelongsToCurrentThread()); base::AutoLock auto_lock(lock_); @@ -262,13 +263,14 @@ FFmpegDemuxer::FFmpegDemuxer( const scoped_refptr<DataSource>& data_source) : host_(NULL), message_loop_(message_loop), + blocking_thread_("FFmpegDemuxer"), data_source_(data_source), bitrate_(0), start_time_(kNoTimestamp()), audio_disabled_(false), duration_known_(false), - url_protocol_(data_source, base::Bind( - &FFmpegDemuxer::OnDataSourceError, base::Unretained(this))) { + url_protocol_(data_source, BindToLoop(message_loop_, base::Bind( + &FFmpegDemuxer::OnDataSourceError, base::Unretained(this)))) { DCHECK(message_loop_); DCHECK(data_source_); } @@ -284,11 +286,6 @@ void FFmpegDemuxer::Stop(const base::Closure& callback) { // Post a task to notify the streams to stop as well. message_loop_->PostTask(FROM_HERE, base::Bind(&FFmpegDemuxer::StopTask, this, callback)); - - // TODO(scherkus): This should be on |message_loop_| but today that thread is - // potentially blocked. Move to StopTask() after all blocking calls are - // guaranteed to run on a separate thread. - url_protocol_.Abort(); } void FFmpegDemuxer::Seek(base::TimeDelta time, const PipelineStatusCB& cb) { @@ -389,19 +386,50 @@ void FFmpegDemuxer::InitializeTask(DemuxerHost* host, // available, so add a metadata entry to ensure some is always present. av_dict_set(&format_context->metadata, "skip_id3v1_tags", "", 0); - if (!glue_->OpenContext()) { + // Open the AVFormatContext using our glue layer. + CHECK(blocking_thread_.Start()); + base::PostTaskAndReplyWithResult( + blocking_thread_.message_loop_proxy(), FROM_HERE, + base::Bind(&FFmpegGlue::OpenContext, base::Unretained(glue_.get())), + base::Bind(&FFmpegDemuxer::OnOpenContextDone, this, status_cb)); +} + +void FFmpegDemuxer::OnOpenContextDone(const PipelineStatusCB& status_cb, + bool result) { + DCHECK(message_loop_->BelongsToCurrentThread()); + if (!blocking_thread_.IsRunning()) { + status_cb.Run(PIPELINE_ERROR_ABORT); + return; + } + + if (!result) { status_cb.Run(DEMUXER_ERROR_COULD_NOT_OPEN); return; } // Fully initialize AVFormatContext by parsing the stream a little. - int result = avformat_find_stream_info(format_context, NULL); + base::PostTaskAndReplyWithResult( + blocking_thread_.message_loop_proxy(), FROM_HERE, + base::Bind(&avformat_find_stream_info, glue_->format_context(), + static_cast<AVDictionary**>(NULL)), + base::Bind(&FFmpegDemuxer::OnFindStreamInfoDone, this, status_cb)); +} + +void FFmpegDemuxer::OnFindStreamInfoDone(const PipelineStatusCB& status_cb, + int result) { + DCHECK(message_loop_->BelongsToCurrentThread()); + if (!blocking_thread_.IsRunning()) { + status_cb.Run(PIPELINE_ERROR_ABORT); + return; + } + if (result < 0) { status_cb.Run(DEMUXER_ERROR_COULD_NOT_PARSE); return; } // Create demuxer stream entries for each possible AVStream. + AVFormatContext* format_context = glue_->format_context(); streams_.resize(format_context->nb_streams); bool found_audio_stream = false; bool found_video_stream = false; @@ -483,27 +511,40 @@ void FFmpegDemuxer::InitializeTask(DemuxerHost* host, void FFmpegDemuxer::SeekTask(base::TimeDelta time, const PipelineStatusCB& cb) { DCHECK(message_loop_->BelongsToCurrentThread()); - // Tell streams to flush buffers due to seeking. - StreamVector::iterator iter; - for (iter = streams_.begin(); iter != streams_.end(); ++iter) { - if (*iter) - (*iter)->FlushBuffers(); - } - // Always seek to a timestamp less than or equal to the desired timestamp. int flags = AVSEEK_FLAG_BACKWARD; // Passing -1 as our stream index lets FFmpeg pick a default stream. FFmpeg // will attempt to use the lowest-index video stream, if present, followed by // the lowest-index audio stream. - if (av_seek_frame(glue_->format_context(), -1, time.InMicroseconds(), - flags) < 0) { + base::PostTaskAndReplyWithResult( + blocking_thread_.message_loop_proxy(), FROM_HERE, + base::Bind(&av_seek_frame, glue_->format_context(), -1, + time.InMicroseconds(), flags), + base::Bind(&FFmpegDemuxer::OnSeekFrameDone, this, cb)); +} + +void FFmpegDemuxer::OnSeekFrameDone(const PipelineStatusCB& cb, int result) { + DCHECK(message_loop_->BelongsToCurrentThread()); + if (!blocking_thread_.IsRunning()) { + cb.Run(PIPELINE_ERROR_ABORT); + return; + } + + if (result < 0) { // Use VLOG(1) instead of NOTIMPLEMENTED() to prevent the message being // captured from stdout and contaminates testing. // TODO(scherkus): Implement this properly and signal error (BUG=23447). VLOG(1) << "Not implemented"; } + // Tell streams to flush buffers due to seeking. + StreamVector::iterator iter; + for (iter = streams_.begin(); iter != streams_.end(); ++iter) { + if (*iter) + (*iter)->FlushBuffers(); + } + // Notify we're finished seeking. cb.Run(PIPELINE_OK); } @@ -516,9 +557,24 @@ void FFmpegDemuxer::DemuxTask() { return; } - // Allocate and read an AVPacket from the media. - scoped_ptr_malloc<AVPacket, ScopedPtrAVFreePacket> packet(new AVPacket()); - int result = av_read_frame(glue_->format_context(), packet.get()); + // Allocate and read an AVPacket from the media. Save |packet_ptr| since + // evaluation order of packet.get() and base::Passed(&packet) is + // undefined. + ScopedAVPacket packet(new AVPacket()); + AVPacket* packet_ptr = packet.get(); + + base::PostTaskAndReplyWithResult( + blocking_thread_.message_loop_proxy(), FROM_HERE, + base::Bind(&av_read_frame, glue_->format_context(), packet_ptr), + base::Bind(&FFmpegDemuxer::OnReadFrameDone, this, base::Passed(&packet))); +} + +void FFmpegDemuxer::OnReadFrameDone(ScopedAVPacket packet, int result) { + DCHECK(message_loop_->BelongsToCurrentThread()); + if (!blocking_thread_.IsRunning()) { + return; + } + if (result < 0) { // Update the duration based on the audio stream if // it was previously unknown http://crbug.com/86830 @@ -574,11 +630,20 @@ void FFmpegDemuxer::StopTask(const base::Closure& callback) { if (*iter) (*iter)->Stop(); } - if (data_source_) { - data_source_->Stop(callback); - } else { - callback.Run(); - } + + url_protocol_.Abort(); + data_source_->Stop(BindToLoop(message_loop_, base::Bind( + &FFmpegDemuxer::OnDataSourceStopped, this, callback))); +} + +void FFmpegDemuxer::OnDataSourceStopped(const base::Closure& callback) { + // This will block until all tasks complete. Note that after this returns it's + // possible for reply tasks (e.g., OnReadFrameDone()) to be queued on this + // thread. Each of the reply task methods must check whether we've stopped the + // thread and drop their results on the floor. + DCHECK(message_loop_->BelongsToCurrentThread()); + blocking_thread_.Stop(); + callback.Run(); } void FFmpegDemuxer::DisableAudioStreamTask() { @@ -611,8 +676,7 @@ void FFmpegDemuxer::StreamHasEnded() { (audio_disabled_ && (*iter)->type() == DemuxerStream::AUDIO)) { continue; } - (*iter)->EnqueuePacket( - scoped_ptr_malloc<AVPacket, ScopedPtrAVFreePacket>()); + (*iter)->EnqueuePacket(ScopedAVPacket()); } } diff --git a/media/filters/ffmpeg_demuxer.h b/media/filters/ffmpeg_demuxer.h index eea4a0a..b5a2710 100644 --- a/media/filters/ffmpeg_demuxer.h +++ b/media/filters/ffmpeg_demuxer.h @@ -27,6 +27,7 @@ #include "base/callback.h" #include "base/gtest_prod_util.h" +#include "base/threading/thread.h" #include "media/base/audio_decoder_config.h" #include "media/base/decoder_buffer.h" #include "media/base/demuxer.h" @@ -46,6 +47,8 @@ class FFmpegGlue; class FFmpegH264ToAnnexBBitstreamConverter; class ScopedPtrAVFreePacket; +typedef scoped_ptr_malloc<AVPacket, ScopedPtrAVFreePacket> ScopedAVPacket; + class FFmpegDemuxerStream : public DemuxerStream { public: // Keeps a copy of |demuxer| and initializes itself using information @@ -59,7 +62,7 @@ class FFmpegDemuxerStream : public DemuxerStream { // Enqueues the given AVPacket. If |packet| is NULL an end of stream packet // is enqueued. - void EnqueuePacket(scoped_ptr_malloc<AVPacket, ScopedPtrAVFreePacket> packet); + void EnqueuePacket(ScopedAVPacket packet); // Signals to empty the buffer queue and mark next packet as discontinuous. void FlushBuffers(); @@ -172,15 +175,20 @@ class MEDIA_EXPORT FFmpegDemuxer : public Demuxer { // Carries out initialization on the demuxer thread. void InitializeTask(DemuxerHost* host, const PipelineStatusCB& status_cb); + void OnOpenContextDone(const PipelineStatusCB& status_cb, bool result); + void OnFindStreamInfoDone(const PipelineStatusCB& status_cb, int result); // Carries out a seek on the demuxer thread. void SeekTask(base::TimeDelta time, const PipelineStatusCB& cb); + void OnSeekFrameDone(const PipelineStatusCB& cb, int result); // Carries out demuxing and satisfying stream reads on the demuxer thread. void DemuxTask(); + void OnReadFrameDone(ScopedAVPacket packet, int result); // Carries out stopping the demuxer streams on the demuxer thread. void StopTask(const base::Closure& callback); + void OnDataSourceStopped(const base::Closure& callback); // Carries out disabling the audio stream on the demuxer thread. void DisableAudioStreamTask(); @@ -209,6 +217,9 @@ class MEDIA_EXPORT FFmpegDemuxer : public Demuxer { scoped_refptr<base::MessageLoopProxy> message_loop_; + // Thread on which all blocking FFmpeg operations are executed. + base::Thread blocking_thread_; + // |streams_| mirrors the AVStream array in |format_context_|. It contains // FFmpegDemuxerStreams encapsluating AVStream objects at the same index. // diff --git a/media/filters/ffmpeg_demuxer_unittest.cc b/media/filters/ffmpeg_demuxer_unittest.cc index fd52beb..9ce8659 100644 --- a/media/filters/ffmpeg_demuxer_unittest.cc +++ b/media/filters/ffmpeg_demuxer_unittest.cc @@ -39,6 +39,9 @@ MATCHER(IsEndOfStreamBuffer, static void EosOnReadDone(bool* got_eos_buffer, DemuxerStream::Status status, const scoped_refptr<DecoderBuffer>& buffer) { + MessageLoop::current()->PostTask( + FROM_HERE, MessageLoop::QuitWhenIdleClosure()); + EXPECT_EQ(status, DemuxerStream::kOk); if (buffer->IsEndOfStream()) { *got_eos_buffer = true; @@ -61,13 +64,10 @@ class FFmpegDemuxerTest : public testing::Test { virtual ~FFmpegDemuxerTest() { if (demuxer_) { - // Call Stop() to shut down internal threads. - demuxer_->Stop(NewExpectedClosure()); + demuxer_->Stop(MessageLoop::QuitWhenIdleClosure()); + message_loop_.Run(); } - // Finish up any remaining tasks. - message_loop_.RunUntilIdle(); - // Release the reference to the demuxer. demuxer_ = NULL; } @@ -87,8 +87,8 @@ class FFmpegDemuxerTest : public testing::Test { void InitializeDemuxer() { EXPECT_CALL(host_, SetDuration(_)); - demuxer_->Initialize(&host_, NewExpectedStatusCB(PIPELINE_OK)); - message_loop_.RunUntilIdle(); + demuxer_->Initialize(&host_, NewStatusCB(PIPELINE_OK)); + message_loop_.Run(); } MOCK_METHOD2(OnReadDoneCalled, void(int, int64)); @@ -110,6 +110,9 @@ class FFmpegDemuxerTest : public testing::Test { EXPECT_EQ(size, buffer->GetDataSize()); EXPECT_EQ(base::TimeDelta::FromMicroseconds(timestampInMicroseconds), buffer->GetTimestamp()); + + DCHECK_EQ(&message_loop_, MessageLoop::current()); + message_loop_.PostTask(FROM_HERE, MessageLoop::QuitWhenIdleClosure()); } DemuxerStream::ReadCB NewReadCB(const tracked_objects::Location& location, @@ -119,6 +122,18 @@ class FFmpegDemuxerTest : public testing::Test { location, size, timestampInMicroseconds); } + PipelineStatusCB NewStatusCB(PipelineStatus expected) { + return base::Bind(&FFmpegDemuxerTest::OnStatusDone, + base::Unretained(this), expected); + } + + void OnStatusDone(PipelineStatus expected, PipelineStatus status) { + EXPECT_EQ(expected, status); + + DCHECK_EQ(&message_loop_, MessageLoop::current()); + message_loop_.PostTask(FROM_HERE, MessageLoop::QuitWhenIdleClosure()); + } + // Accessor to demuxer internals. void set_duration_known(bool duration_known) { demuxer_->duration_known_ = duration_known; @@ -149,7 +164,7 @@ class FFmpegDemuxerTest : public testing::Test { const int kMaxBuffers = 170; for (int i = 0; !got_eos_buffer && i < kMaxBuffers; i++) { audio->Read(base::Bind(&EosOnReadDone, &got_eos_buffer)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); } EXPECT_TRUE(got_eos_buffer); @@ -176,11 +191,9 @@ class FFmpegDemuxerTest : public testing::Test { TEST_F(FFmpegDemuxerTest, Initialize_OpenFails) { // Simulate avformat_open_input() failing. - CreateDemuxer("ten_byte_file"), - demuxer_->Initialize( - &host_, NewExpectedStatusCB(DEMUXER_ERROR_COULD_NOT_OPEN)); - - message_loop_.RunUntilIdle(); + CreateDemuxer("ten_byte_file"); + demuxer_->Initialize(&host_, NewStatusCB(DEMUXER_ERROR_COULD_NOT_OPEN)); + message_loop_.Run(); } // TODO(acolwell): Uncomment this test when we discover a file that passes @@ -196,17 +209,15 @@ TEST_F(FFmpegDemuxerTest, Initialize_OpenFails) { TEST_F(FFmpegDemuxerTest, Initialize_NoStreams) { // Open a file with no streams whatsoever. CreateDemuxer("no_streams.webm"); - demuxer_->Initialize( - &host_, NewExpectedStatusCB(DEMUXER_ERROR_NO_SUPPORTED_STREAMS)); - message_loop_.RunUntilIdle(); + demuxer_->Initialize(&host_, NewStatusCB(DEMUXER_ERROR_NO_SUPPORTED_STREAMS)); + message_loop_.Run(); } TEST_F(FFmpegDemuxerTest, Initialize_NoAudioVideo) { // Open a file containing streams but none of which are audio/video streams. CreateDemuxer("no_audio_video.webm"); - demuxer_->Initialize( - &host_, NewExpectedStatusCB(DEMUXER_ERROR_NO_SUPPORTED_STREAMS)); - message_loop_.RunUntilIdle(); + demuxer_->Initialize(&host_, NewStatusCB(DEMUXER_ERROR_NO_SUPPORTED_STREAMS)); + message_loop_.Run(); } TEST_F(FFmpegDemuxerTest, Initialize_Successful) { @@ -289,10 +300,10 @@ TEST_F(FFmpegDemuxerTest, Read_Audio) { demuxer_->GetStream(DemuxerStream::AUDIO); audio->Read(NewReadCB(FROM_HERE, 29, 0)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); audio->Read(NewReadCB(FROM_HERE, 27, 3000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); } TEST_F(FFmpegDemuxerTest, Read_Video) { @@ -305,10 +316,10 @@ TEST_F(FFmpegDemuxerTest, Read_Video) { demuxer_->GetStream(DemuxerStream::VIDEO); video->Read(NewReadCB(FROM_HERE, 22084, 0)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); video->Read(NewReadCB(FROM_HERE, 1057, 33000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); } TEST_F(FFmpegDemuxerTest, Read_VideoNonZeroStart) { @@ -324,11 +335,11 @@ TEST_F(FFmpegDemuxerTest, Read_VideoNonZeroStart) { // Check first buffer in video stream. video->Read(NewReadCB(FROM_HERE, 5636, 400000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Check first buffer in audio stream. audio->Read(NewReadCB(FROM_HERE, 165, 396000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Verify that the start time is equal to the lowest timestamp (ie the audio). EXPECT_EQ(demuxer_->GetStartTime().InMicroseconds(), 396000); @@ -366,28 +377,28 @@ TEST_F(FFmpegDemuxerTest, Seek) { // Read a video packet and release it. video->Read(NewReadCB(FROM_HERE, 22084, 0)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Issue a simple forward seek, which should discard queued packets. demuxer_->Seek(base::TimeDelta::FromMicroseconds(1000000), - NewExpectedStatusCB(PIPELINE_OK)); - message_loop_.RunUntilIdle(); + NewStatusCB(PIPELINE_OK)); + message_loop_.Run(); // Audio read #1. audio->Read(NewReadCB(FROM_HERE, 145, 803000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Audio read #2. audio->Read(NewReadCB(FROM_HERE, 148, 826000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Video read #1. video->Read(NewReadCB(FROM_HERE, 5425, 801000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Video read #2. video->Read(NewReadCB(FROM_HERE, 1906, 834000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); } // A mocked callback specialization for calling Read(). Since RunWithParams() @@ -459,10 +470,8 @@ TEST_F(FFmpegDemuxerTest, StreamReadAfterStopAndDemuxerDestruction) { demuxer_->GetStream(DemuxerStream::AUDIO); ASSERT_TRUE(audio); - demuxer_->Stop(NewExpectedClosure()); - - // Finish up any remaining tasks. - message_loop_.RunUntilIdle(); + demuxer_->Stop(MessageLoop::QuitWhenIdleClosure()); + message_loop_.Run(); // Expect all calls in sequence. InSequence s; @@ -516,7 +525,7 @@ TEST_F(FFmpegDemuxerTest, DisableAudioStream) { // Attempt a read from the video stream: it should return valid data. video->Read(NewReadCB(FROM_HERE, 22084, 0)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Attempt a read from the audio stream: it should immediately return end of // stream without requiring the message loop to read data. @@ -541,28 +550,28 @@ TEST_F(FFmpegDemuxerTest, SeekWithCuesBeforeFirstCluster) { // Read a video packet and release it. video->Read(NewReadCB(FROM_HERE, 22084, 0)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Issue a simple forward seek, which should discard queued packets. demuxer_->Seek(base::TimeDelta::FromMicroseconds(2500000), - NewExpectedStatusCB(PIPELINE_OK)); - message_loop_.RunUntilIdle(); + NewStatusCB(PIPELINE_OK)); + message_loop_.Run(); // Audio read #1. audio->Read(NewReadCB(FROM_HERE, 40, 2403000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Audio read #2. audio->Read(NewReadCB(FROM_HERE, 42, 2406000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Video read #1. video->Read(NewReadCB(FROM_HERE, 5276, 2402000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); // Video read #2. video->Read(NewReadCB(FROM_HERE, 1740, 2436000)); - message_loop_.RunUntilIdle(); + message_loop_.Run(); } // Ensure ID3v1 tag reading is disabled. id3_test.mp3 has an ID3v1 tag with the |