diff options
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r-- | media/base/pipeline_impl.cc | 159 |
1 files changed, 101 insertions, 58 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 3a68374..3b27943 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -22,7 +22,6 @@ class PipelineImpl::PipelineInitState { scoped_refptr<Demuxer> demuxer_; scoped_refptr<AudioDecoder> audio_decoder_; scoped_refptr<VideoDecoder> video_decoder_; - scoped_refptr<CompositeFilter> composite_; }; PipelineImpl::PipelineImpl(MessageLoop* message_loop) @@ -30,6 +29,7 @@ PipelineImpl::PipelineImpl(MessageLoop* message_loop) clock_(new ClockImpl(&base::Time::Now)), waiting_for_clock_update_(false), state_(kCreated), + remaining_transitions_(0), current_bytes_(0) { ResetState(); } @@ -317,10 +317,6 @@ void PipelineImpl::ResetState() { rendered_mime_types_.clear(); } -void PipelineImpl::set_state(State next_state) { - state_ = next_state; -} - bool PipelineImpl::IsPipelineOk() { return PIPELINE_OK == GetError(); } @@ -576,25 +572,22 @@ void PipelineImpl::InitializeTask() { // Just created, create data source. if (state_ == kCreated) { - set_state(kInitDataSource); + state_ = kInitDataSource; pipeline_init_state_.reset(new PipelineInitState()); - pipeline_init_state_->composite_ = new CompositeFilter(message_loop_); - pipeline_init_state_->composite_->set_host(this); - InitializeDataSource(); return; } // Data source created, create demuxer. if (state_ == kInitDataSource) { - set_state(kInitDemuxer); + state_ = kInitDemuxer; InitializeDemuxer(pipeline_init_state_->data_source_); return; } // Demuxer created, create audio decoder. if (state_ == kInitDemuxer) { - set_state(kInitAudioDecoder); + state_ = kInitAudioDecoder; // If this method returns false, then there's no audio stream. if (InitializeAudioDecoder(pipeline_init_state_->demuxer_)) return; @@ -602,7 +595,7 @@ void PipelineImpl::InitializeTask() { // Assuming audio decoder was created, create audio renderer. if (state_ == kInitAudioDecoder) { - set_state(kInitAudioRenderer); + state_ = kInitAudioRenderer; // Returns false if there's no audio stream. if (InitializeAudioRenderer(pipeline_init_state_->audio_decoder_)) { InsertRenderedMimeType(mime_type::kMajorTypeAudio); @@ -613,14 +606,14 @@ void PipelineImpl::InitializeTask() { // Assuming audio renderer was created, create video decoder. if (state_ == kInitAudioRenderer) { // Then perform the stage of initialization, i.e. initialize video decoder. - set_state(kInitVideoDecoder); + state_ = kInitVideoDecoder; if (InitializeVideoDecoder(pipeline_init_state_->demuxer_)) return; } // Assuming video decoder was created, create video renderer. if (state_ == kInitVideoDecoder) { - set_state(kInitVideoRenderer); + state_ = kInitVideoRenderer; if (InitializeVideoRenderer(pipeline_init_state_->video_decoder_)) { InsertRenderedMimeType(mime_type::kMajorTypeVideo); return; @@ -636,8 +629,6 @@ void PipelineImpl::InitializeTask() { // Clear the collection of filters. filter_collection_->Clear(); - pipeline_filter_ = pipeline_init_state_->composite_; - // Clear init state since we're done initializing. pipeline_init_state_.reset(); @@ -646,11 +637,12 @@ void PipelineImpl::InitializeTask() { PlaybackRateChangedTask(GetPlaybackRate()); VolumeChangedTask(GetVolume()); - // Fire the seek request to get the filters to preroll. + // Fire the initial seek request to get the filters to preroll. seek_pending_ = true; - set_state(kSeeking); + state_ = kSeeking; + remaining_transitions_ = filters_.size(); seek_timestamp_ = base::TimeDelta(); - pipeline_filter_->Seek(seek_timestamp_, + filters_.front()->Seek(seek_timestamp_, NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } } @@ -713,7 +705,11 @@ void PipelineImpl::PlaybackRateChangedTask(float playback_rate) { AutoLock auto_lock(lock_); clock_->SetPlaybackRate(playback_rate); } - pipeline_filter_->SetPlaybackRate(playback_rate); + for (FilterVector::iterator iter = filters_.begin(); + iter != filters_.end(); + ++iter) { + (*iter)->SetPlaybackRate(playback_rate); + } } void PipelineImpl::VolumeChangedTask(float volume) { @@ -749,9 +745,10 @@ void PipelineImpl::SeekTask(base::TimeDelta time, // kSeeking (for each filter) // kStarting (for each filter) // kStarted - set_state(kPausing); + state_ = kPausing; seek_timestamp_ = time; seek_callback_.reset(seek_callback); + remaining_transitions_ = filters_.size(); // Kick off seeking! { @@ -760,7 +757,7 @@ void PipelineImpl::SeekTask(base::TimeDelta time, if (!waiting_for_clock_update_) clock_->Pause(); } - pipeline_filter_->Pause( + filters_.front()->Pause( NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } @@ -793,7 +790,7 @@ void PipelineImpl::NotifyEndedTask() { } // Transition to ended, executing the callback if present. - set_state(kEnded); + state_ = kEnded; if (ended_callback_.get()) { ended_callback_->Run(); } @@ -817,7 +814,11 @@ void PipelineImpl::DisableAudioRendererTask() { audio_disabled_ = true; // Notify all filters of disabled audio renderer. - pipeline_filter_->OnAudioRendererDisabled(); + for (FilterVector::iterator iter = filters_.begin(); + iter != filters_.end(); + ++iter) { + (*iter)->OnAudioRendererDisabled(); + } } void PipelineImpl::FilterStateTransitionTask() { @@ -836,31 +837,42 @@ void PipelineImpl::FilterStateTransitionTask() { // Decrement the number of remaining transitions, making sure to transition // to the next state if needed. - set_state(FindNextState(state_)); - if (state_ == kSeeking) { - AutoLock auto_lock(lock_); - clock_->SetTime(seek_timestamp_); + DCHECK(remaining_transitions_ <= filters_.size()); + DCHECK(remaining_transitions_ > 0u); + if (--remaining_transitions_ == 0) { + state_ = FindNextState(state_); + if (state_ == kSeeking) { + AutoLock auto_lock(lock_); + clock_->SetTime(seek_timestamp_); + } + + if (TransientState(state_)) { + remaining_transitions_ = filters_.size(); + } } // Carry out the action for the current state. if (TransientState(state_)) { + Filter* filter = filters_[filters_.size() - remaining_transitions_]; if (state_ == kPausing) { - pipeline_filter_->Pause( - NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else if (state_ == kFlushing) { - pipeline_filter_->Flush( - NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + // We had to use parallel flushing all filters. + if (remaining_transitions_ == filters_.size()) { + for (size_t i = 0; i < filters_.size(); i++) { + filters_[i]->Flush( + NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + } + } } else if (state_ == kSeeking) { - pipeline_filter_->Seek(seek_timestamp_, + filter->Seek(seek_timestamp_, NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else if (state_ == kStarting) { - pipeline_filter_->Play( - NewCallback(this,&PipelineImpl::OnFilterStateTransition)); + filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else if (state_ == kStopping) { - pipeline_filter_->Stop( - NewCallback(this, &PipelineImpl::OnFilterStateTransition)); + filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition)); } else { - NOTREACHED() << "Unexpected state: " << state_; + NOTREACHED(); } } else if (state_ == kStarted) { FinishInitialization(); @@ -885,7 +897,7 @@ void PipelineImpl::FilterStateTransitionTask() { } else if (IsPipelineStopped()) { FinishDestroyingFiltersTask(); } else { - NOTREACHED() << "Unexpected state: " << state_; + NOTREACHED(); } } @@ -893,11 +905,24 @@ void PipelineImpl::FinishDestroyingFiltersTask() { DCHECK_EQ(MessageLoop::current(), message_loop_); DCHECK(IsPipelineStopped()); + // Stop every running filter thread. + // + // TODO(scherkus): can we watchdog this section to detect wedged threads? + for (FilterThreadVector::iterator iter = filter_threads_.begin(); + iter != filter_threads_.end(); + ++iter) { + (*iter)->Stop(); + } + // Clear renderer references. audio_renderer_ = NULL; video_renderer_ = NULL; - pipeline_filter_ = NULL; + // Reset the pipeline, which will decrement a reference to this object. + // We will get destroyed as soon as the remaining tasks finish executing. + // To be safe, we'll set our pipeline reference to NULL. + filters_.clear(); + STLDeleteElements(&filter_threads_); stop_pending_ = false; tearing_down_ = false; @@ -913,7 +938,7 @@ void PipelineImpl::FinishDestroyingFiltersTask() { } } else { // Destroying filters due to SetError(). - set_state(kError); + state_ = kError; // If our owner has requested to be notified of an error. if (error_callback_.get()) { error_callback_->Run(); @@ -922,12 +947,28 @@ void PipelineImpl::FinishDestroyingFiltersTask() { } bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) { - bool ret = pipeline_init_state_->composite_->AddFilter(filter.get()); + DCHECK_EQ(MessageLoop::current(), message_loop_); + DCHECK(IsPipelineOk()); + + // Create a dedicated thread for this filter if applicable. + if (filter->requires_message_loop()) { + scoped_ptr<base::Thread> thread( + new base::Thread(filter->message_loop_name())); + if (!thread.get() || !thread->Start()) { + NOTREACHED() << "Could not start filter thread"; + SetError(PIPELINE_ERROR_INITIALIZATION_FAILED); + return false; + } - if (!ret) { - SetError(PIPELINE_ERROR_INITIALIZATION_FAILED); + filter->set_message_loop(thread->message_loop()); + filter_threads_.push_back(thread.release()); } - return ret; + + // Register ourselves as the filter's host. + DCHECK(IsPipelineOk()); + filter->set_host(this); + filters_.push_back(make_scoped_refptr(filter.get())); + return true; } void PipelineImpl::InitializeDataSource() { @@ -1104,21 +1145,23 @@ void PipelineImpl::TearDownPipeline() { tearing_down_ = true; if (IsPipelineInitializing()) { - // Make it look like initialization was successful. - pipeline_filter_ = pipeline_init_state_->composite_; - pipeline_init_state_.reset(); - - set_state(kStopping); - pipeline_filter_->Stop(NewCallback( - this, &PipelineImpl::OnFilterStateTransition)); - + // Notify the client that starting did not complete, if necessary. FinishInitialization(); - } else if (pipeline_filter_.get()) { - set_state(kPausing); - pipeline_filter_->Pause(NewCallback( - this, &PipelineImpl::OnFilterStateTransition)); + } + + remaining_transitions_ = filters_.size(); + if (remaining_transitions_ > 0) { + if (IsPipelineInitializing()) { + state_ = kStopping; + filters_.front()->Stop(NewCallback( + this, &PipelineImpl::OnFilterStateTransition)); + } else { + state_ = kPausing; + filters_.front()->Pause(NewCallback( + this, &PipelineImpl::OnFilterStateTransition)); + } } else { - set_state(kStopped); + state_ = kStopped; message_loop_->PostTask(FROM_HERE, NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask)); } |