diff options
author | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-25 02:12:11 +0000 |
---|---|---|
committer | scherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-07-25 02:12:11 +0000 |
commit | 354589750cf3959506e5c2a34e66cc462fb7f3c4 (patch) | |
tree | 7d1a283d2cc5e4c3caf2686c42fcd5a0b5f8eb04 /media/base | |
parent | 30bcf3718ace8f0ab46be60f318b476b658217b2 (diff) | |
download | chromium_src-354589750cf3959506e5c2a34e66cc462fb7f3c4.zip chromium_src-354589750cf3959506e5c2a34e66cc462fb7f3c4.tar.gz chromium_src-354589750cf3959506e5c2a34e66cc462fb7f3c4.tar.bz2 |
Implemented proper pause-then-seek behaviour for the media pipeline.
MediaFilter now has asynchronous Play() and Pause() methods that are implemented by default. Since we are a completely pull-based system, it turns out only AudioRendererBase and VideoRendererBase need to override them to halt reading from decoders.
When a seek is received by the pipeline, it goes through the following state transitions:
1) Playing -> Pausing -> Paused (notify filters to stop reading)
2) Paused -> Seeking (notify filters to flush buffers and preroll)
3) Seeking -> Playing (preroll completed, resume playback)
The key to this system is that there must be no pending reads in *any* filter when we start transitioning into the seeking state. That means the audio/video renderers do not complete their pausing->paused transition until they have completed all pending reads. To reiterate, since we're pulled based if the renderers are not reading any data, then *nothing* is happening in the pipeline.
We get a lot of nice benefits from this assertion, namely no callbacks are ever "lost", they are always replied to until we are fully paused. Furthermore, we can guarantee that the first buffer received after a Seek() will be guaranteed to be discontinuous.
This change also properly handles end-of-stream, seeking from an end-of-stream state, and displaying frames while paused. In summary, I was able to call Seek() in a while(true) loop without crashing or going out of sync.
BUG=16014,16021,17456
TEST=seeking should be way more robust
Review URL: http://codereview.chromium.org/160005
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@21611 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base')
-rw-r--r-- | media/base/filters.h | 19 | ||||
-rw-r--r-- | media/base/pipeline.h | 1 | ||||
-rw-r--r-- | media/base/pipeline_impl.cc | 154 | ||||
-rw-r--r-- | media/base/pipeline_impl.h | 98 | ||||
-rw-r--r-- | media/base/pipeline_impl_unittest.cc | 16 |
5 files changed, 225 insertions, 63 deletions
diff --git a/media/base/filters.h b/media/base/filters.h index 3029cbc..8fdfe011 100644 --- a/media/base/filters.h +++ b/media/base/filters.h @@ -88,6 +88,25 @@ class MediaFilter : public base::RefCountedThreadSafe<MediaFilter> { return message_loop_; } + // The pipeline has resumed playback. Filters can continue requesting reads. + // Filters may implement this method if they need to respond to this call. + virtual void Play(FilterCallback* callback) { + if (callback) { + callback->Run(); + delete callback; + } + } + + // The pipeline has paused playback. Filters should fulfill any existing read + // requests and then idle. Filters may implement this method if they need to + // respond to this call. + virtual void Pause(FilterCallback* callback) { + if (callback) { + callback->Run(); + delete callback; + } + } + // The pipeline is being stopped either as a result of an error or because // the client called Stop(). virtual void Stop() = 0; diff --git a/media/base/pipeline.h b/media/base/pipeline.h index 58de36b..c0286e5 100644 --- a/media/base/pipeline.h +++ b/media/base/pipeline.h @@ -36,7 +36,6 @@ enum PipelineError { PIPELINE_ERROR_COULD_NOT_RENDER, PIPELINE_ERROR_READ, PIPELINE_ERROR_AUDIO_HARDWARE, - PIPELINE_ERROR_NO_DATA, // Demuxer related errors. DEMUXER_ERROR_COULD_NOT_OPEN, DEMUXER_ERROR_COULD_NOT_PARSE, diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 07378c0..9b482f8 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -73,7 +73,8 @@ void DecrementCounter(Lock* lock, ConditionVariable* cond_var, int* count) { PipelineImpl::PipelineImpl(MessageLoop* message_loop) : message_loop_(message_loop), - state_(kCreated) { + state_(kCreated), + remaining_transitions_(0) { ResetState(); } @@ -142,7 +143,15 @@ bool PipelineImpl::IsInitialized() const { // lock, because this is breaching the contract that |state_| is only accessed // on |message_loop_|. AutoLock auto_lock(lock_); - return state_ == kStarted; + switch (state_) { + case kPausing: + case kSeeking: + case kStarting: + case kStarted: + return true; + default: + return false; + } } bool PipelineImpl::IsRendered(const std::string& major_mime_type) const { @@ -258,6 +267,23 @@ bool PipelineImpl::IsPipelineInitializing() { state_ == kInitVideoRenderer; } +// static +bool PipelineImpl::StateTransitionsToStarted(State state) { + return state == kPausing || state == kSeeking || state == kStarting; +} + +// static +PipelineImpl::State PipelineImpl::FindNextState(State current) { + // TODO(scherkus): refactor InitializeTask() to make use of this function. + if (current == kPausing) + return kSeeking; + if (current == kSeeking) + return kStarting; + if (current == kStarting) + return kStarted; + return current; +} + void PipelineImpl::SetError(PipelineError error) { DCHECK(IsRunning()); DCHECK(error != PIPELINE_OK) << "PIPELINE_OK isn't an error!"; @@ -331,9 +357,10 @@ void PipelineImpl::OnFilterInitialize() { } // Called from any thread. -void PipelineImpl::OnFilterSeek() { - // TODO(scherkus): have PipelineInternal wait to receive replies from every - // filter before calling the client's |seek_callback_|. +void PipelineImpl::OnFilterStateTransition() { + // Continue walking down the filters. + message_loop_->PostTask(FROM_HERE, + NewRunnableMethod(this, &PipelineImpl::FilterStateTransitionTask)); } void PipelineImpl::StartTask(FilterFactory* filter_factory, @@ -343,7 +370,7 @@ void PipelineImpl::StartTask(FilterFactory* filter_factory, DCHECK_EQ(kCreated, state_); filter_factory_ = filter_factory; url_ = url; - start_callback_.reset(start_callback); + seek_callback_.reset(start_callback); // Kick off initialization. InitializeTask(); @@ -430,16 +457,21 @@ void PipelineImpl::InitializeTask() { return; } - // Initialization was successful, set the volume and playback rate. + // We've successfully created and initialized every filter, so we no longer + // need the filter factory. + filter_factory_ = NULL; + + // Initialization was successful, we are now considered paused, so it's safe + // to set the initial playback rate and volume. PlaybackRateChangedTask(GetPlaybackRate()); VolumeChangedTask(GetVolume()); - state_ = kStarted; - filter_factory_ = NULL; - if (start_callback_.get()) { - start_callback_->Run(); - start_callback_.reset(); - } + // Fire the initial seek request to get the filters to preroll. + state_ = kSeeking; + remaining_transitions_ = filters_.size(); + seek_timestamp_ = base::TimeDelta(); + filters_.front()->Seek(seek_timestamp_, + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } } @@ -490,10 +522,10 @@ void PipelineImpl::ErrorChangedTask(PipelineError error) { } // Notify the client that starting did not complete, if necessary. - if (IsPipelineInitializing() && start_callback_.get()) { - start_callback_->Run(); + if (IsPipelineInitializing() && seek_callback_.get()) { + seek_callback_->Run(); } - start_callback_.reset(); + seek_callback_.reset(); filter_factory_ = NULL; // We no longer need to examine our previous state, set it to stopped. @@ -525,30 +557,82 @@ void PipelineImpl::VolumeChangedTask(float volume) { void PipelineImpl::SeekTask(base::TimeDelta time, PipelineCallback* seek_callback) { DCHECK_EQ(MessageLoop::current(), message_loop_); - seek_callback_.reset(seek_callback); - // Supress seeking if we haven't fully started. + // Suppress seeking if we're not fully started. if (state_ != kStarted) { + // TODO(scherkus): should we run the callback? I'm tempted to say the API + // will only execute the first Seek() request. + LOG(INFO) << "Media pipeline is not in started state, ignoring seek to " + << time.InMicroseconds(); + delete seek_callback; return; } - for (FilterVector::iterator iter = filters_.begin(); - iter != filters_.end(); - ++iter) { - (*iter)->Seek(time, NewCallback(this, &PipelineImpl::OnFilterSeek)); - } - - // TODO(hclam): we should set the time when the above seek operations were all - // successful and first frame/packet at the desired time is decoded. I'm - // setting the time here because once we do the callback the user can ask for - // current time immediately, which is the old time. In order to get rid this - // little glitch, we either assume the seek was successful and time is updated - // immediately here or we set time and do callback when we have new - // frames/packets. - SetTime(time); - if (seek_callback_.get()) { - seek_callback_->Run(); - seek_callback_.reset(); + // We'll need to pause every filter before seeking. The state transition + // is as follows: + // kStarted + // kPausing (for each filter) + // kSeeking (for each filter) + // kStarting (for each filter) + // kStarted + state_ = kPausing; + seek_timestamp_ = time; + seek_callback_.reset(seek_callback); + remaining_transitions_ = filters_.size(); + + // Kick off seeking! + filters_.front()->Pause( + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); +} + +void PipelineImpl::FilterStateTransitionTask() { + DCHECK_EQ(MessageLoop::current(), message_loop_); + + if (!StateTransitionsToStarted(state_)) { + NOTREACHED() << "Invalid current state: " << state_; + SetError(PIPELINE_ERROR_ABORT); + return; + } + + // Decrement the number of remaining transitions, making sure to transition + // to the next state if needed. + CHECK(remaining_transitions_ <= filters_.size()); + CHECK(remaining_transitions_ > 0u); + if (--remaining_transitions_ == 0) { + state_ = FindNextState(state_); + if (StateTransitionsToStarted(state_)) { + remaining_transitions_ = filters_.size(); + } + } + + // Carry out the action for the current state. + if (StateTransitionsToStarted(state_)) { + MediaFilter* filter = filters_[filters_.size() - remaining_transitions_]; + if (state_ == kPausing) { + filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + } else if (state_ == kSeeking) { + filter->Seek(seek_timestamp_, + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + } else if (state_ == kStarting) { + filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + } else { + NOTREACHED(); + } + } else if (state_ == kStarted) { + // We've completed the seek, update the time. + SetTime(seek_timestamp_); + + // Execute the seek callback, if present. Note that this might be the + // initial callback passed into Start(). + if (seek_callback_.get()) { + seek_callback_->Run(); + seek_callback_.reset(); + } + + // Finally, reset our seeking timestamp back to zero. + seek_timestamp_ = base::TimeDelta(); + } else { + NOTREACHED(); } } diff --git a/media/base/pipeline_impl.h b/media/base/pipeline_impl.h index 1050ef1..4eed436 100644 --- a/media/base/pipeline_impl.h +++ b/media/base/pipeline_impl.h @@ -23,22 +23,40 @@ namespace media { // PipelineImpl runs the media pipeline. Filters are created and called on the // message loop injected into this object. PipelineImpl works like a state -// machine to perform asynchronous initialization. Initialization is done in -// multiple passes by InitializeTask(). In each pass a different filter is -// created and chained with a previously created filter. +// machine to perform asynchronous initialization, pausing, seeking and playing. // // Here's a state diagram that describes the lifetime of this object. // -// [ *Created ] -> [ InitDataSource ] -> [ InitDemuxer ] -> -// [ InitAudioDecoder ] -> [ InitAudioRenderer ] -> -// [ InitVideoDecoder ] -> [ InitVideoRenderer ] -> [ Started ] -// | | | -// .-> [ Error ] .-> [ Stopped ] <-. +// [ *Created ] +// | Start() +// V +// [ InitXXX (for each filter) ] +// | +// V +// [ Seeking (for each filter) ] <----------------------. +// | | +// V | +// [ Starting (for each filter) ] | +// | | +// V Seek() | +// [ Started ] --------> [ Pausing (for each filter) ] -' // -// Initialization is a series of state transitions from "Created" to -// "Started". If any error happens during initialization, this object will -// transition to the "Error" state from any state. If Stop() is called during -// initialization, this object will transition to "Stopped" state. +// +// SetError() +// [ Any State ] -------------> [ Error ] +// | Stop() +// '--------------------> [ Stopped ] +// +// Initialization is a series of state transitions from "Created" through each +// filter initialization state. When all filter initialization states have +// completed, we are implicitly in a "Paused" state. At that point we simulate +// a Seek() to the beginning of the media to give filters a chance to preroll. +// From then on the normal Seek() transitions are carried out and we start +// playing the media. +// +// If any error ever happens, this object will transition to the "Error" state +// from any state. If Stop() is ever called, this object will transition to +// "Stopped" state. class PipelineImpl : public Pipeline, public FilterHost { public: explicit PipelineImpl(MessageLoop* message_loop); @@ -65,6 +83,23 @@ class PipelineImpl : public Pipeline, public FilterHost { virtual PipelineError GetError() const; private: + // Pipeline states, as described above. + enum State { + kCreated, + kInitDataSource, + kInitDemuxer, + kInitAudioDecoder, + kInitAudioRenderer, + kInitVideoDecoder, + kInitVideoRenderer, + kPausing, + kSeeking, + kStarting, + kStarted, + kStopped, + kError, + }; + virtual ~PipelineImpl(); // Reset the state of the pipeline object to the initial state. This method @@ -77,6 +112,13 @@ class PipelineImpl : public Pipeline, public FilterHost { // Helper method to tell whether we are in the state of initializing. bool IsPipelineInitializing(); + // Returns true if the given state is one that transitions to the started + // state. + static bool StateTransitionsToStarted(State state); + + // Given the current state, returns the next state. + static State FindNextState(State current); + // FilterHost implementation. virtual void SetError(PipelineError error); virtual base::TimeDelta GetTime() const; @@ -94,9 +136,11 @@ class PipelineImpl : public Pipeline, public FilterHost { // Method called during initialization to determine if we rendered anything. bool HasRenderedMimeTypes() const; - // Callback executed by filters upon completing initialization and seeking. + // Callback executed by filters upon completing initialization. void OnFilterInitialize(); - void OnFilterSeek(); + + // Callback executed by filters upon completing Play(), Pause() or Seek(). + void OnFilterStateTransition(); // The following "task" methods correspond to the public methods, but these // methods are run as the result of posting a task to the PipelineInternal's @@ -128,6 +172,9 @@ class PipelineImpl : public Pipeline, public FilterHost { // Carries out notifying filters that we are seeking to a new timestamp. void SeekTask(base::TimeDelta time, PipelineCallback* seek_callback); + // Carries out advancing to the next filter during Play()/Pause()/Seek(). + void FilterStateTransitionTask(); + // Internal methods used in the implementation of the pipeline thread. All // of these methods are only called on the pipeline thread. @@ -246,20 +293,18 @@ class PipelineImpl : public Pipeline, public FilterHost { // |message_loop_|. // Member that tracks the current state. - enum State { - kCreated, - kInitDataSource, - kInitDemuxer, - kInitAudioDecoder, - kInitAudioRenderer, - kInitVideoDecoder, - kInitVideoRenderer, - kStarted, - kStopped, - kError, - }; State state_; + // For kPausing, kSeeking and kStarting, we need to track how many filters + // have completed transitioning to the destination state. When + // |remaining_transitions_| reaches 0 the pipeline can transition out + // of the current state. + size_t remaining_transitions_; + + // For kSeeking we need to remember where we're seeking between filter + // replies. + base::TimeDelta seek_timestamp_; + // Filter factory as passed in by Start(). scoped_refptr<FilterFactory> filter_factory_; @@ -267,7 +312,6 @@ class PipelineImpl : public Pipeline, public FilterHost { std::string url_; // Callbacks for various pipeline operations. - scoped_ptr<PipelineCallback> start_callback_; scoped_ptr<PipelineCallback> seek_callback_; scoped_ptr<PipelineCallback> stop_callback_; diff --git a/media/base/pipeline_impl_unittest.cc b/media/base/pipeline_impl_unittest.cc index b2ebe54..e745761 100644 --- a/media/base/pipeline_impl_unittest.cc +++ b/media/base/pipeline_impl_unittest.cc @@ -68,6 +68,8 @@ class PipelineImplTest : public ::testing::Test { EXPECT_CALL(*mocks_->data_source(), Initialize("", NotNull())) .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->data_source(), SetPlaybackRate(0.0f)); + EXPECT_CALL(*mocks_->data_source(), Seek(base::TimeDelta(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->data_source(), Stop()); } @@ -80,6 +82,8 @@ class PipelineImplTest : public ::testing::Test { EXPECT_CALL(*mocks_->demuxer(), GetNumberOfStreams()) .WillRepeatedly(Return(streams->size())); EXPECT_CALL(*mocks_->demuxer(), SetPlaybackRate(0.0f)); + EXPECT_CALL(*mocks_->demuxer(), Seek(base::TimeDelta(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->demuxer(), Stop()); // Configure the demuxer to return the streams. @@ -95,6 +99,8 @@ class PipelineImplTest : public ::testing::Test { EXPECT_CALL(*mocks_->video_decoder(), Initialize(stream, NotNull())) .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->video_decoder(), SetPlaybackRate(0.0f)); + EXPECT_CALL(*mocks_->video_decoder(), Seek(base::TimeDelta(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->video_decoder(), Stop()); } @@ -103,6 +109,8 @@ class PipelineImplTest : public ::testing::Test { EXPECT_CALL(*mocks_->audio_decoder(), Initialize(stream, NotNull())) .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->audio_decoder(), SetPlaybackRate(0.0f)); + EXPECT_CALL(*mocks_->audio_decoder(), Seek(base::TimeDelta(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->audio_decoder(), Stop()); } @@ -112,6 +120,8 @@ class PipelineImplTest : public ::testing::Test { Initialize(mocks_->video_decoder(), NotNull())) .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->video_renderer(), SetPlaybackRate(0.0f)); + EXPECT_CALL(*mocks_->video_renderer(), Seek(base::TimeDelta(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->video_renderer(), Stop()); } @@ -122,6 +132,8 @@ class PipelineImplTest : public ::testing::Test { .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->audio_renderer(), SetPlaybackRate(0.0f)); EXPECT_CALL(*mocks_->audio_renderer(), SetVolume(1.0f)); + EXPECT_CALL(*mocks_->audio_renderer(), Seek(base::TimeDelta(), NotNull())) + .WillOnce(Invoke(&RunFilterCallback)); EXPECT_CALL(*mocks_->audio_renderer(), Stop()); } @@ -356,7 +368,11 @@ TEST_F(PipelineImplTest, Seek) { pipeline_->Seek(expected, NewCallback(reinterpret_cast<CallbackHelper*>(&callbacks_), &CallbackHelper::OnSeek)); + + // We expect the time to be updated only after the seek has completed. + EXPECT_TRUE(expected != pipeline_->GetCurrentTime()); message_loop_.RunAllPending(); + EXPECT_TRUE(expected == pipeline_->GetCurrentTime()); } TEST_F(PipelineImplTest, SetVolume) { |