summaryrefslogtreecommitdiffstats
path: root/media/base/pipeline_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r--media/base/pipeline_impl.cc159
1 files changed, 101 insertions, 58 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 3a68374..3b27943 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -22,7 +22,6 @@ class PipelineImpl::PipelineInitState {
scoped_refptr<Demuxer> demuxer_;
scoped_refptr<AudioDecoder> audio_decoder_;
scoped_refptr<VideoDecoder> video_decoder_;
- scoped_refptr<CompositeFilter> composite_;
};
PipelineImpl::PipelineImpl(MessageLoop* message_loop)
@@ -30,6 +29,7 @@ PipelineImpl::PipelineImpl(MessageLoop* message_loop)
clock_(new ClockImpl(&base::Time::Now)),
waiting_for_clock_update_(false),
state_(kCreated),
+ remaining_transitions_(0),
current_bytes_(0) {
ResetState();
}
@@ -317,10 +317,6 @@ void PipelineImpl::ResetState() {
rendered_mime_types_.clear();
}
-void PipelineImpl::set_state(State next_state) {
- state_ = next_state;
-}
-
bool PipelineImpl::IsPipelineOk() {
return PIPELINE_OK == GetError();
}
@@ -576,25 +572,22 @@ void PipelineImpl::InitializeTask() {
// Just created, create data source.
if (state_ == kCreated) {
- set_state(kInitDataSource);
+ state_ = kInitDataSource;
pipeline_init_state_.reset(new PipelineInitState());
- pipeline_init_state_->composite_ = new CompositeFilter(message_loop_);
- pipeline_init_state_->composite_->set_host(this);
-
InitializeDataSource();
return;
}
// Data source created, create demuxer.
if (state_ == kInitDataSource) {
- set_state(kInitDemuxer);
+ state_ = kInitDemuxer;
InitializeDemuxer(pipeline_init_state_->data_source_);
return;
}
// Demuxer created, create audio decoder.
if (state_ == kInitDemuxer) {
- set_state(kInitAudioDecoder);
+ state_ = kInitAudioDecoder;
// If this method returns false, then there's no audio stream.
if (InitializeAudioDecoder(pipeline_init_state_->demuxer_))
return;
@@ -602,7 +595,7 @@ void PipelineImpl::InitializeTask() {
// Assuming audio decoder was created, create audio renderer.
if (state_ == kInitAudioDecoder) {
- set_state(kInitAudioRenderer);
+ state_ = kInitAudioRenderer;
// Returns false if there's no audio stream.
if (InitializeAudioRenderer(pipeline_init_state_->audio_decoder_)) {
InsertRenderedMimeType(mime_type::kMajorTypeAudio);
@@ -613,14 +606,14 @@ void PipelineImpl::InitializeTask() {
// Assuming audio renderer was created, create video decoder.
if (state_ == kInitAudioRenderer) {
// Then perform the stage of initialization, i.e. initialize video decoder.
- set_state(kInitVideoDecoder);
+ state_ = kInitVideoDecoder;
if (InitializeVideoDecoder(pipeline_init_state_->demuxer_))
return;
}
// Assuming video decoder was created, create video renderer.
if (state_ == kInitVideoDecoder) {
- set_state(kInitVideoRenderer);
+ state_ = kInitVideoRenderer;
if (InitializeVideoRenderer(pipeline_init_state_->video_decoder_)) {
InsertRenderedMimeType(mime_type::kMajorTypeVideo);
return;
@@ -636,8 +629,6 @@ void PipelineImpl::InitializeTask() {
// Clear the collection of filters.
filter_collection_->Clear();
- pipeline_filter_ = pipeline_init_state_->composite_;
-
// Clear init state since we're done initializing.
pipeline_init_state_.reset();
@@ -646,11 +637,12 @@ void PipelineImpl::InitializeTask() {
PlaybackRateChangedTask(GetPlaybackRate());
VolumeChangedTask(GetVolume());
- // Fire the seek request to get the filters to preroll.
+ // Fire the initial seek request to get the filters to preroll.
seek_pending_ = true;
- set_state(kSeeking);
+ state_ = kSeeking;
+ remaining_transitions_ = filters_.size();
seek_timestamp_ = base::TimeDelta();
- pipeline_filter_->Seek(seek_timestamp_,
+ filters_.front()->Seek(seek_timestamp_,
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
}
@@ -713,7 +705,11 @@ void PipelineImpl::PlaybackRateChangedTask(float playback_rate) {
AutoLock auto_lock(lock_);
clock_->SetPlaybackRate(playback_rate);
}
- pipeline_filter_->SetPlaybackRate(playback_rate);
+ for (FilterVector::iterator iter = filters_.begin();
+ iter != filters_.end();
+ ++iter) {
+ (*iter)->SetPlaybackRate(playback_rate);
+ }
}
void PipelineImpl::VolumeChangedTask(float volume) {
@@ -749,9 +745,10 @@ void PipelineImpl::SeekTask(base::TimeDelta time,
// kSeeking (for each filter)
// kStarting (for each filter)
// kStarted
- set_state(kPausing);
+ state_ = kPausing;
seek_timestamp_ = time;
seek_callback_.reset(seek_callback);
+ remaining_transitions_ = filters_.size();
// Kick off seeking!
{
@@ -760,7 +757,7 @@ void PipelineImpl::SeekTask(base::TimeDelta time,
if (!waiting_for_clock_update_)
clock_->Pause();
}
- pipeline_filter_->Pause(
+ filters_.front()->Pause(
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
}
@@ -793,7 +790,7 @@ void PipelineImpl::NotifyEndedTask() {
}
// Transition to ended, executing the callback if present.
- set_state(kEnded);
+ state_ = kEnded;
if (ended_callback_.get()) {
ended_callback_->Run();
}
@@ -817,7 +814,11 @@ void PipelineImpl::DisableAudioRendererTask() {
audio_disabled_ = true;
// Notify all filters of disabled audio renderer.
- pipeline_filter_->OnAudioRendererDisabled();
+ for (FilterVector::iterator iter = filters_.begin();
+ iter != filters_.end();
+ ++iter) {
+ (*iter)->OnAudioRendererDisabled();
+ }
}
void PipelineImpl::FilterStateTransitionTask() {
@@ -836,31 +837,42 @@ void PipelineImpl::FilterStateTransitionTask() {
// Decrement the number of remaining transitions, making sure to transition
// to the next state if needed.
- set_state(FindNextState(state_));
- if (state_ == kSeeking) {
- AutoLock auto_lock(lock_);
- clock_->SetTime(seek_timestamp_);
+ DCHECK(remaining_transitions_ <= filters_.size());
+ DCHECK(remaining_transitions_ > 0u);
+ if (--remaining_transitions_ == 0) {
+ state_ = FindNextState(state_);
+ if (state_ == kSeeking) {
+ AutoLock auto_lock(lock_);
+ clock_->SetTime(seek_timestamp_);
+ }
+
+ if (TransientState(state_)) {
+ remaining_transitions_ = filters_.size();
+ }
}
// Carry out the action for the current state.
if (TransientState(state_)) {
+ Filter* filter = filters_[filters_.size() - remaining_transitions_];
if (state_ == kPausing) {
- pipeline_filter_->Pause(
- NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ filter->Pause(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kFlushing) {
- pipeline_filter_->Flush(
- NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ // We had to use parallel flushing all filters.
+ if (remaining_transitions_ == filters_.size()) {
+ for (size_t i = 0; i < filters_.size(); i++) {
+ filters_[i]->Flush(
+ NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ }
+ }
} else if (state_ == kSeeking) {
- pipeline_filter_->Seek(seek_timestamp_,
+ filter->Seek(seek_timestamp_,
NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStarting) {
- pipeline_filter_->Play(
- NewCallback(this,&PipelineImpl::OnFilterStateTransition));
+ filter->Play(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else if (state_ == kStopping) {
- pipeline_filter_->Stop(
- NewCallback(this, &PipelineImpl::OnFilterStateTransition));
+ filter->Stop(NewCallback(this, &PipelineImpl::OnFilterStateTransition));
} else {
- NOTREACHED() << "Unexpected state: " << state_;
+ NOTREACHED();
}
} else if (state_ == kStarted) {
FinishInitialization();
@@ -885,7 +897,7 @@ void PipelineImpl::FilterStateTransitionTask() {
} else if (IsPipelineStopped()) {
FinishDestroyingFiltersTask();
} else {
- NOTREACHED() << "Unexpected state: " << state_;
+ NOTREACHED();
}
}
@@ -893,11 +905,24 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
DCHECK_EQ(MessageLoop::current(), message_loop_);
DCHECK(IsPipelineStopped());
+ // Stop every running filter thread.
+ //
+ // TODO(scherkus): can we watchdog this section to detect wedged threads?
+ for (FilterThreadVector::iterator iter = filter_threads_.begin();
+ iter != filter_threads_.end();
+ ++iter) {
+ (*iter)->Stop();
+ }
+
// Clear renderer references.
audio_renderer_ = NULL;
video_renderer_ = NULL;
- pipeline_filter_ = NULL;
+ // Reset the pipeline, which will decrement a reference to this object.
+ // We will get destroyed as soon as the remaining tasks finish executing.
+ // To be safe, we'll set our pipeline reference to NULL.
+ filters_.clear();
+ STLDeleteElements(&filter_threads_);
stop_pending_ = false;
tearing_down_ = false;
@@ -913,7 +938,7 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
}
} else {
// Destroying filters due to SetError().
- set_state(kError);
+ state_ = kError;
// If our owner has requested to be notified of an error.
if (error_callback_.get()) {
error_callback_->Run();
@@ -922,12 +947,28 @@ void PipelineImpl::FinishDestroyingFiltersTask() {
}
bool PipelineImpl::PrepareFilter(scoped_refptr<Filter> filter) {
- bool ret = pipeline_init_state_->composite_->AddFilter(filter.get());
+ DCHECK_EQ(MessageLoop::current(), message_loop_);
+ DCHECK(IsPipelineOk());
+
+ // Create a dedicated thread for this filter if applicable.
+ if (filter->requires_message_loop()) {
+ scoped_ptr<base::Thread> thread(
+ new base::Thread(filter->message_loop_name()));
+ if (!thread.get() || !thread->Start()) {
+ NOTREACHED() << "Could not start filter thread";
+ SetError(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ return false;
+ }
- if (!ret) {
- SetError(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ filter->set_message_loop(thread->message_loop());
+ filter_threads_.push_back(thread.release());
}
- return ret;
+
+ // Register ourselves as the filter's host.
+ DCHECK(IsPipelineOk());
+ filter->set_host(this);
+ filters_.push_back(make_scoped_refptr(filter.get()));
+ return true;
}
void PipelineImpl::InitializeDataSource() {
@@ -1104,21 +1145,23 @@ void PipelineImpl::TearDownPipeline() {
tearing_down_ = true;
if (IsPipelineInitializing()) {
- // Make it look like initialization was successful.
- pipeline_filter_ = pipeline_init_state_->composite_;
- pipeline_init_state_.reset();
-
- set_state(kStopping);
- pipeline_filter_->Stop(NewCallback(
- this, &PipelineImpl::OnFilterStateTransition));
-
+ // Notify the client that starting did not complete, if necessary.
FinishInitialization();
- } else if (pipeline_filter_.get()) {
- set_state(kPausing);
- pipeline_filter_->Pause(NewCallback(
- this, &PipelineImpl::OnFilterStateTransition));
+ }
+
+ remaining_transitions_ = filters_.size();
+ if (remaining_transitions_ > 0) {
+ if (IsPipelineInitializing()) {
+ state_ = kStopping;
+ filters_.front()->Stop(NewCallback(
+ this, &PipelineImpl::OnFilterStateTransition));
+ } else {
+ state_ = kPausing;
+ filters_.front()->Pause(NewCallback(
+ this, &PipelineImpl::OnFilterStateTransition));
+ }
} else {
- set_state(kStopped);
+ state_ = kStopped;
message_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &PipelineImpl::FinishDestroyingFiltersTask));
}