summaryrefslogtreecommitdiffstats
path: root/media/base/pipeline_impl.cc
diff options
context:
space:
mode:
authorralphl@chromium.org <ralphl@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-01-28 17:21:23 +0000
committerralphl@chromium.org <ralphl@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2009-01-28 17:21:23 +0000
commitb026f4aeb159244e47d3b55c5faad7bb3247fd55 (patch)
treeefda81548664ea009160ded8588904185b384ca4 /media/base/pipeline_impl.cc
parentabc659b6d95cf197c2b96a544351a667e8b96f95 (diff)
downloadchromium_src-b026f4aeb159244e47d3b55c5faad7bb3247fd55.zip
chromium_src-b026f4aeb159244e47d3b55c5faad7bb3247fd55.tar.gz
chromium_src-b026f4aeb159244e47d3b55c5faad7bb3247fd55.tar.bz2
Implementation of Pipeline and FilterHost interfaces. This is a large change, but all of the objects are interrelated.
I am also checking in a basic unit test that creates pipeline, and the data source hangs during initialization. The test sleeps one second and then stops the pipeline. Andrew has already done a first pass on this, and the code has come largely from our working experimental branch. Review URL: http://codereview.chromium.org/18546 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@8805 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/base/pipeline_impl.cc')
-rw-r--r--media/base/pipeline_impl.cc517
1 files changed, 460 insertions, 57 deletions
diff --git a/media/base/pipeline_impl.cc b/media/base/pipeline_impl.cc
index 50e8776..e49e719 100644
--- a/media/base/pipeline_impl.cc
+++ b/media/base/pipeline_impl.cc
@@ -2,110 +2,513 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "base/compiler_specific.h"
+#include "media/base/filter_host_impl.h"
+#include "media/base/media_format.h"
#include "media/base/pipeline_impl.h"
namespace media {
PipelineImpl::PipelineImpl() {
- // TODO(ralphl): implement PipelineImpl constructor.
- NOTIMPLEMENTED();
+ ResetState();
}
PipelineImpl::~PipelineImpl() {
- // TODO(ralphl): implement PipelineImpl destructor.
- NOTIMPLEMENTED();
+ Stop();
}
bool PipelineImpl::IsInitialized() const {
- // TODO(ralphl): implement IsInitialized.
- NOTIMPLEMENTED();
- return false;
+ return initialized_;
}
-int64 PipelineImpl::GetDuration() const {
- // TODO(ralphl): implement GetDuration.
- NOTIMPLEMENTED();
- return 0;
+base::TimeDelta PipelineImpl::GetDuration() const {
+ return duration_;
}
-int64 PipelineImpl::GetBufferedTime() const {
- // TODO(ralphl): implement GetBufferedTime.
- NOTIMPLEMENTED();
- return 0;
+base::TimeDelta PipelineImpl::GetBufferedTime() const {
+ return buffered_time_;
}
int64 PipelineImpl::GetTotalBytes() const {
- // TODO(ralphl): implement GetTotalBytes.
- NOTIMPLEMENTED();
- return 0;
+ return total_bytes_;
}
int64 PipelineImpl::GetBufferedBytes() const {
- // TODO(ralphl): implement GetBufferedBytes.
- NOTIMPLEMENTED();
- return 0;
+ return buffered_bytes_;
}
void PipelineImpl::GetVideoSize(size_t* width_out, size_t* height_out) const {
- // TODO(ralphl): implement GetVideoSize.
- NOTIMPLEMENTED();
- width_out = 0;
- height_out = 0;
+ DCHECK(width_out);
+ DCHECK(height_out);
+ AutoLock auto_lock(const_cast<Lock&>(video_size_access_lock_));
+ *width_out = video_width_;
+ *height_out = video_height_;
}
float PipelineImpl::GetVolume() const {
- // TODO(ralphl): implement GetVolume.
- NOTIMPLEMENTED();
- return 0;
+ return volume_;
}
float PipelineImpl::GetPlaybackRate() const {
- // TODO(ralphl): implement GetPlaybackRate.
- NOTIMPLEMENTED();
- return 0;
+ return playback_rate_;
}
-int64 PipelineImpl::GetTime() const {
- // TODO(ralphl): implement GetTime.
- NOTIMPLEMENTED();
- return 0;
+base::TimeDelta PipelineImpl::GetTime() const {
+ return time_;
}
PipelineError PipelineImpl::GetError() const {
- // TODO(ralphl): implement GetError.
- NOTIMPLEMENTED();
- return PIPELINE_ERROR_INITIALIZATION_FAILED;
+ return error_;
}
-bool PipelineImpl::Start(FilterFactory* filter_factory,
- const std::string& uri,
+// Creates the PipelineThread and calls it's start method.
+bool PipelineImpl::Start(FilterFactory* factory,
+ const std::string& url,
Callback1<bool>::Type* init_complete_callback) {
- // TODO(ralphl): implement Start.
- NOTIMPLEMENTED();
- return false;
-}
+ DCHECK(!pipeline_thread_);
+ DCHECK(factory);
+ DCHECK(!initialized_);
+ if (!pipeline_thread_ && factory) {
+ pipeline_thread_ = new PipelineThread(this);
+ if (pipeline_thread_) {
+ // TODO(ralphl): Does the callback get copied by these fancy templates?
+ // if so, then do I want to always delete it here???
+ if (pipeline_thread_->Start(factory, url, init_complete_callback)) {
+ return true;
+ }
+ pipeline_thread_ = NULL; // Releases reference to destroy thread
+ }
+ }
+ delete init_complete_callback;
+ return false;
+}
+// Stop the PipelineThread and return to a state identical to that of a newly
+// created PipelineImpl object.
void PipelineImpl::Stop() {
- // TODO(ralphl): implement Stop.
- NOTIMPLEMENTED();
+ if (pipeline_thread_) {
+ pipeline_thread_->Stop();
+ }
+ ResetState();
}
-bool PipelineImpl::SetPlaybackRate(float rate) {
- // TODO(ralphl): implement SetPlaybackRate.
- NOTIMPLEMENTED();
- return false;
+
+
+void PipelineImpl::SetPlaybackRate(float rate) {
+ if (OkToCallThread() && rate >= 0.0f) {
+ pipeline_thread_->SetPlaybackRate(rate);
+ } else {
+ NOTREACHED();
+ }
}
-bool PipelineImpl::Seek(int64 time) {
- // TODO(ralphl): implement Seek.
- NOTIMPLEMENTED();
- return false;
+void PipelineImpl::Seek(base::TimeDelta time) {
+ if (OkToCallThread()) {
+ pipeline_thread_->Seek(time);
+ } else {
+ NOTREACHED();
+ }
+}
+
+void PipelineImpl::SetVolume(float volume) {
+ if (OkToCallThread() && volume >= 0.0f && volume <= 1.0f) {
+ pipeline_thread_->SetVolume(volume);
+ } else {
+ NOTREACHED();
+ }
+}
+
+void PipelineImpl::ResetState() {
+ pipeline_thread_ = NULL;
+ initialized_ = false;
+ duration_ = base::TimeDelta();
+ buffered_time_ = base::TimeDelta();
+ buffered_bytes_ = 0;
+ total_bytes_ = 0;
+ video_width_ = 0;
+ video_height_ = 0;
+ volume_ = 0.0f;
+ playback_rate_ = 0.0f;
+ time_ = base::TimeDelta();
+ error_ = PIPELINE_OK;
+}
+
+void PipelineImpl::SetVideoSize(size_t width, size_t height) {
+ AutoLock auto_lock(video_size_access_lock_);
+ width = width;
+ height = height;
+}
+
+//-----------------------------------------------------------------------------
+
+PipelineThread::PipelineThread(PipelineImpl* pipeline)
+ : pipeline_(pipeline),
+ thread_("PipelineThread"),
+ time_update_callback_scheduled_(false),
+ host_initializing_(NULL) {
+}
+
+PipelineThread::~PipelineThread() {
+ Stop();
+}
+
+// This method is called on the client's thread. It starts the pipeline's
+// dedicated thread and posts a task to call the StartTask method on that
+// thread.
+bool PipelineThread::Start(FilterFactory* filter_factory,
+ const std::string& url,
+ Callback1<bool>::Type* init_complete_callback) {
+ if (thread_.Start()) {
+ filter_factory->AddRef();
+ PostTask(NewRunnableMethod(this,
+ &PipelineThread::StartTask,
+ filter_factory,
+ url,
+ // TODO(ralphl): what happens to this callback?
+ // is it copied by NewRunnableTask? Just pointer
+ // or is the callback itself copied?
+ init_complete_callback));
+ return true;
+ }
+ return false;
+}
+
+// Called on the client's thread. If the thread has been started, then posts
+// a task to call the StopTask method, then waits until the thread has stopped.
+// There is a critical section that wraps the entire duration of the StartTask
+// method. This method waits for that Lock to be released so that we know
+// that the thread is not executing a nested message loop. This way we know
+// that that Thread::Stop call will quit the appropriate message loop.
+void PipelineThread::Stop() {
+ if (thread_.IsRunning()) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
+ AutoLock lock_crit(initialization_lock_);
+ thread_.Stop();
+ }
+ DCHECK(filter_hosts_.empty());
+}
+
+// Called on client's thread.
+void PipelineThread::SetPlaybackRate(float rate) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::SetPlaybackRateTask, rate));
+}
+
+// Called on client's thread.
+void PipelineThread::Seek(base::TimeDelta time) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::SeekTask, time));
+}
+
+// Called on client's thread.
+void PipelineThread::SetVolume(float volume) {
+ PostTask(NewRunnableMethod(this, &PipelineThread::SetVolumeTask, volume));
+}
+
+// May be called on any thread, and therefore we always assume the worst
+// possible race condition. This could, for example, be called from a filter's
+// thread just as the pipeline thread is exiting the call to the filter's
+// Initialize() method. Therefore, we make NO assumptions, and post work
+// in every case, even the trivial one of a thread calling this method from
+// within it's Initialize method. This means that we will always run a nested
+// message loop, and the InitializationCompleteTask will Quit that loop
+// immediately in the trivial case.
+void PipelineThread::InitializationComplete(FilterHostImpl* host) {
+ DCHECK(host == host_initializing_);
+ PostTask(NewRunnableMethod(this,
+ &PipelineThread::InitializationCompleteTask,
+ host));
+}
+
+// Called from any thread. Updates the pipeline time and schedules a task to
+// call back to filters that have registered a callback for time updates.
+void PipelineThread::SetTime(base::TimeDelta time) {
+ pipeline()->time_ = time;
+ if (!time_update_callback_scheduled_) {
+ time_update_callback_scheduled_ = true;
+ PostTask(NewRunnableMethod(this, &PipelineThread::SetTimeTask));
+ }
+}
+
+// Called from any thread. Sets the pipeline error_ member and schedules a
+// task to stop all the filters in the pipeline. Note that the thread will
+// continue to run until the client calls Pipeline::Stop, but nothing will
+// be processed since filters will not be able to post tasks.
+void PipelineThread::Error(PipelineError error) {
+ DCHECK(PIPELINE_OK != error);
+ if (PIPELINE_OK == pipeline()->error_) {
+ pipeline()->error_ = error;
+ PostTask(NewRunnableMethod(this, &PipelineThread::StopTask));
+ }
+}
+
+// Called from any thread. Used by FilterHostImpl::PostTask method and used
+// internally.
+void PipelineThread::PostTask(Task* task) {
+ message_loop()->PostTask(FROM_HERE, task);
+}
+
+
+// Main initialization method called on the pipeline thread. This code attempts
+// to use the specified filter factory to build a pipeline. It starts by
+// creating a DataSource, connects it to a Demuxer, and then connects the
+// Demuxer's audio stream to an AudioDecoder which is then connected to an
+// AudioRenderer. If the media has video, then it connects a VideoDecoder to
+// the Demuxer's video stream, and then connects the VideoDecoder to a
+// VideoRenderer. When all required filters have been created and have called
+// their FilterHost's InitializationComplete method, the pipeline's
+// initialized_ member is set to true, and, if the client provided an
+// init_complete_callback, it is called with "true".
+// If initializatoin fails, the client's callback will still be called, but
+// the bool parameter passed to it will be false.
+//
+// Note that at each step in this process, the initialization of any filter
+// may require running the pipeline thread's message loop recursively. This is
+// handled by the CreateFilter method.
+void PipelineThread::StartTask(FilterFactory* filter_factory,
+ const std::string& url,
+ Callback1<bool>::Type* init_complete_callback) {
+ bool success = true;
+
+ // During the entire StartTask we hold the initialization_lock_ so that
+ // if the client calls the Pipeline::Stop method while we are running a
+ // nested message loop, we can correctly unwind out of it before calling
+ // the Thread::Stop method.
+ AutoLock auto_lock(initialization_lock_);
+
+ // Add ourselves as a destruction observer of the thread's message loop so
+ // we can delete filters at an appropriate time (when all tasks have been
+ // processed and the thread is about to be destroyed).
+ message_loop()->AddDestructionObserver(this);
+ success = CreateDataSource(filter_factory, url);
+ if (success) {
+ success = CreateAndConnect<Demuxer, DataSource>(filter_factory);
+ }
+ if (success) {
+ success = CreateDecoder<AudioDecoder>(filter_factory);
+ }
+ if (success) {
+ success = CreateAndConnect<AudioRenderer, AudioDecoder>(filter_factory);
+ }
+ if (success && HasVideo()) {
+ success = CreateDecoder<VideoDecoder>(filter_factory);
+ if (success) {
+ success = CreateAndConnect<VideoRenderer, VideoDecoder>(filter_factory);
+ }
+ }
+ if (success) {
+ pipeline_->initialized_ = true;
+ } else if (PIPELINE_OK == pipeline_->error_) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
+
+ // No matter what, we're done with the filter factory, and
+ // client callback so get rid of them.
+ filter_factory->Release();
+ if (init_complete_callback) {
+ init_complete_callback->Run(success);
+ delete init_complete_callback;
+ }
+}
+
+// This method is called as a result of the client calling Pipeline::Stop() or
+// as the result of an error condition. If there is no error, then set the
+// pipeline's error_ member to PIPELINE_STOPPING. We stop the filters in the
+// reverse order.
+void PipelineThread::StopTask() {
+ if (PIPELINE_OK == pipeline_->error_) {
+ pipeline_->error_ = PIPELINE_STOPPING;
+ }
+ FilterHostVector::reverse_iterator riter = filter_hosts_.rbegin();
+ while (riter != filter_hosts_.rend()) {
+ (*riter)->Stop();
+ ++riter;
+ }
+ if (host_initializing_) {
+ host_initializing_ = NULL;
+ message_loop()->Quit();
+ }
+}
+
+// Task runs as a result of a filter calling InitializationComplete. If for
+// some reason StopTask has been executed prior to this, the host_initializing_
+// member will be NULL, and the message loop will have been quit already, so
+// we don't want to do it again.
+void PipelineThread::InitializationCompleteTask(FilterHostImpl* host) {
+ if (host == host_initializing_) {
+ host_initializing_ = NULL;
+ message_loop()->Quit();
+ } else {
+ DCHECK(!host_initializing_);
+ }
+}
+
+void PipelineThread::SetPlaybackRateTask(float rate) {
+ pipeline_->playback_rate_ = rate;
+ FilterHostVector::iterator iter = filter_hosts_.begin();
+ while (iter != filter_hosts_.end()) {
+ (*iter)->media_filter()->SetPlaybackRate(rate);
+ ++iter;
+ }
+}
+
+void PipelineThread::SeekTask(base::TimeDelta time) {
+ FilterHostVector::iterator iter = filter_hosts_.begin();
+ while (iter != filter_hosts_.end()) {
+ (*iter)->media_filter()->Seek(time);
+ ++iter;
+ }
+}
+
+void PipelineThread::SetVolumeTask(float volume) {
+ pipeline_->volume_ = volume;
+ AudioRenderer* audio_renderer = GetFilter<AudioRenderer>();
+ if (audio_renderer) {
+ audio_renderer->SetVolume(volume);
+ }
+}
+
+void PipelineThread::SetTimeTask() {
+ time_update_callback_scheduled_ = false;
+ FilterHostVector::iterator iter = filter_hosts_.begin();
+ while (iter != filter_hosts_.end()) {
+ (*iter)->RunTimeUpdateCallback(pipeline_->time_);
+ ++iter;
+ }
+}
+
+template <class Filter>
+Filter* PipelineThread::GetFilter() const {
+ Filter* filter = NULL;
+ FilterHostVector::const_iterator iter = filter_hosts_.begin();
+ while (iter != filter_hosts_.end() && NULL == filter) {
+ filter = (*iter)->GetFilter<Filter>();
+ ++iter;
+ }
+ return filter;
+}
+
+template <class NewFilter, class Source>
+bool PipelineThread::CreateFilter(FilterFactory* filter_factory,
+ Source source,
+ const MediaFormat* source_media_format) {
+ NewFilter* new_filter;
+ bool success;
+ success = filter_factory->Create(source_media_format, &new_filter);
+ if (success) {
+ DCHECK(!host_initializing_);
+ host_initializing_ = new FilterHostImpl(this, new_filter);
+ if (!host_initializing_) {
+ success = false;
+ new_filter->AddRef();
+ new_filter->Release();
+ }
+ }
+ if (success) {
+ filter_hosts_.push_back(host_initializing_);
+ new_filter->SetFilterHost(host_initializing_);
+
+ // The filter must return true from initialize and there must still not
+ // be an error or it's not successful.
+ success = (new_filter->Initialize(source) &&
+ PIPELINE_OK == pipeline_->error_);
+ }
+ if (success) {
+ // Now we run the thread's message loop recursively. We want all
+ // pending tasks to be processed, so we set nestable tasks to be allowed
+ // and then run the loop. The only way we exit the loop is as the result
+ // of a call to FilterHost::InitializationComplete, FilterHost::Error, or
+ // Pipeline::Stop. In each of these cases, the corresponding task method
+ // sets host_initializing_ to NULL to signal that the message loop's Quit
+ // method has already been called, and then calls message_loop()->Quit().
+ // The setting of |host_initializing_| to NULL in the task prevents a
+ // subsequent task from accidentally quitting the wrong (non-nested) loop.
+ message_loop()->SetNestableTasksAllowed(true);
+ message_loop()->Run();
+ message_loop()->SetNestableTasksAllowed(false);
+ DCHECK(!host_initializing_);
+
+ // If an error occurred while we were in the nested Run state, then
+ // not successful. When stopping, the |error_| member is set to a value of
+ // PIPELINE_STOPPING so we will exit in that case also with false.
+ success = (PIPELINE_OK == pipeline_->error_);
+ }
+
+ // This could still be set if we never ran the message loop (for example,
+ // if the fiter returned false from it's Initialize method), so make sure
+ // to reset it.
+ host_initializing_ = NULL;
+
+ // If this method fails, but no error set, then indicate a general
+ // initialization failure.
+ if (PIPELINE_OK == pipeline_->error_ && (!success)) {
+ Error(PIPELINE_ERROR_INITIALIZATION_FAILED);
+ }
+ return success;
+}
+
+bool PipelineThread::CreateDataSource(FilterFactory* filter_factory,
+ const std::string& url) {
+ MediaFormat url_format;
+ url_format.SetAsString(MediaFormat::kMimeType, mime_type::kURL);
+ url_format.SetAsString(MediaFormat::kURL, url);
+ return CreateFilter<DataSource>(filter_factory, url, &url_format);
+}
+
+template <class Decoder>
+bool PipelineThread::CreateDecoder(FilterFactory* filter_factory) {
+ Demuxer* demuxer = GetFilter<Demuxer>();
+ if (demuxer) {
+ int num_outputs = demuxer->GetNumberOfStreams();
+ for (int i = 0; i < num_outputs; ++i) {
+ DemuxerStream* stream = demuxer->GetStream(i);
+ const MediaFormat* stream_format = stream->GetMediaFormat();
+ if (IsMajorMimeType(stream_format, Decoder::major_mime_type())) {
+ return CreateFilter<Decoder>(filter_factory, stream, stream_format);
+ }
+ }
+ }
+ return false;
+}
+
+template <class NewFilter, class SourceFilter>
+bool PipelineThread::CreateAndConnect(FilterFactory* filter_factory) {
+ SourceFilter* source_filter = GetFilter<SourceFilter>();
+ bool success = (source_filter &&
+ CreateFilter<NewFilter>(filter_factory,
+ source_filter,
+ source_filter->GetMediaFormat()));
+ return success;
}
-bool PipelineImpl::SetVolume(float volume) {
- // TODO(ralphl): implement SetVolume.
- NOTIMPLEMENTED();
+// TODO(ralphl): Consider making this part of the demuxer interface.
+bool PipelineThread::HasVideo() const {
+ Demuxer* demuxer = GetFilter<Demuxer>();
+ if (demuxer) {
+ int num_outputs = demuxer->GetNumberOfStreams();
+ for (int i = 0; i < num_outputs; ++i) {
+ if (IsMajorMimeType(demuxer->GetStream(i)->GetMediaFormat(),
+ mime_type::kMajorTypeVideo)) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool PipelineThread::IsMajorMimeType(const MediaFormat* media_format,
+ const std::string& major_mime_type) const {
+ std::string value;
+ if (media_format->GetAsString(MediaFormat::kMimeType, &value)) {
+ return (0 == value.compare(0, major_mime_type.length(), major_mime_type));
+ }
return false;
}
+
+// Called as a result of destruction of the thread.
+void PipelineThread::WillDestroyCurrentMessageLoop() {
+ while (!filter_hosts_.empty()) {
+ delete filter_hosts_.back();
+ filter_hosts_.pop_back();
+ }
+}
} // namespace media