summaryrefslogtreecommitdiffstats
path: root/media/audio
diff options
context:
space:
mode:
authortommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-03-27 20:37:55 +0000
committertommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-03-27 20:37:55 +0000
commit409a01172dc2b4721f4817fee405ddd6933678a6 (patch)
tree3cd8217d5551e4ccf89609f26f91fc9d7ba548c0 /media/audio
parentefbd89cd06131ca00c1b7ea08755f287ec485625 (diff)
downloadchromium_src-409a01172dc2b4721f4817fee405ddd6933678a6.zip
chromium_src-409a01172dc2b4721f4817fee405ddd6933678a6.tar.gz
chromium_src-409a01172dc2b4721f4817fee405ddd6933678a6.tar.bz2
Add a CrossProcessNotification class.
This class will be used to synchronize multiple audio objects with minimal thread and IPC socket usage. TEST=Several tests included. Run media unittests. BUG=114699 Review URL: https://chromiumcodereview.appspot.com/9605015 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@129263 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/audio')
-rw-r--r--media/audio/cross_process_notification.cc30
-rw-r--r--media/audio/cross_process_notification.h172
-rw-r--r--media/audio/cross_process_notification_posix.cc114
-rw-r--r--media/audio/cross_process_notification_unittest.cc458
-rw-r--r--media/audio/cross_process_notification_win.cc268
5 files changed, 1042 insertions, 0 deletions
diff --git a/media/audio/cross_process_notification.cc b/media/audio/cross_process_notification.cc
new file mode 100644
index 0000000..1806f77
--- /dev/null
+++ b/media/audio/cross_process_notification.cc
@@ -0,0 +1,30 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/audio/cross_process_notification.h"
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+
+CrossProcessNotification::CrossProcessNotification() {}
+
+CrossProcessNotification::WaitForMultiple::WaitForMultiple(
+ const Notifications* notifications) {
+ Reset(notifications);
+}
+
+int CrossProcessNotification::WaitForMultiple::Wait() {
+ DCHECK(CalledOnValidThread());
+ int ret = WaitMultiple(*notifications_, wait_offset_);
+ wait_offset_ = (ret + 1) % notifications_->size();
+ return ret;
+}
+
+void CrossProcessNotification::WaitForMultiple::Reset(
+ const Notifications* notifications) {
+ DCHECK(CalledOnValidThread());
+ wait_offset_ = 0;
+ notifications_ = notifications;
+ DCHECK(!notifications_->empty());
+}
diff --git a/media/audio/cross_process_notification.h b/media/audio/cross_process_notification.h
new file mode 100644
index 0000000..cae7435
--- /dev/null
+++ b/media/audio/cross_process_notification.h
@@ -0,0 +1,172 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef MEDIA_AUDIO_CROSS_PROCESS_NOTIFICATION_H_
+#define MEDIA_AUDIO_CROSS_PROCESS_NOTIFICATION_H_
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/process.h"
+#include "base/threading/non_thread_safe.h"
+#include "media/base/media_export.h"
+
+#if defined(OS_WIN)
+#include "base/win/scoped_handle.h"
+#else
+#include "base/file_descriptor_posix.h"
+#include "base/sync_socket.h"
+#endif
+
+// A mechanism to synchronize access to a shared resource between two parties
+// when the usage pattern resembles that of two players playing a game of chess.
+// Each end has an instance of CrossProcessNotification and calls Signal() when
+// it has finished using the shared resource.
+// Before accessing the resource, it must call Wait() in order to know when the
+// other end has called Signal().
+//
+// Here's some pseudo code for how this class can be used:
+//
+// This method is used by both processes as it's a general way to use the
+// shared resource and then grant the privilege to the other process:
+//
+// void WriteToSharedMemory(CrossProcessNotification* notification,
+// SharedMemory* mem,
+// const char my_char) {
+// notification->Wait(); // Wait for the other process to yield access.
+// reinterpret_cast<char*>(mem->memory())[0] = my_char;
+// notification->Signal(); // Grant the other process access.
+// }
+//
+// Process A:
+//
+// class A {
+// public:
+// void Initialize(base::ProcessHandle process_b) {
+// mem_.CreateNamed("foo", false, 1024);
+//
+// CrossProcessNotification other;
+// CHECK(CrossProcessNotification::InitializePair(&notification_, &other));
+// CrossProcessNotification::IPCHandle handle_1, handle_2;
+// CHECK(other.ShareToProcess(process_b, &handle_1, &handle_2));
+// // This could be implemented by using some IPC mechanism
+// // such as MessageLoop.
+// SendToProcessB(mem_, handle_1, handle_2);
+// // Allow process B the first chance to write to the memory:
+// notification_.Signal();
+// // Once B is done, we'll write 'A' to the shared memory.
+// WriteToSharedMemory(&notification_, &mem_, 'A');
+// }
+//
+// CrossProcessNotification notification_;
+// SharedMemory mem_;
+// };
+//
+// Process B:
+//
+// class B {
+// public:
+// // Called when we receive the IPC message from A.
+// void Initialize(SharedMemoryHandle mem,
+// CrossProcessNotification::IPCHandle handle_1,
+// CrossProcessNotification::IPCHandle handle_2) {
+// mem_.reset(new SharedMemory(mem, false));
+// notification_.reset(new CrossProcessNotification(handle_1, handle_2));
+// WriteToSharedMemory(&notification_, &mem_, 'B');
+// }
+//
+// CrossProcessNotification notification_;
+// scoped_ptr<SharedMemory> mem_;
+// };
+//
+class MEDIA_EXPORT CrossProcessNotification {
+ public:
+#if defined(OS_WIN)
+ typedef HANDLE IPCHandle;
+#else
+ typedef base::FileDescriptor IPCHandle;
+#endif
+
+ typedef std::vector<CrossProcessNotification*> Notifications;
+
+ // Default ctor. Initializes a NULL notification. User must call
+ // InitializePair() to initialize the instance along with a connected one.
+ CrossProcessNotification();
+
+ // Ctor for the user that does not call InitializePair but instead receives
+ // handles from the one that did. These handles come from a call to
+ // ShareToProcess.
+ CrossProcessNotification(IPCHandle handle_1, IPCHandle handle_2);
+ ~CrossProcessNotification();
+
+ // Raises a signal that the shared resource now can be accessed by the other
+ // party.
+ // NOTE: Calling Signal() more than once without calling Wait() in between
+ // is not a supported scenario and will result in undefined behavior (and
+ // different depending on platform).
+ void Signal();
+
+ // Waits for the other party to finish using the shared resource.
+ // NOTE: As with Signal(), you must not call Wait() more than once without
+ // calling Signal() in between.
+ void Wait();
+
+ bool IsValid() const;
+
+ // Copies the internal handles to the output parameters, |handle_1| and
+ // |handle_2|. The operation can fail, so the caller must be prepared to
+ // handle that case.
+ bool ShareToProcess(base::ProcessHandle process, IPCHandle* handle_1,
+ IPCHandle* handle_2);
+
+ // Initializes a pair of CrossProcessNotification instances. Note that this
+ // can fail (e.g. due to EMFILE on Linux).
+ static bool InitializePair(CrossProcessNotification* a,
+ CrossProcessNotification* b);
+
+ // Use an instance of this class when you have to repeatedly wait for multiple
+ // notifications on the same thread. The class will store information about
+ // which notification was last signaled and try to distribute the signals so
+ // that all notifications get a chance to be processed in times of high load
+ // and a busy one won't starve the others.
+ // TODO(tommi): Support a way to abort the wait.
+ class MEDIA_EXPORT WaitForMultiple :
+ public NON_EXPORTED_BASE(base::NonThreadSafe) {
+ public:
+ // Caller must make sure that the lifetime of the array is greater than
+ // that of the WaitForMultiple instance.
+ explicit WaitForMultiple(const Notifications* notifications);
+
+ // Waits for any of the notifications to be signaled. Returns the 0 based
+ // index of a signaled notification.
+ int Wait();
+
+ // Call when the array changes. This should be called on the same thread
+ // as Wait() is called on and the array must never change while a Wait()
+ // is in progress.
+ void Reset(const Notifications* notifications);
+
+ private:
+ const Notifications* notifications_;
+ size_t wait_offset_;
+ };
+
+ private:
+ // Only called by the WaitForMultiple class. See documentation
+ // for WaitForMultiple and comments inside WaitMultiple for details.
+ static int WaitMultiple(const Notifications& notifications,
+ size_t wait_offset);
+
+#if defined(OS_WIN)
+ base::win::ScopedHandle mine_;
+ base::win::ScopedHandle other_;
+#else
+ typedef base::CancelableSyncSocket SocketClass;
+ SocketClass socket_;
+#endif
+
+ DISALLOW_COPY_AND_ASSIGN(CrossProcessNotification);
+};
+
+#endif // MEDIA_AUDIO_CROSS_PROCESS_NOTIFICATION_H_
diff --git a/media/audio/cross_process_notification_posix.cc b/media/audio/cross_process_notification_posix.cc
new file mode 100644
index 0000000..070ef06
--- /dev/null
+++ b/media/audio/cross_process_notification_posix.cc
@@ -0,0 +1,114 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/audio/cross_process_notification.h"
+
+#include <errno.h>
+#include <sys/poll.h>
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/file_descriptor_posix.h"
+
+CrossProcessNotification::~CrossProcessNotification() {}
+
+CrossProcessNotification::CrossProcessNotification(IPCHandle handle_1,
+ IPCHandle handle_2)
+ : socket_(handle_1.fd) {
+ DCHECK_NE(handle_1.fd, -1);
+ DCHECK_EQ(handle_2.fd, -1);
+ DCHECK(IsValid());
+}
+
+void CrossProcessNotification::Signal() {
+ DCHECK(IsValid());
+ char signal = 1;
+ size_t bytes = socket_.Send(&signal, sizeof(signal));
+ DCHECK_EQ(bytes, 1U) << "errno: " << errno;
+}
+
+void CrossProcessNotification::Wait() {
+ DCHECK(IsValid());
+ char signal = 0;
+ size_t bytes = socket_.Receive(&signal, sizeof(signal));
+ DCHECK_EQ(bytes, 1U) << "errno: " << errno;
+ DCHECK_EQ(signal, 1);
+}
+
+bool CrossProcessNotification::IsValid() const {
+ return socket_.handle() != SocketClass::kInvalidHandle;
+}
+
+bool CrossProcessNotification::ShareToProcess(base::ProcessHandle process,
+ IPCHandle* handle_1,
+ IPCHandle* handle_2) {
+ DCHECK(IsValid());
+ handle_1->fd = socket_.handle();
+ handle_1->auto_close = false;
+ handle_2->fd = -1;
+ return true;
+}
+
+// static
+bool CrossProcessNotification::InitializePair(CrossProcessNotification* a,
+ CrossProcessNotification* b) {
+ DCHECK(!a->IsValid());
+ DCHECK(!b->IsValid());
+
+ bool ok = SocketClass::CreatePair(&a->socket_, &b->socket_);
+
+ DLOG_IF(WARNING, !ok) << "failed to create socket: " << errno;
+ DCHECK(!ok || a->IsValid());
+ DCHECK(!ok || b->IsValid());
+ return ok;
+}
+
+// static
+int CrossProcessNotification::WaitMultiple(const Notifications& notifications,
+ size_t wait_offset) {
+ DCHECK_LT(wait_offset, notifications.size());
+
+ for (size_t i = 0; i < notifications.size(); ++i) {
+ DCHECK(notifications[i]->IsValid());
+ }
+
+ // Below, we always check the |revents| of the first socket in the array
+ // and return the index of that socket if set. This can cause sockets
+ // that come later in the array to starve when the first sockets are
+ // very busy. So to avoid the starving problem, we use the |wait_offset|
+ // variable to split up the array so that the last socket to be signaled
+ // becomes the last socket in the array and all the other sockets will have
+ // priority the next time WaitMultiple is called.
+ scoped_array<struct pollfd> sockets(new struct pollfd[notifications.size()]);
+ memset(&sockets[0], 0, notifications.size() * sizeof(sockets[0]));
+ size_t index = 0;
+ for (size_t i = wait_offset; i < notifications.size(); ++i) {
+ struct pollfd& fd = sockets[index++];
+ fd.events = POLLIN;
+ fd.fd = notifications[i]->socket_.handle();
+ }
+
+ for (size_t i = 0; i < wait_offset; ++i) {
+ struct pollfd& fd = sockets[index++];
+ fd.events = POLLIN;
+ fd.fd = notifications[i]->socket_.handle();
+ }
+ DCHECK_EQ(index, notifications.size());
+
+ int err = poll(&sockets[0], notifications.size(), -1);
+ if (err != -1) {
+ for (size_t i = 0; i < notifications.size(); ++i) {
+ if (sockets[i].revents) {
+ size_t ret = (i + wait_offset) % notifications.size();
+ DCHECK_EQ(sockets[i].fd, notifications[ret]->socket_.handle());
+ notifications[ret]->Wait();
+ return ret;
+ }
+ }
+ }
+ // Either poll() failed or we failed to find a single socket that was
+ // signaled. Either way continuing will result in undefined behavior.
+ LOG(FATAL) << "poll() failed: " << errno;
+ return -1;
+}
diff --git a/media/audio/cross_process_notification_unittest.cc b/media/audio/cross_process_notification_unittest.cc
new file mode 100644
index 0000000..9ff6196
--- /dev/null
+++ b/media/audio/cross_process_notification_unittest.cc
@@ -0,0 +1,458 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/compiler_specific.h"
+#include "base/logging.h"
+#include "base/shared_memory.h"
+#include "base/stl_util.h"
+#include "base/test/multiprocess_test.h"
+#include "base/threading/platform_thread.h"
+#include "media/audio/cross_process_notification.h"
+#include "testing/gtest/include/gtest/gtest.h"
+#include "testing/multiprocess_func_list.h"
+
+#if defined(OS_POSIX)
+#include <utility> // NOLINT
+#endif
+
+namespace {
+
+// Initializes (ctor) and deletes (dtor) two vectors of pairs of
+// CrossProcessNotification instances.
+class NotificationsOwner {
+ public:
+ // Attempts to create up to |number_of_pairs| number of pairs. Call size()
+ // after construction to find out how many pairs were actually created.
+ explicit NotificationsOwner(size_t number_of_pairs) {
+ CreateMultiplePairs(number_of_pairs);
+ }
+ ~NotificationsOwner() {
+ STLDeleteElements(&a_);
+ STLDeleteElements(&b_);
+ }
+
+ size_t size() const {
+ DCHECK_EQ(a_.size(), b_.size());
+ return a_.size();
+ }
+
+ const CrossProcessNotification::Notifications& a() { return a_; }
+ const CrossProcessNotification::Notifications& b() { return b_; }
+
+ private:
+ void CreateMultiplePairs(size_t count) {
+ a_.resize(count);
+ b_.resize(count);
+ size_t i = 0;
+ for (; i < count; ++i) {
+ a_[i] = new CrossProcessNotification();
+ b_[i] = new CrossProcessNotification();
+ if (!CrossProcessNotification::InitializePair(a_[i], b_[i])) {
+ LOG(WARNING) << "InitializePair failed at " << i;
+ delete a_[i];
+ delete b_[i];
+ break;
+ }
+ }
+ a_.resize(i);
+ b_.resize(i);
+ }
+
+ CrossProcessNotification::Notifications a_;
+ CrossProcessNotification::Notifications b_;
+};
+
+// A simple thread that we'll run two instances of. Both threads get a pointer
+// to the same |shared_data| and use a CrossProcessNotification to control when
+// each thread can read/write.
+class SingleNotifierWorker : public base::PlatformThread::Delegate {
+ public:
+ SingleNotifierWorker(size_t* shared_data, size_t repeats,
+ CrossProcessNotification* notifier)
+ : shared_data_(shared_data), repeats_(repeats),
+ notifier_(notifier) {
+ }
+ virtual ~SingleNotifierWorker() {}
+
+ virtual void ThreadMain() OVERRIDE {
+ for (size_t i = 0; i < repeats_; ++i) {
+ notifier_->Wait();
+ ++(*shared_data_);
+ notifier_->Signal();
+ }
+ }
+
+ private:
+ size_t* shared_data_;
+ size_t repeats_;
+ CrossProcessNotification* notifier_;
+ DISALLOW_COPY_AND_ASSIGN(SingleNotifierWorker);
+};
+
+// Similar to SingleNotifierWorker, except each instance of this class will
+// have >1 instances of CrossProcessNotification to Wait/Signal and an equal
+// amount of |shared_data| that the notifiers control access to.
+class MultiNotifierWorker : public base::PlatformThread::Delegate {
+ public:
+ MultiNotifierWorker(size_t* shared_data, size_t repeats,
+ const CrossProcessNotification::Notifications* notifiers)
+ : shared_data_(shared_data), repeats_(repeats),
+ notifiers_(notifiers) {
+ }
+ virtual ~MultiNotifierWorker() {}
+
+ virtual void ThreadMain() OVERRIDE {
+ CrossProcessNotification::WaitForMultiple waiter(notifiers_);
+ for (size_t i = 0; i < repeats_; ++i) {
+ int signaled = waiter.Wait();
+ ++shared_data_[signaled];
+ (*notifiers_)[signaled]->Signal();
+ }
+ }
+
+ private:
+ size_t* shared_data_;
+ size_t repeats_;
+ const CrossProcessNotification::Notifications* notifiers_;
+ size_t count_;
+ DISALLOW_COPY_AND_ASSIGN(MultiNotifierWorker);
+};
+
+// A fixed array of bool flags. Each flag uses 1 bit. Use sizeof(FlagArray)
+// to determine how much memory you need. The number of flags will therefore
+// be sizeof(FlagArray) * 8.
+// We use 'struct' to signify that this structures represents compiler
+// independent structured data. I.e. you must be able to map this class
+// to a piece of shared memory of size sizeof(FlagArray) and be able to
+// use the class. No vtables etc.
+// TODO(tommi): Move this to its own header when we start using it for signaling
+// audio devices. As is, it's just here for perf comparison against the
+// "multiple notifiers" approach.
+struct FlagArray {
+ public:
+ FlagArray() : flags_() {}
+
+ bool is_set(size_t index) const {
+ return (flags_[index >> 5] & (1 << (index & 31))) ? true : false;
+ }
+
+ void set(size_t index) {
+ flags_[index >> 5] |= (1U << (static_cast<uint32>(index) & 31));
+ }
+
+ void clear(size_t index) {
+ flags_[index >> 5] &= ~(1U << (static_cast<uint32>(index) & 31));
+ }
+
+ // Returns the number of flags that can be set/checked.
+ size_t size() const { return sizeof(flags_) * 8; }
+
+ private:
+ // 256 * 32 = 8192 flags in 1KB.
+ uint32 flags_[256];
+ DISALLOW_COPY_AND_ASSIGN(FlagArray);
+};
+
+class MultiNotifierWorkerFlagArray : public base::PlatformThread::Delegate {
+ public:
+ MultiNotifierWorkerFlagArray(size_t count, FlagArray* signals,
+ size_t* shared_data, size_t repeats,
+ CrossProcessNotification* notifier)
+ : count_(count), signals_(signals), shared_data_(shared_data),
+ repeats_(repeats), notifier_(notifier) {
+ }
+ virtual ~MultiNotifierWorkerFlagArray() {}
+
+ virtual void ThreadMain() OVERRIDE {
+ for (size_t i = 0; i < repeats_; ++i) {
+ notifier_->Wait();
+ for (size_t s = 0; s < count_; ++s) {
+ if (signals_->is_set(s)) {
+ ++shared_data_[s];
+ // We don't clear the flag here but simply leave it signaled because
+ // we want the other thread to also increment this variable.
+ }
+ }
+ notifier_->Signal();
+ }
+ }
+
+ private:
+ size_t count_;
+ FlagArray* signals_;
+ size_t* shared_data_;
+ size_t repeats_;
+ CrossProcessNotification* notifier_;
+ DISALLOW_COPY_AND_ASSIGN(MultiNotifierWorkerFlagArray);
+};
+
+} // end namespace
+
+TEST(CrossProcessNotification, FlagArray) {
+ FlagArray flags;
+ EXPECT_GT(flags.size(), 1000U);
+ for (size_t i = 0; i < flags.size(); ++i) {
+ EXPECT_FALSE(flags.is_set(i));
+ flags.set(i);
+ EXPECT_TRUE(flags.is_set(i));
+ flags.clear(i);
+ EXPECT_FALSE(flags.is_set(i));
+ }
+}
+
+// Initializes two notifiers, signals the each one and make sure the others
+// wait is satisfied.
+TEST(CrossProcessNotification, Basic) {
+ CrossProcessNotification a, b;
+ ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b));
+ EXPECT_TRUE(a.IsValid());
+ EXPECT_TRUE(b.IsValid());
+
+ a.Signal();
+ b.Wait();
+
+ b.Signal();
+ a.Wait();
+}
+
+// Spins two worker threads, each with their own CrossProcessNotification
+// that they use to read and write from a shared memory buffer.
+TEST(CrossProcessNotification, TwoThreads) {
+ CrossProcessNotification a, b;
+ ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b));
+
+ size_t data = 0;
+ const size_t kRepeats = 10000;
+ SingleNotifierWorker worker1(&data, kRepeats, &a);
+ SingleNotifierWorker worker2(&data, kRepeats, &b);
+ base::PlatformThreadHandle thread1, thread2;
+ base::PlatformThread::Create(0, &worker1, &thread1);
+ base::PlatformThread::Create(0, &worker2, &thread2);
+
+ // Start the first thread. They should ping pong a few times and take turns
+ // incrementing the shared variable and never step on each other's toes.
+ a.Signal();
+
+ base::PlatformThread::Join(thread1);
+ base::PlatformThread::Join(thread2);
+
+ EXPECT_EQ(kRepeats * 2, data);
+}
+
+// Uses a pair of threads to access up to 1000 pieces of synchronized shared
+// data. On regular dev machines, the number of notifiers should be 1000, but on
+// mac and linux bots, the number will be smaller due to the RLIMIT_NOFILE
+// limit. Specifically, linux will have this limit at 1024 which means for this
+// test that the max number of notifiers will be in the range 500-512. On Mac
+// the limit is 256, so |count| will be ~120. Oh, and raising the limit via
+// setrlimit() won't work.
+TEST(CrossProcessNotification, ThousandNotifiersTwoThreads) {
+ const size_t kCount = 1000;
+ NotificationsOwner pairs(kCount);
+ size_t data[kCount] = {0};
+ // We use a multiple of the count so that the division in the check below
+ // will be nice and round.
+ size_t repeats = pairs.size() * 1;
+
+ MultiNotifierWorker worker_1(&data[0], repeats, &pairs.a());
+ MultiNotifierWorker worker_2(&data[0], repeats, &pairs.b());
+ base::PlatformThreadHandle thread_1, thread_2;
+ base::PlatformThread::Create(0, &worker_1, &thread_1);
+ base::PlatformThread::Create(0, &worker_2, &thread_2);
+
+ for (size_t i = 0; i < pairs.size(); ++i)
+ pairs.a()[i]->Signal();
+
+ base::PlatformThread::Join(thread_1);
+ base::PlatformThread::Join(thread_2);
+
+ size_t expected_total = pairs.size() * 2;
+ size_t total = 0;
+ for (size_t i = 0; i < pairs.size(); ++i) {
+ // The CrossProcessNotification::WaitForMultiple class should have ensured
+ // that all notifiers had the same quality of service.
+ EXPECT_EQ(expected_total / pairs.size(), data[i]);
+ total += data[i];
+ }
+ EXPECT_EQ(expected_total, total);
+}
+
+// Functionally equivalent (as far as the shared data goes) to the
+// ThousandNotifiersTwoThreads test but uses a single pair of notifiers +
+// FlagArray for the 1000 signals. This approach is significantly faster.
+TEST(CrossProcessNotification, TwoNotifiersTwoThreads1000Signals) {
+ CrossProcessNotification a, b;
+ ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b));
+
+ const size_t kCount = 1000;
+ FlagArray signals;
+ ASSERT_GE(signals.size(), kCount);
+ size_t data[kCount] = {0};
+
+ // Since this algorithm checks all events each time the notifier is
+ // signaled, |repeat| doesn't mean the same thing here as it does in
+ // ThousandNotifiersTwoThreads. 1 repeat here is the same as kCount
+ // repeats in ThousandNotifiersTwoThreads.
+ size_t repeats = 1;
+ MultiNotifierWorkerFlagArray worker1(kCount, &signals, &data[0], repeats, &a);
+ MultiNotifierWorkerFlagArray worker2(kCount, &signals, &data[0], repeats, &b);
+ base::PlatformThreadHandle thread1, thread2;
+ base::PlatformThread::Create(0, &worker1, &thread1);
+ base::PlatformThread::Create(0, &worker2, &thread2);
+
+ for (size_t i = 0; i < kCount; ++i)
+ signals.set(i);
+ a.Signal();
+
+ base::PlatformThread::Join(thread1);
+ base::PlatformThread::Join(thread2);
+
+ size_t expected_total = kCount * 2;
+ size_t total = 0;
+ for (size_t i = 0; i < kCount; ++i) {
+ // Since for each signal, we process all signaled events, the shared data
+ // variables should all be equal.
+ EXPECT_EQ(expected_total / kCount, data[i]);
+ total += data[i];
+ }
+ EXPECT_EQ(expected_total, total);
+}
+
+// Test the maximum number of notifiers without spinning further wait
+// threads on Windows. This test assumes we can always create 64 pairs and
+// bails if we can't.
+TEST(CrossProcessNotification, MultipleWaits64) {
+ const size_t kCount = 64;
+ NotificationsOwner pairs(kCount);
+ ASSERT_TRUE(pairs.size() == kCount);
+
+ CrossProcessNotification::WaitForMultiple waiter(&pairs.b());
+ for (size_t i = 0; i < kCount; ++i) {
+ pairs.a()[i]->Signal();
+ int index = waiter.Wait();
+ EXPECT_EQ(i, static_cast<size_t>(index));
+ }
+}
+
+// Tests waiting for more notifiers than the OS supports on one thread.
+// The test will create at most 1000 pairs, but on mac/linux bots the actual
+// number will be lower. See comment about the RLIMIT_NOFILE limit above for
+// more details.
+TEST(CrossProcessNotification, MultipleWaits1000) {
+ // A 1000 notifiers requires 16 threads on Windows, including the current
+ // one, to perform the wait operation.
+ const size_t kCount = 1000;
+ NotificationsOwner pairs(kCount);
+
+ for (size_t i = 0; i < pairs.size(); ++i) {
+ pairs.a()[i]->Signal();
+ // To disable the load distribution algorithm and force the extra worker
+ // thread(s) to catch the signaled event, we define the |waiter| inside
+ // the loop.
+ CrossProcessNotification::WaitForMultiple waiter(&pairs.b());
+ int index = waiter.Wait();
+ EXPECT_EQ(i, static_cast<size_t>(index));
+ }
+}
+
+class CrossProcessNotificationMultiProcessTest : public base::MultiProcessTest {
+ public:
+ static const char kSharedMemName[];
+ static const size_t kSharedMemSize = 1024;
+
+ protected:
+ virtual void SetUp() OVERRIDE {
+ base::MultiProcessTest::SetUp();
+ }
+
+ virtual void TearDown() OVERRIDE {
+ base::MultiProcessTest::TearDown();
+ }
+};
+
+// static
+const char CrossProcessNotificationMultiProcessTest::kSharedMemName[] =
+ "CrossProcessNotificationMultiProcessTest";
+
+namespace {
+// A very crude IPC mechanism that we use to set up the spawned child process
+// and the parent process.
+struct CrudeIpc {
+ uint8 ready;
+ CrossProcessNotification::IPCHandle handle_1;
+ CrossProcessNotification::IPCHandle handle_2;
+};
+} // end namespace
+
+// The main routine of the child process. Waits for the parent process
+// to copy handles over to the child and then uses a CrossProcessNotification to
+// wait and signal to the parent process.
+MULTIPROCESS_TEST_MAIN(CrossProcessNotificationChildMain) {
+ base::SharedMemory mem;
+ bool ok = mem.CreateNamed(
+ CrossProcessNotificationMultiProcessTest::kSharedMemName,
+ true,
+ CrossProcessNotificationMultiProcessTest::kSharedMemSize);
+ DCHECK(ok);
+ if (!ok) {
+ LOG(ERROR) << "Failed to open shared memory segment.";
+ return -1;
+ }
+
+ mem.Map(CrossProcessNotificationMultiProcessTest::kSharedMemSize);
+ CrudeIpc* ipc = reinterpret_cast<CrudeIpc*>(mem.memory());
+
+ while (!ipc->ready)
+ base::PlatformThread::Sleep(10);
+
+ CrossProcessNotification notifier(ipc->handle_1, ipc->handle_2);
+ notifier.Wait();
+ notifier.Signal();
+
+ return 0;
+}
+
+// Spawns a new process and hands a CrossProcessNotification instance to the
+// new process. Once that's done, it waits for the child process to signal
+// it's end and quits.
+TEST_F(CrossProcessNotificationMultiProcessTest, Basic) {
+ base::SharedMemory mem;
+ mem.Delete(kSharedMemName); // In case a previous run was unsuccessful.
+ bool ok = mem.CreateNamed(kSharedMemName, false, kSharedMemSize);
+ ASSERT_TRUE(ok);
+
+ ASSERT_TRUE(mem.Map(kSharedMemSize));
+
+ CrossProcessNotification a, b;
+ ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b));
+ EXPECT_TRUE(a.IsValid());
+ EXPECT_TRUE(b.IsValid());
+
+ CrudeIpc* ipc = reinterpret_cast<CrudeIpc*>(mem.memory());
+ ipc->ready = false;
+
+#if defined(OS_POSIX)
+ const int kPosixChildSocket = 20;
+ EXPECT_TRUE(b.ShareToProcess(NULL, &ipc->handle_1, &ipc->handle_2));
+ base::FileHandleMappingVector fd_mapping_vec;
+ fd_mapping_vec.push_back(std::pair<int, int>(ipc->handle_1.fd,
+ kPosixChildSocket));
+ ipc->handle_1.fd = kPosixChildSocket;
+ base::ProcessHandle process = SpawnChild("CrossProcessNotificationChildMain",
+ fd_mapping_vec, false);
+#else
+ base::ProcessHandle process = SpawnChild("CrossProcessNotificationChildMain",
+ false);
+ EXPECT_TRUE(b.ShareToProcess(process, &ipc->handle_1, &ipc->handle_2));
+#endif
+
+ ipc->ready = true;
+
+ a.Signal();
+ a.Wait();
+
+ int exit_code = -1;
+ base::WaitForExitCode(process, &exit_code);
+ EXPECT_EQ(0, exit_code);
+}
diff --git a/media/audio/cross_process_notification_win.cc b/media/audio/cross_process_notification_win.cc
new file mode 100644
index 0000000..b454cbf
--- /dev/null
+++ b/media/audio/cross_process_notification_win.cc
@@ -0,0 +1,268 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "media/audio/cross_process_notification.h"
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/threading/platform_thread.h"
+#include "base/win/scoped_handle.h"
+
+CrossProcessNotification::~CrossProcessNotification() {}
+
+CrossProcessNotification::CrossProcessNotification(IPCHandle handle_1,
+ IPCHandle handle_2)
+ : mine_(handle_1), other_(handle_2) {
+ DCHECK(IsValid());
+}
+
+void CrossProcessNotification::Signal() {
+ DCHECK(IsValid());
+ DCHECK_EQ(::WaitForSingleObject(mine_, 0), static_cast<DWORD>(WAIT_TIMEOUT))
+ << "Are you calling Signal() without calling Wait() first?";
+ BOOL ok = ::SetEvent(mine_);
+ CHECK(ok);
+}
+
+void CrossProcessNotification::Wait() {
+ DCHECK(IsValid());
+ DWORD wait = ::WaitForSingleObject(other_, INFINITE);
+ DCHECK_EQ(wait, WAIT_OBJECT_0);
+ BOOL ok = ::ResetEvent(other_);
+ CHECK(ok);
+}
+
+bool CrossProcessNotification::IsValid() const {
+ return mine_.IsValid() && other_.IsValid();
+}
+
+bool CrossProcessNotification::ShareToProcess(base::ProcessHandle process,
+ IPCHandle* handle_1,
+ IPCHandle* handle_2) {
+ DCHECK(IsValid());
+ HANDLE our_process = ::GetCurrentProcess();
+ if (!::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE,
+ DUPLICATE_SAME_ACCESS)) {
+ return false;
+ }
+
+ if (!::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE,
+ DUPLICATE_SAME_ACCESS)) {
+ // In case we're sharing to ourselves, we can close the handle, but
+ // if the target process is a different process, we do nothing.
+ if (process == our_process)
+ ::CloseHandle(*handle_1);
+ *handle_1 = NULL;
+ return false;
+ }
+
+ return true;
+}
+
+// static
+bool CrossProcessNotification::InitializePair(CrossProcessNotification* a,
+ CrossProcessNotification* b) {
+ DCHECK(!a->IsValid());
+ DCHECK(!b->IsValid());
+
+ bool success = false;
+
+ // Create two manually resettable events and give each party a handle
+ // to both events.
+ HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL);
+ HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (event_a && event_b) {
+ a->mine_.Set(event_a);
+ a->other_.Set(event_b);
+ success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b);
+ if (success) {
+ b->mine_.Set(event_b);
+ b->other_.Set(event_a);
+ } else {
+ a->mine_.Close();
+ a->other_.Close();
+ }
+ } else {
+ if (event_a)
+ ::CloseHandle(event_a);
+ if (event_b)
+ ::CloseHandle(event_b);
+ }
+
+ DCHECK(!success || a->IsValid());
+ DCHECK(!success || b->IsValid());
+
+ return success;
+}
+
+namespace {
+class ExtraWaitThread : public base::PlatformThread::Delegate {
+ public:
+ ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count,
+ int* signaled_event)
+ : stop_(stop), events_(events), count_(count),
+ signaled_event_(signaled_event) {
+ *signaled_event_ = -1;
+ }
+ virtual ~ExtraWaitThread() {}
+
+ virtual void ThreadMain() OVERRIDE {
+ // Store the |stop_| event as the first event.
+ HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ };
+ HANDLE next_thread = NULL;
+ DWORD event_count = MAXIMUM_WAIT_OBJECTS;
+ int thread_signaled_event = -1;
+ scoped_ptr<ExtraWaitThread> extra_wait_thread;
+ if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) {
+ std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]);
+
+ extra_wait_thread.reset(new ExtraWaitThread(stop_,
+ &events_[MAXIMUM_WAIT_OBJECTS - 2],
+ count_ - (MAXIMUM_WAIT_OBJECTS - 2),
+ &thread_signaled_event));
+ base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread);
+
+ event_count = MAXIMUM_WAIT_OBJECTS;
+ events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread;
+ } else {
+ std::copy(&events_[0], &events_[count_], &events[1]);
+ event_count = count_ + 1;
+ }
+
+ DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE,
+ INFINITE);
+ if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) {
+ wait -= WAIT_OBJECT_0;
+ if (wait == 0) {
+ // The stop event was signaled. Check if it was signaled by a
+ // sub thread. In case our sub thread had to spin another thread (and
+ // so on), we must wait for ours to exit before we can check the
+ // propagated event offset.
+ if (next_thread) {
+ base::PlatformThread::Join(next_thread);
+ next_thread = NULL;
+ }
+ if (thread_signaled_event != -1)
+ *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2);
+ } else if (events[wait] == next_thread) {
+ NOTREACHED();
+ } else {
+ *signaled_event_ = static_cast<int>(wait);
+ SetEvent(stop_);
+ }
+ } else {
+ NOTREACHED();
+ }
+
+ if (next_thread)
+ base::PlatformThread::Join(next_thread);
+ }
+
+ private:
+ HANDLE stop_;
+ HANDLE* events_;
+ size_t count_;
+ int* signaled_event_;
+ DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread);
+};
+} // end namespace
+
+// static
+int CrossProcessNotification::WaitMultiple(const Notifications& notifications,
+ size_t wait_offset) {
+ DCHECK_LT(wait_offset, notifications.size());
+
+ for (size_t i = 0; i < notifications.size(); ++i) {
+ DCHECK(notifications[i]->IsValid());
+ }
+
+ // TODO(tommi): Should we wait in an alertable state so that we can be
+ // canceled via an APC?
+ scoped_array<HANDLE> handles(new HANDLE[notifications.size()]);
+
+ // Because of the way WaitForMultipleObjects works, we do a little trick here.
+ // When multiple events are signaled, WaitForMultipleObjects will return the
+ // index of the first signaled item (lowest). This means that if we always
+ // pass the array the same way to WaitForMultipleObjects, the objects that
+ // come first, have higher priority. In times of heavy load, this will cause
+ // elements at the back to become DOS-ed.
+ // So, we store the location of the item that was last signaled. Then we split
+ // up the array and move everything higher than the last signaled index to the
+ // front and the rest to the back (meaning that the last signaled item will
+ // become the last element in the list).
+ // Assuming equally busy events, this approach distributes the priority
+ // evenly.
+
+ size_t index = 0;
+ for (size_t i = wait_offset; i < notifications.size(); ++i)
+ handles[index++] = notifications[i]->other_;
+
+ for (size_t i = 0; i < wait_offset; ++i)
+ handles[index++] = notifications[i]->other_;
+ DCHECK_EQ(index, notifications.size());
+
+ DWORD wait = WAIT_FAILED;
+ bool wait_failed = false;
+ if (notifications.size() <= MAXIMUM_WAIT_OBJECTS) {
+ wait = ::WaitForMultipleObjects(notifications.size(), &handles[0], FALSE,
+ INFINITE);
+ wait_failed = wait < WAIT_OBJECT_0 ||
+ wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
+ } else {
+ // Used to stop the other wait threads when an event has been signaled.
+ base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL));
+
+ // Create the first thread and pass a pointer to all handles >63
+ // to the thread + 'stop'. Then implement the thread so that it checks
+ // if the number of handles is > 63. If so, spawns a new thread and
+ // passes >62 handles to that thread and waits for the 62 handles + stop +
+ // next thread. etc etc.
+
+ // Create a list of threads so that each thread waits on at most 62 events
+ // including one event for when a child thread signals completion and one
+ // event for when all of the threads must be stopped (due to some event
+ // being signaled).
+
+ int thread_signaled_event = -1;
+ ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1],
+ notifications.size() - (MAXIMUM_WAIT_OBJECTS - 1),
+ &thread_signaled_event);
+ base::PlatformThreadHandle thread;
+ base::PlatformThread::Create(0, &wait_thread, &thread);
+ HANDLE events[MAXIMUM_WAIT_OBJECTS];
+ std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]);
+ events[MAXIMUM_WAIT_OBJECTS - 1] = thread;
+ wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE,
+ INFINITE);
+ wait_failed = wait < WAIT_OBJECT_0 ||
+ wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS);
+ if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) {
+ if (thread_signaled_event < 0) {
+ wait_failed = true;
+ NOTREACHED();
+ } else {
+ wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) +
+ thread_signaled_event;
+ }
+ } else {
+ ::SetEvent(stop);
+ }
+ base::PlatformThread::Join(thread);
+ }
+
+ int ret = -1;
+ if (!wait_failed) {
+ // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0).
+ wait -= WAIT_OBJECT_0;
+ BOOL ok = ::ResetEvent(handles[wait]);
+ CHECK(ok);
+ ret = (wait + wait_offset) % notifications.size();
+ DCHECK_EQ(handles[wait], notifications[ret]->other_.Get());
+ } else {
+ NOTREACHED();
+ }
+
+ CHECK_NE(ret, -1);
+ return ret;
+}