summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-08-03 22:36:33 +0000
committerscherkus@chromium.org <scherkus@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-08-03 22:36:33 +0000
commita3fff4cdedbfde076b28897a65926490db95ff82 (patch)
treeaf606b69bf09285595d262d307d4007c07473a49
parent5ef4652296925e0cf17cf499e5a859bd1359afe4 (diff)
downloadchromium_src-a3fff4cdedbfde076b28897a65926490db95ff82.zip
chromium_src-a3fff4cdedbfde076b28897a65926490db95ff82.tar.gz
chromium_src-a3fff4cdedbfde076b28897a65926490db95ff82.tar.bz2
Replace RunInSeries() and RunInParallel() with SerialRunner helper class.
The biggest improvement here is that early termination of the callback series is accomplished by deleting the object. BUG=138583 Review URL: https://chromiumcodereview.appspot.com/10830146 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@149951 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--media/base/callback_util.cc134
-rw-r--r--media/base/callback_util.h53
-rw-r--r--media/base/pipeline.cc98
-rw-r--r--media/base/pipeline.h21
-rw-r--r--media/base/serial_runner.cc94
-rw-r--r--media/base/serial_runner.h76
-rw-r--r--media/media.gyp4
7 files changed, 238 insertions, 242 deletions
diff --git a/media/base/callback_util.cc b/media/base/callback_util.cc
deleted file mode 100644
index 6ab7818..0000000
--- a/media/base/callback_util.cc
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "media/base/callback_util.h"
-
-#include "base/bind.h"
-#include "base/synchronization/lock.h"
-#include "base/memory/ref_counted.h"
-#include "base/message_loop.h"
-#include "base/message_loop_proxy.h"
-
-namespace media {
-
-// Executes the given closure if and only if the closure returned by
-// GetClosure() has been executed exactly |count| times.
-//
-// |done_cb| will be executed on the same thread that created the CountingCB.
-class CountingCB : public base::RefCountedThreadSafe<CountingCB> {
- public:
- CountingCB(int count, const base::Closure& done_cb)
- : message_loop_(base::MessageLoopProxy::current()),
- count_(count),
- done_cb_(done_cb) {
- }
-
- // Returns a closure bound to this object.
- base::Closure GetClosure() {
- return base::Bind(&CountingCB::OnCallback, this);
- }
-
- protected:
- friend class base::RefCountedThreadSafe<CountingCB>;
- virtual ~CountingCB() {}
-
- private:
- void OnCallback() {
- {
- base::AutoLock l(lock_);
- count_--;
- DCHECK_GE(count_, 0) << "CountingCB executed too many times";
- if (count_ != 0)
- return;
- }
-
- if (!message_loop_->BelongsToCurrentThread()) {
- message_loop_->PostTask(FROM_HERE, done_cb_);
- return;
- }
-
- done_cb_.Run();
- }
-
- scoped_refptr<base::MessageLoopProxy> message_loop_;
- base::Lock lock_;
- int count_;
- base::Closure done_cb_;
-
- DISALLOW_COPY_AND_ASSIGN(CountingCB);
-};
-
-static void OnSeriesCallback(
- scoped_refptr<base::MessageLoopProxy> message_loop,
- scoped_ptr<std::queue<ClosureFunc> > closures,
- const base::Closure& done_cb) {
- if (!message_loop->BelongsToCurrentThread()) {
- message_loop->PostTask(FROM_HERE, base::Bind(
- &OnSeriesCallback, message_loop, base::Passed(&closures), done_cb));
- return;
- }
-
- if (closures->empty()) {
- done_cb.Run();
- return;
- }
-
- ClosureFunc cb = closures->front();
- closures->pop();
- cb.Run(base::Bind(
- &OnSeriesCallback, message_loop, base::Passed(&closures), done_cb));
-}
-
-void RunInSeries(scoped_ptr<std::queue<ClosureFunc> > closures,
- const base::Closure& done_cb) {
- OnSeriesCallback(base::MessageLoopProxy::current(),
- closures.Pass(), done_cb);
-}
-
-static void OnStatusCallback(
- scoped_refptr<base::MessageLoopProxy> message_loop,
- scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
- const PipelineStatusCB& done_cb,
- PipelineStatus last_status) {
- if (!message_loop->BelongsToCurrentThread()) {
- message_loop->PostTask(FROM_HERE, base::Bind(
- &OnStatusCallback, message_loop, base::Passed(&status_cbs), done_cb,
- last_status));
- return;
- }
-
- if (status_cbs->empty() || last_status != PIPELINE_OK) {
- done_cb.Run(last_status);
- return;
- }
-
- PipelineStatusCBFunc status_cb = status_cbs->front();
- status_cbs->pop();
- status_cb.Run(base::Bind(
- &OnStatusCallback, message_loop, base::Passed(&status_cbs), done_cb));
-}
-
-void RunInSeriesWithStatus(
- scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
- const PipelineStatusCB& done_cb) {
- OnStatusCallback(base::MessageLoopProxy::current(),
- status_cbs.Pass(), done_cb, PIPELINE_OK);
-}
-
-void RunInParallel(scoped_ptr<std::queue<ClosureFunc> > closures,
- const base::Closure& done_cb) {
- if (closures->empty()) {
- done_cb.Run();
- return;
- }
-
- scoped_refptr<CountingCB> counting_cb =
- new CountingCB(closures->size(), done_cb);
- while (!closures->empty()) {
- closures->front().Run(counting_cb->GetClosure());
- closures->pop();
- }
-}
-
-} // namespace media
diff --git a/media/base/callback_util.h b/media/base/callback_util.h
deleted file mode 100644
index d831191..0000000
--- a/media/base/callback_util.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright (c) 2012 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef MEDIA_BASE_CALLBACK_UTIL_H_
-#define MEDIA_BASE_CALLBACK_UTIL_H_
-
-#include <queue>
-
-#include "base/callback.h"
-#include "base/memory/scoped_ptr.h"
-#include "media/base/pipeline_status.h"
-
-namespace media {
-
-typedef base::Callback<void(const base::Closure&)> ClosureFunc;
-typedef base::Callback<void(const PipelineStatusCB&)> PipelineStatusCBFunc;
-
-// Executes the closures in FIFO order, executing |done_cb| when the last
-// closure has completed running.
-//
-// All closures (including |done_cb|) are executed on same thread as the
-// calling thread.
-void RunInSeries(scoped_ptr<std::queue<ClosureFunc> > closures,
- const base::Closure& done_cb);
-
-// Executes the closures in FIFO order, executing |done_cb| when the last
-// closure has completed running, reporting the final status code.
-//
-// Closures will stop being executed if a previous closure in the series
-// returned an error status and |done_cb| will be executed prematurely.
-//
-// All closures (including |done_cb|) are executed on same thread as the
-// calling thread.
-void RunInSeriesWithStatus(
- scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs,
- const PipelineStatusCB& done_cb);
-
-// Executes the closures in parallel, executing |done_cb| when all closures have
-// completed running.
-//
-// No attempt is made to parallelize execution of the closures. In other words,
-// this method will run all closures in FIFO order if said closures execute
-// synchronously on the same call stack.
-//
-// All closures (including |done_cb|) are executed on same thread as the
-// calling thread.
-void RunInParallel(scoped_ptr<std::queue<ClosureFunc> > closures,
- const base::Closure& done_cb);
-
-} // namespace media
-
-#endif // MEDIA_BASE_CALLBACK_UTIL_H_
diff --git a/media/base/pipeline.cc b/media/base/pipeline.cc
index 24868cf..85ec3e8 100644
--- a/media/base/pipeline.cc
+++ b/media/base/pipeline.cc
@@ -17,7 +17,6 @@
#include "base/synchronization/condition_variable.h"
#include "media/base/audio_decoder.h"
#include "media/base/audio_renderer.h"
-#include "media/base/callback_util.h"
#include "media/base/clock.h"
#include "media/base/filter_collection.h"
#include "media/base/media_log.h"
@@ -451,59 +450,63 @@ TimeDelta Pipeline::TimeForByteOffset_Locked(int64 byte_offset) const {
return time_offset;
}
-void Pipeline::DoPause(const base::Closure& done_cb) {
+void Pipeline::DoPause(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread());
- scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>);
+ DCHECK(!pending_callbacks_.get());
+ SerialRunner::Queue bound_fns;
if (audio_renderer_)
- closures->push(base::Bind(&AudioRenderer::Pause, audio_renderer_));
+ bound_fns.Push(base::Bind(&AudioRenderer::Pause, audio_renderer_));
if (video_renderer_)
- closures->push(base::Bind(&VideoRenderer::Pause, video_renderer_));
+ bound_fns.Push(base::Bind(&VideoRenderer::Pause, video_renderer_));
- RunInSeries(closures.Pass(), done_cb);
+ pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
}
-void Pipeline::DoFlush(const base::Closure& done_cb) {
+void Pipeline::DoFlush(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread());
- scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>);
+ DCHECK(!pending_callbacks_.get());
+ SerialRunner::Queue bound_fns;
if (audio_renderer_)
- closures->push(base::Bind(&AudioRenderer::Flush, audio_renderer_));
+ bound_fns.Push(base::Bind(&AudioRenderer::Flush, audio_renderer_));
if (video_renderer_)
- closures->push(base::Bind(&VideoRenderer::Flush, video_renderer_));
+ bound_fns.Push(base::Bind(&VideoRenderer::Flush, video_renderer_));
- RunInParallel(closures.Pass(), done_cb);
+ pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
}
-void Pipeline::DoPlay(const base::Closure& done_cb) {
+void Pipeline::DoPlay(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread());
- scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>);
+ DCHECK(!pending_callbacks_.get());
+ SerialRunner::Queue bound_fns;
if (audio_renderer_)
- closures->push(base::Bind(&AudioRenderer::Play, audio_renderer_));
+ bound_fns.Push(base::Bind(&AudioRenderer::Play, audio_renderer_));
if (video_renderer_)
- closures->push(base::Bind(&VideoRenderer::Play, video_renderer_));
+ bound_fns.Push(base::Bind(&VideoRenderer::Play, video_renderer_));
- RunInSeries(closures.Pass(), done_cb);
+ pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
}
-void Pipeline::DoStop(const base::Closure& done_cb) {
+void Pipeline::DoStop(const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread());
- scoped_ptr<std::queue<ClosureFunc> > closures(new std::queue<ClosureFunc>);
+ DCHECK(!pending_callbacks_.get());
+ SerialRunner::Queue bound_fns;
if (demuxer_)
- closures->push(base::Bind(&Demuxer::Stop, demuxer_));
+ bound_fns.Push(base::Bind(&Demuxer::Stop, demuxer_));
if (audio_renderer_)
- closures->push(base::Bind(&AudioRenderer::Stop, audio_renderer_));
+ bound_fns.Push(base::Bind(&AudioRenderer::Stop, audio_renderer_));
if (video_renderer_)
- closures->push(base::Bind(&VideoRenderer::Stop, video_renderer_));
+ bound_fns.Push(base::Bind(&VideoRenderer::Stop, video_renderer_));
- RunInSeries(closures.Pass(), done_cb);
+ pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
}
void Pipeline::AddBufferedByteRange(int64 start, int64 end) {
@@ -545,25 +548,21 @@ void Pipeline::OnFilterInitialize(PipelineStatus status) {
}
// Called from any thread.
-void Pipeline::OnFilterStateTransition() {
- message_loop_->PostTask(FROM_HERE, base::Bind(
- &Pipeline::FilterStateTransitionTask, this));
-}
-
-// Called from any thread.
// This method makes the PipelineStatusCB behave like a Closure. It
// makes it look like a host()->SetError() call followed by a call to
// OnFilterStateTransition() when errors occur.
//
// TODO: Revisit this code when SetError() is removed from FilterHost and
// all the Closures are converted to PipelineStatusCB.
-void Pipeline::OnFilterStateTransitionWithStatus(PipelineStatus status) {
+void Pipeline::OnFilterStateTransition(PipelineStatus status) {
if (status != PIPELINE_OK)
SetError(status);
- OnFilterStateTransition();
+ message_loop_->PostTask(FROM_HERE, base::Bind(
+ &Pipeline::FilterStateTransitionTask, this));
}
-void Pipeline::OnTeardownStateTransition() {
+void Pipeline::OnTeardownStateTransition(PipelineStatus status) {
+ // Ignore any errors during teardown.
message_loop_->PostTask(FROM_HERE, base::Bind(
&Pipeline::TeardownStateTransitionTask, this));
}
@@ -694,7 +693,7 @@ void Pipeline::InitializeTask(PipelineStatus last_stage_status) {
SetState(kSeeking);
seek_timestamp_ = demuxer_->GetStartTime();
DoSeek(seek_timestamp_, true,
- base::Bind(&Pipeline::OnFilterStateTransitionWithStatus, this));
+ base::Bind(&Pipeline::OnFilterStateTransition, this));
}
}
@@ -889,6 +888,9 @@ void Pipeline::AudioDisabledTask() {
void Pipeline::FilterStateTransitionTask() {
DCHECK(message_loop_->BelongsToCurrentThread());
+ DCHECK(pending_callbacks_.get())
+ << "Filter state transitions must be completed via pending_callbacks_";
+ pending_callbacks_.reset();
// No reason transitioning if we've errored or have stopped.
if (IsPipelineStopped()) {
@@ -923,7 +925,7 @@ void Pipeline::FilterStateTransitionTask() {
DoFlush(base::Bind(&Pipeline::OnFilterStateTransition, this));
} else if (state_ == kSeeking) {
DoSeek(seek_timestamp_, false,
- base::Bind(&Pipeline::OnFilterStateTransitionWithStatus, this));
+ base::Bind(&Pipeline::OnFilterStateTransition, this));
} else if (state_ == kStarting) {
DoPlay(base::Bind(&Pipeline::OnFilterStateTransition, this));
} else if (state_ == kStopping) {
@@ -964,6 +966,10 @@ void Pipeline::FilterStateTransitionTask() {
void Pipeline::TeardownStateTransitionTask() {
DCHECK(IsPipelineTearingDown());
+ DCHECK(pending_callbacks_.get())
+ << "Teardown state transitions must be completed via pending_callbacks_";
+ pending_callbacks_.reset();
+
switch (state_) {
case kStopping:
SetState(error_caused_teardown_ ? kError : kStopped);
@@ -1171,6 +1177,9 @@ void Pipeline::TearDownPipeline() {
// Mark that we already start tearing down operation.
tearing_down_ = true;
+ // Cancel any pending operation so we can proceed with teardown.
+ pending_callbacks_.reset();
+
switch (state_) {
case kCreated:
case kError:
@@ -1229,22 +1238,25 @@ void Pipeline::DoSeek(base::TimeDelta seek_timestamp,
bool skip_demuxer_seek,
const PipelineStatusCB& done_cb) {
DCHECK(message_loop_->BelongsToCurrentThread());
- scoped_ptr<std::queue<PipelineStatusCBFunc> > status_cbs(
- new std::queue<PipelineStatusCBFunc>());
+ DCHECK(!pending_callbacks_.get());
+ SerialRunner::Queue bound_fns;
- if (!skip_demuxer_seek)
- status_cbs->push(base::Bind(&Demuxer::Seek, demuxer_, seek_timestamp));
+ if (!skip_demuxer_seek) {
+ bound_fns.Push(base::Bind(
+ &Demuxer::Seek, demuxer_, seek_timestamp));
+ }
- if (audio_renderer_)
- status_cbs->push(base::Bind(
+ if (audio_renderer_) {
+ bound_fns.Push(base::Bind(
&AudioRenderer::Preroll, audio_renderer_, seek_timestamp));
+ }
- if (video_renderer_)
- status_cbs->push(base::Bind(
+ if (video_renderer_) {
+ bound_fns.Push(base::Bind(
&VideoRenderer::Preroll, video_renderer_, seek_timestamp));
+ }
- RunInSeriesWithStatus(status_cbs.Pass(), base::Bind(
- &Pipeline::ReportStatus, this, done_cb));
+ pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
}
void Pipeline::OnAudioUnderflow() {
diff --git a/media/base/pipeline.h b/media/base/pipeline.h
index ce028be..9bd23e2 100644
--- a/media/base/pipeline.h
+++ b/media/base/pipeline.h
@@ -15,6 +15,7 @@
#include "media/base/media_export.h"
#include "media/base/pipeline_status.h"
#include "media/base/ranges.h"
+#include "media/base/serial_runner.h"
#include "ui/gfx/size.h"
class MessageLoop;
@@ -310,14 +311,12 @@ class MEDIA_EXPORT Pipeline
// Callbacks executed by filters upon completing initialization.
void OnFilterInitialize(PipelineStatus status);
- // Callback executed by filters upon completing Play(), Pause(), or Stop().
- void OnFilterStateTransition();
-
- // Callback executed by filters upon completing Seek().
- void OnFilterStateTransitionWithStatus(PipelineStatus status);
+ // Callback executed by filters upon completing Play(), Pause(), Flush(),
+ // Seek() or Stop().
+ void OnFilterStateTransition(PipelineStatus status);
// Callback executed by filters when completing teardown operations.
- void OnTeardownStateTransition();
+ void OnTeardownStateTransition(PipelineStatus status);
// Callback executed by filters to update statistics.
void OnUpdateStatistics(const PipelineStatistics& stats);
@@ -415,10 +414,10 @@ class MEDIA_EXPORT Pipeline
// Initiates an asynchronous Pause/Seek/Play/Stop() call sequence executing
// |done_cb| when completed.
- void DoPause(const base::Closure& done_cb);
- void DoFlush(const base::Closure& done_cb);
- void DoPlay(const base::Closure& done_cb);
- void DoStop(const base::Closure& done_cb);
+ void DoPause(const PipelineStatusCB& done_cb);
+ void DoFlush(const PipelineStatusCB& done_cb);
+ void DoPlay(const PipelineStatusCB& done_cb);
+ void DoStop(const PipelineStatusCB& done_cb);
// Initiates an asynchronous Seek() and preroll call sequence executing
// |done_cb| with the final status when completed. If |skip_demuxer_seek| is
@@ -563,6 +562,8 @@ class MEDIA_EXPORT Pipeline
// reaches "kStarted", at which point it is used & zeroed out.
base::Time creation_time_;
+ scoped_ptr<SerialRunner> pending_callbacks_;
+
DISALLOW_COPY_AND_ASSIGN(Pipeline);
};
diff --git a/media/base/serial_runner.cc b/media/base/serial_runner.cc
new file mode 100644
index 0000000..b41ff0c
--- /dev/null
+++ b/media/base/serial_runner.cc
@@ -0,0 +1,94 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/base/serial_runner.h"
+
+#include "base/bind.h"
+#include "base/callback_helpers.h"
+#include "base/message_loop.h"
+#include "base/message_loop_proxy.h"
+
+namespace media {
+
+// Converts a bound function accepting a Closure into a bound function
+// accepting a PipelineStatusCB. Since closures have no way of reporting a
+// status |status_cb| is executed with PIPELINE_OK.
+static void RunBoundClosure(
+ const SerialRunner::BoundClosure& bound_closure,
+ const PipelineStatusCB& status_cb) {
+ bound_closure.Run(base::Bind(status_cb, PIPELINE_OK));
+}
+
+// Runs |status_cb| with |last_status| on |message_loop|, trampolining if
+// necessary.
+static void RunOnMessageLoop(
+ const scoped_refptr<base::MessageLoopProxy>& message_loop,
+ const PipelineStatusCB& status_cb,
+ PipelineStatus last_status) {
+ if (!message_loop->BelongsToCurrentThread()) {
+ message_loop->PostTask(FROM_HERE, base::Bind(
+ &RunOnMessageLoop, message_loop, status_cb, last_status));
+ return;
+ }
+ status_cb.Run(last_status);
+}
+
+SerialRunner::Queue::Queue() {}
+SerialRunner::Queue::~Queue() {}
+
+void SerialRunner::Queue::Push(
+ const BoundClosure& bound_closure) {
+ bound_fns_.push(base::Bind(&RunBoundClosure, bound_closure));
+}
+
+void SerialRunner::Queue::Push(
+ const BoundPipelineStatusCB& bound_status_cb) {
+ bound_fns_.push(bound_status_cb);
+}
+
+SerialRunner::BoundPipelineStatusCB SerialRunner::Queue::Pop() {
+ BoundPipelineStatusCB bound_fn = bound_fns_.front();
+ bound_fns_.pop();
+ return bound_fn;
+}
+
+bool SerialRunner::Queue::empty() {
+ return bound_fns_.empty();
+}
+
+SerialRunner::SerialRunner(
+ const Queue& bound_fns, const PipelineStatusCB& done_cb)
+ : weak_this_(this),
+ message_loop_(base::MessageLoopProxy::current()),
+ bound_fns_(bound_fns),
+ done_cb_(done_cb) {
+ message_loop_->PostTask(FROM_HERE, base::Bind(
+ &SerialRunner::RunNextInSeries, weak_this_.GetWeakPtr(),
+ PIPELINE_OK));
+}
+
+SerialRunner::~SerialRunner() {}
+
+scoped_ptr<SerialRunner> SerialRunner::Run(
+ const Queue& bound_fns, const PipelineStatusCB& done_cb) {
+ scoped_ptr<SerialRunner> callback_series(
+ new SerialRunner(bound_fns, done_cb));
+ return callback_series.Pass();
+}
+
+void SerialRunner::RunNextInSeries(PipelineStatus last_status) {
+ DCHECK(message_loop_->BelongsToCurrentThread());
+ DCHECK(!done_cb_.is_null());
+
+ if (bound_fns_.empty() || last_status != PIPELINE_OK) {
+ base::ResetAndReturn(&done_cb_).Run(last_status);
+ return;
+ }
+
+ BoundPipelineStatusCB bound_fn = bound_fns_.Pop();
+ bound_fn.Run(base::Bind(&RunOnMessageLoop, message_loop_, base::Bind(
+ &SerialRunner::RunNextInSeries, weak_this_.GetWeakPtr())));
+}
+
+} // namespace media
diff --git a/media/base/serial_runner.h b/media/base/serial_runner.h
new file mode 100644
index 0000000..16fa6f3
--- /dev/null
+++ b/media/base/serial_runner.h
@@ -0,0 +1,76 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_BASE_SERIAL_RUNNER_H_
+#define MEDIA_BASE_SERIAL_RUNNER_H_
+
+#include <queue>
+
+#include "base/callback.h"
+#include "base/memory/ref_counted.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/memory/weak_ptr.h"
+#include "media/base/pipeline_status.h"
+
+namespace base {
+class MessageLoopProxy;
+}
+
+namespace media {
+
+// Runs a series of bound functions accepting Closures or PipelineStatusCB.
+// SerialRunner doesn't use regular Closure/PipelineStatusCBs as it late binds
+// the completion callback as the series progresses.
+class SerialRunner {
+ public:
+ typedef base::Callback<void(const base::Closure&)> BoundClosure;
+ typedef base::Callback<void(const PipelineStatusCB&)> BoundPipelineStatusCB;
+
+ // Serial queue of bound functions to run.
+ class Queue {
+ public:
+ Queue();
+ ~Queue();
+
+ void Push(const BoundClosure& bound_fn);
+ void Push(const BoundPipelineStatusCB& bound_fn);
+
+ private:
+ friend class SerialRunner;
+
+ BoundPipelineStatusCB Pop();
+ bool empty();
+
+ std::queue<BoundPipelineStatusCB> bound_fns_;
+ };
+
+ // Executes the bound functions in series, executing |done_cb| when finished.
+ //
+ // All bound functions are executed on the thread that Run() is called on,
+ // including |done_cb|.
+ //
+ // Deleting the object will prevent execution of any unstarted bound
+ // functions, including |done_cb|.
+ static scoped_ptr<SerialRunner> Run(
+ const Queue& bound_fns, const PipelineStatusCB& done_cb);
+
+ private:
+ friend class scoped_ptr<SerialRunner>;
+
+ SerialRunner(const Queue& bound_fns, const PipelineStatusCB& done_cb);
+ ~SerialRunner();
+
+ void RunNextInSeries(PipelineStatus last_status);
+
+ base::WeakPtrFactory<SerialRunner> weak_this_;
+ scoped_refptr<base::MessageLoopProxy> message_loop_;
+ Queue bound_fns_;
+ PipelineStatusCB done_cb_;
+
+ DISALLOW_COPY_AND_ASSIGN(SerialRunner);
+};
+
+} // namespace media
+
+#endif // MEDIA_BASE_SERIAL_RUNNER_H_
diff --git a/media/media.gyp b/media/media.gyp
index f9c362f..89b0229 100644
--- a/media/media.gyp
+++ b/media/media.gyp
@@ -155,8 +155,6 @@
'base/buffers.h',
'base/byte_queue.cc',
'base/byte_queue.h',
- 'base/callback_util.cc',
- 'base/callback_util.h',
'base/channel_layout.cc',
'base/channel_layout.h',
'base/clock.cc',
@@ -201,6 +199,8 @@
'base/ranges.h',
'base/seekable_buffer.cc',
'base/seekable_buffer.h',
+ 'base/serial_runner.cc',
+ 'base/serial_runner.h',
'base/sinc_resampler.cc',
'base/sinc_resampler.h',
'base/stream_parser.cc',