diff options
author | ralphl@chromium.org <ralphl@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-01-28 17:21:23 +0000 |
---|---|---|
committer | ralphl@chromium.org <ralphl@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2009-01-28 17:21:23 +0000 |
commit | b026f4aeb159244e47d3b55c5faad7bb3247fd55 (patch) | |
tree | efda81548664ea009160ded8588904185b384ca4 /media/base/pipeline_impl.cc | |
parent | abc659b6d95cf197c2b96a544351a667e8b96f95 (diff) | |
download | chromium_src-b026f4aeb159244e47d3b55c5faad7bb3247fd55.zip chromium_src-b026f4aeb159244e47d3b55c5faad7bb3247fd55.tar.gz chromium_src-b026f4aeb159244e47d3b55c5faad7bb3247fd55.tar.bz2 |
Implementation of Pipeline and FilterHost interfaces. This is a large change, but all of the objects are interrelated.
I am also checking in a basic unit test that creates pipeline,
and the data source hangs during initialization. The test sleeps one second and then stops the pipeline.
Andrew has already done a first pass on this, and the code has come largely from our working experimental branch.
Review URL: http://codereview.chromium.org/18546
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@8805 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r-- | media/base/pipeline_impl.cc | 517 |
1 files changed, 460 insertions, 57 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc index 50e8776..e49e719 100644 --- a/media/base/pipeline_impl.cc +++ b/media/base/pipeline_impl.cc @@ -2,110 +2,513 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +#include "base/compiler_specific.h" +#include "media/base/filter_host_impl.h" +#include "media/base/media_format.h" #include "media/base/pipeline_impl.h" namespace media { PipelineImpl::PipelineImpl() { - // TODO(ralphl): implement PipelineImpl constructor. - NOTIMPLEMENTED(); + ResetState(); } PipelineImpl::~PipelineImpl() { - // TODO(ralphl): implement PipelineImpl destructor. - NOTIMPLEMENTED(); + Stop(); } bool PipelineImpl::IsInitialized() const { - // TODO(ralphl): implement IsInitialized. - NOTIMPLEMENTED(); - return false; + return initialized_; } -int64 PipelineImpl::GetDuration() const { - // TODO(ralphl): implement GetDuration. - NOTIMPLEMENTED(); - return 0; +base::TimeDelta PipelineImpl::GetDuration() const { + return duration_; } -int64 PipelineImpl::GetBufferedTime() const { - // TODO(ralphl): implement GetBufferedTime. - NOTIMPLEMENTED(); - return 0; +base::TimeDelta PipelineImpl::GetBufferedTime() const { + return buffered_time_; } int64 PipelineImpl::GetTotalBytes() const { - // TODO(ralphl): implement GetTotalBytes. - NOTIMPLEMENTED(); - return 0; + return total_bytes_; } int64 PipelineImpl::GetBufferedBytes() const { - // TODO(ralphl): implement GetBufferedBytes. - NOTIMPLEMENTED(); - return 0; + return buffered_bytes_; } void PipelineImpl::GetVideoSize(size_t* width_out, size_t* height_out) const { - // TODO(ralphl): implement GetVideoSize. - NOTIMPLEMENTED(); - width_out = 0; - height_out = 0; + DCHECK(width_out); + DCHECK(height_out); + AutoLock auto_lock(const_cast<Lock&>(video_size_access_lock_)); + *width_out = video_width_; + *height_out = video_height_; } float PipelineImpl::GetVolume() const { - // TODO(ralphl): implement GetVolume. - NOTIMPLEMENTED(); - return 0; + return volume_; } float PipelineImpl::GetPlaybackRate() const { - // TODO(ralphl): implement GetPlaybackRate. - NOTIMPLEMENTED(); - return 0; + return playback_rate_; } -int64 PipelineImpl::GetTime() const { - // TODO(ralphl): implement GetTime. - NOTIMPLEMENTED(); - return 0; +base::TimeDelta PipelineImpl::GetTime() const { + return time_; } PipelineError PipelineImpl::GetError() const { - // TODO(ralphl): implement GetError. - NOTIMPLEMENTED(); - return PIPELINE_ERROR_INITIALIZATION_FAILED; + return error_; } -bool PipelineImpl::Start(FilterFactory* filter_factory, - const std::string& uri, +// Creates the PipelineThread and calls it's start method. +bool PipelineImpl::Start(FilterFactory* factory, + const std::string& url, Callback1<bool>::Type* init_complete_callback) { - // TODO(ralphl): implement Start. - NOTIMPLEMENTED(); - return false; -} + DCHECK(!pipeline_thread_);
+ DCHECK(factory);
+ DCHECK(!initialized_);
+ if (!pipeline_thread_ && factory) {
+ pipeline_thread_ = new PipelineThread(this);
+ if (pipeline_thread_) {
+ // TODO(ralphl): Does the callback get copied by these fancy templates?
+ // if so, then do I want to always delete it here???
+ if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
+ return true;
+ }
+ pipeline_thread_ = NULL; // Releases reference to destroy thread
+ }
+ }
+ delete init_complete_callback;
+ return false;
+}
+// Stop the PipelineThread and return to a state identical to that of a newly +// created PipelineImpl object. void PipelineImpl::Stop() { - // TODO(ralphl): implement Stop. - NOTIMPLEMENTED(); + if (pipeline_thread_) { + pipeline_thread_->Stop(); + } + ResetState(); } -bool PipelineImpl::SetPlaybackRate(float rate) { - // TODO(ralphl): implement SetPlaybackRate. - NOTIMPLEMENTED(); - return false; + + +void PipelineImpl::SetPlaybackRate(float rate) { + if (OkToCallThread() && rate >= 0.0f) { + pipeline_thread_->SetPlaybackRate(rate); + } else { + NOTREACHED(); + } } -bool PipelineImpl::Seek(int64 time) { - // TODO(ralphl): implement Seek. - NOTIMPLEMENTED(); - return false; +void PipelineImpl::Seek(base::TimeDelta time) { + if (OkToCallThread()) { + pipeline_thread_->Seek(time); + } else { + NOTREACHED(); + } +} + +void PipelineImpl::SetVolume(float volume) { + if (OkToCallThread() && volume >= 0.0f && volume <= 1.0f) { + pipeline_thread_->SetVolume(volume); + } else { + NOTREACHED(); + } +} + +void PipelineImpl::ResetState() { + pipeline_thread_ = NULL; + initialized_ = false; + duration_ = base::TimeDelta(); + buffered_time_ = base::TimeDelta(); + buffered_bytes_ = 0; + total_bytes_ = 0; + video_width_ = 0; + video_height_ = 0; + volume_ = 0.0f; + playback_rate_ = 0.0f; + time_ = base::TimeDelta(); + error_ = PIPELINE_OK; +} + +void PipelineImpl::SetVideoSize(size_t width, size_t height) { + AutoLock auto_lock(video_size_access_lock_); + width = width; + height = height; +} + +//----------------------------------------------------------------------------- + +PipelineThread::PipelineThread(PipelineImpl* pipeline)
+ : pipeline_(pipeline),
+ thread_("PipelineThread"),
+ time_update_callback_scheduled_(false),
+ host_initializing_(NULL) {
+}
+
+PipelineThread::~PipelineThread() {
+ Stop();
+}
+
+// This method is called on the client's thread. It starts the pipeline's
+// dedicated thread and posts a task to call the StartTask method on that
+// thread.
+bool PipelineThread::Start(FilterFactory* filter_factory,
+ const std::string& url,
+ Callback1<bool>::Type* init_complete_callback) {
+ if (thread_.Start()) {
+ filter_factory->AddRef();
+ PostTask(NewRunnableMethod(this,
+ &PipelineThread::StartTask,
+ filter_factory,
+ url,
+ // TODO(ralphl): what happens to this callback?
+ // is it copied by NewRunnableTask? Just pointer
+ // or is the callback itself copied?
+ init_complete_callback));
+ return true;
+ }
+ return false;
+}
+ +// Called on the client's thread. If the thread has been started, then posts +// a task to call the StopTask method, then waits until the thread has stopped. +// There is a critical section that wraps the entire duration of the StartTask +// method. This method waits for that Lock to be released so that we know +// that the thread is not executing a nested message loop. This way we know +// that that Thread::Stop call will quit the appropriate message loop. +void PipelineThread::Stop() {
+ if (thread_.IsRunning()) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
+ AutoLock lock_crit(initialization_lock_);
+ thread_.Stop();
+ }
+ DCHECK(filter_hosts_.empty());
+}
+
+// Called on client's thread.
+void PipelineThread::SetPlaybackRate(float rate) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
+}
+
+// Called on client's thread.
+void PipelineThread::Seek(base::TimeDelta time) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time));
+}
+ +// Called on client's thread.
+void PipelineThread::SetVolume(float volume) { + PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume)); +} + +// May be called on any thread, and therefore we always assume the worst
+// possible race condition. This could, for example, be called from a filter's
+// thread just as the pipeline thread is exiting the call to the filter's
+// Initialize() method. Therefore, we make NO assumptions, and post work
+// in every case, even the trivial one of a thread calling this method from
+// within it's Initialize method. This means that we will always run a nested
+// message loop, and the InitializationCompleteTask will Quit that loop
+// immediately in the trivial case.
+void PipelineThread::InitializationComplete(FilterHostImpl* host) { + DCHECK(host == host_initializing_);
+ PostTask(NewRunnableMethod(this,
+ &PipelineThread::InitializationCompleteTask,
+ host));
+}
+ +// Called from any thread. Updates the pipeline time and schedules a task to +// call back to filters that have registered a callback for time updates. +void PipelineThread::SetTime(base::TimeDelta time) { + pipeline()->time_ = time; + if (!time_update_callback_scheduled_) { + time_update_callback_scheduled_ = true; + PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask)); + } +} + +// Called from any thread. Sets the pipeline error_ member and schedules a +// task to stop all the filters in the pipeline. Note that the thread will +// continue to run until the client calls Pipeline::Stop, but nothing will +// be processed since filters will not be able to post tasks. +void PipelineThread::Error(PipelineError error) { + DCHECK(PIPELINE_OK != error); + if (PIPELINE_OK == pipeline()->error_) { + pipeline()->error_ = error; + PostTask(NewRunnableMethod(this, &PipelineThread::StopTask)); + } +} + +// Called from any thread. Used by FilterHostImpl::PostTask method and used +// internally. +void PipelineThread::PostTask(Task* task) { + message_loop()->PostTask(FROM_HERE, task); +} + + +// Main initialization method called on the pipeline thread. This code attempts +// to use the specified filter factory to build a pipeline. It starts by +// creating a DataSource, connects it to a Demuxer, and then connects the +// Demuxer's audio stream to an AudioDecoder which is then connected to an +// AudioRenderer. If the media has video, then it connects a VideoDecoder to +// the Demuxer's video stream, and then connects the VideoDecoder to a +// VideoRenderer. When all required filters have been created and have called +// their FilterHost's InitializationComplete method, the pipeline's +// initialized_ member is set to true, and, if the client provided an +// init_complete_callback, it is called with "true". +// If initializatoin fails, the client's callback will still be called, but +// the bool parameter passed to it will be false. +// +// Note that at each step in this process, the initialization of any filter +// may require running the pipeline thread's message loop recursively. This is +// handled by the CreateFilter method. +void PipelineThread::StartTask(FilterFactory* filter_factory, + const std::string& url, + Callback1<bool>::Type* init_complete_callback) { + bool success = true; + + // During the entire StartTask we hold the initialization_lock_ so that + // if the client calls the Pipeline::Stop method while we are running a + // nested message loop, we can correctly unwind out of it before calling + // the Thread::Stop method. + AutoLock auto_lock(initialization_lock_); + + // Add ourselves as a destruction observer of the thread's message loop so + // we can delete filters at an appropriate time (when all tasks have been + // processed and the thread is about to be destroyed). + message_loop()->AddDestructionObserver(this); + success = CreateDataSource(filter_factory, url); + if (success) { + success = CreateAndConnect<Demuxer, DataSource>(filter_factory); + } + if (success) { + success = CreateDecoder<AudioDecoder>(filter_factory); + } + if (success) { + success = CreateAndConnect<AudioRenderer, AudioDecoder>(filter_factory); + } + if (success && HasVideo()) { + success = CreateDecoder<VideoDecoder>(filter_factory); + if (success) { + success = CreateAndConnect<VideoRenderer, VideoDecoder>(filter_factory);
+ }
+ }
+ if (success) {
+ pipeline_->initialized_ = true;
+ } else if (PIPELINE_OK == pipeline_->error_) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
+
+ // No matter what, we're done with the filter factory, and
+ // client callback so get rid of them.
+ filter_factory->Release();
+ if (init_complete_callback) {
+ init_complete_callback->Run(success);
+ delete init_complete_callback;
+ }
+}
+ +// This method is called as a result of the client calling Pipeline::Stop() or +// as the result of an error condition. If there is no error, then set the +// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the +// reverse order. +void PipelineThread::StopTask() { + if (PIPELINE_OK == pipeline_->error_) {
+ pipeline_->error_ = PIPELINE_STOPPING;
+ }
+ FilterHostVector::reverse_iterator riter = filter_hosts_.rbegin(); + while (riter != filter_hosts_.rend()) { + (*riter)->Stop(); + ++riter;
+ }
+ if (host_initializing_) {
+ host_initializing_ = NULL;
+ message_loop()->Quit();
+ }
+} + +// Task runs as a result of a filter calling InitializationComplete. If for +// some reason StopTask has been executed prior to this, the host_initializing_ +// member will be NULL, and the message loop will have been quit already, so +// we don't want to do it again. +void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) { + if (host == host_initializing_) { + host_initializing_ = NULL; + message_loop()->Quit(); + } else { + DCHECK(!host_initializing_); + } +} + +void PipelineThread::SetPlaybackRateTask(float rate) { + pipeline_->playback_rate_ = rate; + FilterHostVector::iterator iter = filter_hosts_.begin(); + while (iter != filter_hosts_.end()) { + (*iter)->media_filter()->SetPlaybackRate(rate);
+ ++iter;
+ }
+} + +void PipelineThread::SeekTask(base::TimeDelta time) { + FilterHostVector::iterator iter = filter_hosts_.begin(); + while (iter != filter_hosts_.end()) { + (*iter)->media_filter()->Seek(time);
+ ++iter;
+ }
+} + +void PipelineThread::SetVolumeTask(float volume) { + pipeline_->volume_ = volume; + AudioRenderer* audio_renderer = GetFilter<AudioRenderer>();
+ if (audio_renderer) {
+ audio_renderer->SetVolume(volume);
+ }
+}
+ +void PipelineThread::SetTimeTask() {
+ time_update_callback_scheduled_ = false;
+ FilterHostVector::iterator iter = filter_hosts_.begin(); + while (iter != filter_hosts_.end()) { + (*iter)->RunTimeUpdateCallback(pipeline_->time_);
+ ++iter;
+ }
+}
+
+template <class Filter>
+Filter* PipelineThread::GetFilter() const {
+ Filter* filter = NULL;
+ FilterHostVector::const_iterator iter = filter_hosts_.begin(); + while (iter != filter_hosts_.end() && NULL == filter) { + filter = (*iter)->GetFilter<Filter>(); + ++iter; + } + return filter; +}
+
+template <class NewFilter, class Source>
+bool PipelineThread::CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat* source_media_format) {
+ NewFilter* new_filter;
+ bool success;
+ success = filter_factory->Create(source_media_format, &new_filter);
+ if (success) {
+ DCHECK(!host_initializing_);
+ host_initializing_ = new FilterHostImpl(this, new_filter);
+ if (!host_initializing_) {
+ success = false;
+ new_filter->AddRef();
+ new_filter->Release();
+ }
+ }
+ if (success) {
+ filter_hosts_.push_back(host_initializing_);
+ new_filter->SetFilterHost(host_initializing_);
+
+ // The filter must return true from initialize and there must still not
+ // be an error or it's not successful.
+ success = (new_filter->Initialize(source) &&
+ PIPELINE_OK == pipeline_->error_);
+ }
+ if (success) {
+ // Now we run the thread's message loop recursively. We want all
+ // pending tasks to be processed, so we set nestable tasks to be allowed
+ // and then run the loop. The only way we exit the loop is as the result
+ // of a call to FilterHost::InitializationComplete, FilterHost::Error, or
+ // Pipeline::Stop. In each of these cases, the corresponding task method
+ // sets host_initializing_ to NULL to signal that the message loop's Quit
+ // method has already been called, and then calls message_loop()->Quit().
+ // The setting of |host_initializing_| to NULL in the task prevents a
+ // subsequent task from accidentally quitting the wrong (non-nested) loop.
+ message_loop()->SetNestableTasksAllowed(true);
+ message_loop()->Run();
+ message_loop()->SetNestableTasksAllowed(false);
+ DCHECK(!host_initializing_);
+
+ // If an error occurred while we were in the nested Run state, then
+ // not successful. When stopping, the |error_| member is set to a value of
+ // PIPELINE_STOPPING so we will exit in that case also with false.
+ success = (PIPELINE_OK == pipeline_->error_);
+ }
+
+ // This could still be set if we never ran the message loop (for example,
+ // if the fiter returned false from it's Initialize method), so make sure
+ // to reset it.
+ host_initializing_ = NULL;
+
+ // If this method fails, but no error set, then indicate a general
+ // initialization failure.
+ if (PIPELINE_OK == pipeline_->error_ && (!success)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
+ return success;
+}
+ +bool PipelineThread::CreateDataSource(FilterFactory* filter_factory,
+ const std::string& url) {
+ MediaFormat url_format;
+ url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
+ url_format.SetAsString(MediaFormat::kURL, url);
+ return CreateFilter<DataSource>(filter_factory, url, &url_format);
+}
+ +template <class Decoder>
+bool PipelineThread::CreateDecoder(FilterFactory* filter_factory) {
+ Demuxer* demuxer = GetFilter<Demuxer>();
+ if (demuxer) {
+ int num_outputs = demuxer->GetNumberOfStreams();
+ for (int i = 0; i < num_outputs; ++i) {
+ DemuxerStream* stream = demuxer->GetStream(i);
+ const MediaFormat* stream_format = stream->GetMediaFormat();
+ if (IsMajorMimeType(stream_format, Decoder::major_mime_type())) {
+ return CreateFilter<Decoder>(filter_factory, stream, stream_format);
+ }
+ }
+ }
+ return false;
+}
+
+template <class NewFilter, class SourceFilter>
+bool PipelineThread::CreateAndConnect(FilterFactory* filter_factory) {
+ SourceFilter* source_filter = GetFilter<SourceFilter>();
+ bool success = (source_filter &&
+ CreateFilter<NewFilter>(filter_factory,
+ source_filter,
+ source_filter->GetMediaFormat()));
+ return success;
} -bool PipelineImpl::SetVolume(float volume) { - // TODO(ralphl): implement SetVolume. - NOTIMPLEMENTED(); +// TODO(ralphl): Consider making this part of the demuxer interface. +bool PipelineThread::HasVideo() const {
+ Demuxer* demuxer = GetFilter<Demuxer>();
+ if (demuxer) {
+ int num_outputs = demuxer->GetNumberOfStreams();
+ for (int i = 0; i < num_outputs; ++i) {
+ if (IsMajorMimeType(demuxer->GetStream(i)->GetMediaFormat(),
+ mime_type::kMajorTypeVideo)) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool PipelineThread::IsMajorMimeType(const MediaFormat* media_format, + const std::string& major_mime_type) const { + std::string value; + if (media_format->GetAsString(MediaFormat::kMimeType, &value)) { + return (0 == value.compare(0, major_mime_type.length(), major_mime_type)); + } return false; } +
+// Called as a result of destruction of the thread.
+void PipelineThread::WillDestroyCurrentMessageLoop() {
+ while (!filter_hosts_.empty()) {
+ delete filter_hosts_.back();
+ filter_hosts_.pop_back();
+ }
+}
} // namespace media |