diff options
author | tommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-03-27 20:37:55 +0000 |
---|---|---|
committer | tommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-03-27 20:37:55 +0000 |
commit | 409a01172dc2b4721f4817fee405ddd6933678a6 (patch) | |
tree | 3cd8217d5551e4ccf89609f26f91fc9d7ba548c0 /media/audio | |
parent | efbd89cd06131ca00c1b7ea08755f287ec485625 (diff) | |
download | chromium_src-409a01172dc2b4721f4817fee405ddd6933678a6.zip chromium_src-409a01172dc2b4721f4817fee405ddd6933678a6.tar.gz chromium_src-409a01172dc2b4721f4817fee405ddd6933678a6.tar.bz2 |
Add a CrossProcessNotification class.
This class will be used to synchronize multiple audio objects with minimal thread and IPC socket usage.
TEST=Several tests included. Run media unittests.
BUG=114699
Review URL: https://chromiumcodereview.appspot.com/9605015
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@129263 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media/audio')
-rw-r--r-- | media/audio/cross_process_notification.cc | 30 | ||||
-rw-r--r-- | media/audio/cross_process_notification.h | 172 | ||||
-rw-r--r-- | media/audio/cross_process_notification_posix.cc | 114 | ||||
-rw-r--r-- | media/audio/cross_process_notification_unittest.cc | 458 | ||||
-rw-r--r-- | media/audio/cross_process_notification_win.cc | 268 |
5 files changed, 1042 insertions, 0 deletions
diff --git a/media/audio/cross_process_notification.cc b/media/audio/cross_process_notification.cc new file mode 100644 index 0000000..1806f77 --- /dev/null +++ b/media/audio/cross_process_notification.cc @@ -0,0 +1,30 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/audio/cross_process_notification.h" + +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" + +CrossProcessNotification::CrossProcessNotification() {} + +CrossProcessNotification::WaitForMultiple::WaitForMultiple( + const Notifications* notifications) { + Reset(notifications); +} + +int CrossProcessNotification::WaitForMultiple::Wait() { + DCHECK(CalledOnValidThread()); + int ret = WaitMultiple(*notifications_, wait_offset_); + wait_offset_ = (ret + 1) % notifications_->size(); + return ret; +} + +void CrossProcessNotification::WaitForMultiple::Reset( + const Notifications* notifications) { + DCHECK(CalledOnValidThread()); + wait_offset_ = 0; + notifications_ = notifications; + DCHECK(!notifications_->empty()); +} diff --git a/media/audio/cross_process_notification.h b/media/audio/cross_process_notification.h new file mode 100644 index 0000000..cae7435 --- /dev/null +++ b/media/audio/cross_process_notification.h @@ -0,0 +1,172 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_AUDIO_CROSS_PROCESS_NOTIFICATION_H_ +#define MEDIA_AUDIO_CROSS_PROCESS_NOTIFICATION_H_ + +#include <vector> + +#include "base/basictypes.h" +#include "base/process.h" +#include "base/threading/non_thread_safe.h" +#include "media/base/media_export.h" + +#if defined(OS_WIN) +#include "base/win/scoped_handle.h" +#else +#include "base/file_descriptor_posix.h" +#include "base/sync_socket.h" +#endif + +// A mechanism to synchronize access to a shared resource between two parties +// when the usage pattern resembles that of two players playing a game of chess. +// Each end has an instance of CrossProcessNotification and calls Signal() when +// it has finished using the shared resource. +// Before accessing the resource, it must call Wait() in order to know when the +// other end has called Signal(). +// +// Here's some pseudo code for how this class can be used: +// +// This method is used by both processes as it's a general way to use the +// shared resource and then grant the privilege to the other process: +// +// void WriteToSharedMemory(CrossProcessNotification* notification, +// SharedMemory* mem, +// const char my_char) { +// notification->Wait(); // Wait for the other process to yield access. +// reinterpret_cast<char*>(mem->memory())[0] = my_char; +// notification->Signal(); // Grant the other process access. +// } +// +// Process A: +// +// class A { +// public: +// void Initialize(base::ProcessHandle process_b) { +// mem_.CreateNamed("foo", false, 1024); +// +// CrossProcessNotification other; +// CHECK(CrossProcessNotification::InitializePair(¬ification_, &other)); +// CrossProcessNotification::IPCHandle handle_1, handle_2; +// CHECK(other.ShareToProcess(process_b, &handle_1, &handle_2)); +// // This could be implemented by using some IPC mechanism +// // such as MessageLoop. +// SendToProcessB(mem_, handle_1, handle_2); +// // Allow process B the first chance to write to the memory: +// notification_.Signal(); +// // Once B is done, we'll write 'A' to the shared memory. +// WriteToSharedMemory(¬ification_, &mem_, 'A'); +// } +// +// CrossProcessNotification notification_; +// SharedMemory mem_; +// }; +// +// Process B: +// +// class B { +// public: +// // Called when we receive the IPC message from A. +// void Initialize(SharedMemoryHandle mem, +// CrossProcessNotification::IPCHandle handle_1, +// CrossProcessNotification::IPCHandle handle_2) { +// mem_.reset(new SharedMemory(mem, false)); +// notification_.reset(new CrossProcessNotification(handle_1, handle_2)); +// WriteToSharedMemory(¬ification_, &mem_, 'B'); +// } +// +// CrossProcessNotification notification_; +// scoped_ptr<SharedMemory> mem_; +// }; +// +class MEDIA_EXPORT CrossProcessNotification { + public: +#if defined(OS_WIN) + typedef HANDLE IPCHandle; +#else + typedef base::FileDescriptor IPCHandle; +#endif + + typedef std::vector<CrossProcessNotification*> Notifications; + + // Default ctor. Initializes a NULL notification. User must call + // InitializePair() to initialize the instance along with a connected one. + CrossProcessNotification(); + + // Ctor for the user that does not call InitializePair but instead receives + // handles from the one that did. These handles come from a call to + // ShareToProcess. + CrossProcessNotification(IPCHandle handle_1, IPCHandle handle_2); + ~CrossProcessNotification(); + + // Raises a signal that the shared resource now can be accessed by the other + // party. + // NOTE: Calling Signal() more than once without calling Wait() in between + // is not a supported scenario and will result in undefined behavior (and + // different depending on platform). + void Signal(); + + // Waits for the other party to finish using the shared resource. + // NOTE: As with Signal(), you must not call Wait() more than once without + // calling Signal() in between. + void Wait(); + + bool IsValid() const; + + // Copies the internal handles to the output parameters, |handle_1| and + // |handle_2|. The operation can fail, so the caller must be prepared to + // handle that case. + bool ShareToProcess(base::ProcessHandle process, IPCHandle* handle_1, + IPCHandle* handle_2); + + // Initializes a pair of CrossProcessNotification instances. Note that this + // can fail (e.g. due to EMFILE on Linux). + static bool InitializePair(CrossProcessNotification* a, + CrossProcessNotification* b); + + // Use an instance of this class when you have to repeatedly wait for multiple + // notifications on the same thread. The class will store information about + // which notification was last signaled and try to distribute the signals so + // that all notifications get a chance to be processed in times of high load + // and a busy one won't starve the others. + // TODO(tommi): Support a way to abort the wait. + class MEDIA_EXPORT WaitForMultiple : + public NON_EXPORTED_BASE(base::NonThreadSafe) { + public: + // Caller must make sure that the lifetime of the array is greater than + // that of the WaitForMultiple instance. + explicit WaitForMultiple(const Notifications* notifications); + + // Waits for any of the notifications to be signaled. Returns the 0 based + // index of a signaled notification. + int Wait(); + + // Call when the array changes. This should be called on the same thread + // as Wait() is called on and the array must never change while a Wait() + // is in progress. + void Reset(const Notifications* notifications); + + private: + const Notifications* notifications_; + size_t wait_offset_; + }; + + private: + // Only called by the WaitForMultiple class. See documentation + // for WaitForMultiple and comments inside WaitMultiple for details. + static int WaitMultiple(const Notifications& notifications, + size_t wait_offset); + +#if defined(OS_WIN) + base::win::ScopedHandle mine_; + base::win::ScopedHandle other_; +#else + typedef base::CancelableSyncSocket SocketClass; + SocketClass socket_; +#endif + + DISALLOW_COPY_AND_ASSIGN(CrossProcessNotification); +}; + +#endif // MEDIA_AUDIO_CROSS_PROCESS_NOTIFICATION_H_ diff --git a/media/audio/cross_process_notification_posix.cc b/media/audio/cross_process_notification_posix.cc new file mode 100644 index 0000000..070ef06 --- /dev/null +++ b/media/audio/cross_process_notification_posix.cc @@ -0,0 +1,114 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/audio/cross_process_notification.h" + +#include <errno.h> +#include <sys/poll.h> + +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/file_descriptor_posix.h" + +CrossProcessNotification::~CrossProcessNotification() {} + +CrossProcessNotification::CrossProcessNotification(IPCHandle handle_1, + IPCHandle handle_2) + : socket_(handle_1.fd) { + DCHECK_NE(handle_1.fd, -1); + DCHECK_EQ(handle_2.fd, -1); + DCHECK(IsValid()); +} + +void CrossProcessNotification::Signal() { + DCHECK(IsValid()); + char signal = 1; + size_t bytes = socket_.Send(&signal, sizeof(signal)); + DCHECK_EQ(bytes, 1U) << "errno: " << errno; +} + +void CrossProcessNotification::Wait() { + DCHECK(IsValid()); + char signal = 0; + size_t bytes = socket_.Receive(&signal, sizeof(signal)); + DCHECK_EQ(bytes, 1U) << "errno: " << errno; + DCHECK_EQ(signal, 1); +} + +bool CrossProcessNotification::IsValid() const { + return socket_.handle() != SocketClass::kInvalidHandle; +} + +bool CrossProcessNotification::ShareToProcess(base::ProcessHandle process, + IPCHandle* handle_1, + IPCHandle* handle_2) { + DCHECK(IsValid()); + handle_1->fd = socket_.handle(); + handle_1->auto_close = false; + handle_2->fd = -1; + return true; +} + +// static +bool CrossProcessNotification::InitializePair(CrossProcessNotification* a, + CrossProcessNotification* b) { + DCHECK(!a->IsValid()); + DCHECK(!b->IsValid()); + + bool ok = SocketClass::CreatePair(&a->socket_, &b->socket_); + + DLOG_IF(WARNING, !ok) << "failed to create socket: " << errno; + DCHECK(!ok || a->IsValid()); + DCHECK(!ok || b->IsValid()); + return ok; +} + +// static +int CrossProcessNotification::WaitMultiple(const Notifications& notifications, + size_t wait_offset) { + DCHECK_LT(wait_offset, notifications.size()); + + for (size_t i = 0; i < notifications.size(); ++i) { + DCHECK(notifications[i]->IsValid()); + } + + // Below, we always check the |revents| of the first socket in the array + // and return the index of that socket if set. This can cause sockets + // that come later in the array to starve when the first sockets are + // very busy. So to avoid the starving problem, we use the |wait_offset| + // variable to split up the array so that the last socket to be signaled + // becomes the last socket in the array and all the other sockets will have + // priority the next time WaitMultiple is called. + scoped_array<struct pollfd> sockets(new struct pollfd[notifications.size()]); + memset(&sockets[0], 0, notifications.size() * sizeof(sockets[0])); + size_t index = 0; + for (size_t i = wait_offset; i < notifications.size(); ++i) { + struct pollfd& fd = sockets[index++]; + fd.events = POLLIN; + fd.fd = notifications[i]->socket_.handle(); + } + + for (size_t i = 0; i < wait_offset; ++i) { + struct pollfd& fd = sockets[index++]; + fd.events = POLLIN; + fd.fd = notifications[i]->socket_.handle(); + } + DCHECK_EQ(index, notifications.size()); + + int err = poll(&sockets[0], notifications.size(), -1); + if (err != -1) { + for (size_t i = 0; i < notifications.size(); ++i) { + if (sockets[i].revents) { + size_t ret = (i + wait_offset) % notifications.size(); + DCHECK_EQ(sockets[i].fd, notifications[ret]->socket_.handle()); + notifications[ret]->Wait(); + return ret; + } + } + } + // Either poll() failed or we failed to find a single socket that was + // signaled. Either way continuing will result in undefined behavior. + LOG(FATAL) << "poll() failed: " << errno; + return -1; +} diff --git a/media/audio/cross_process_notification_unittest.cc b/media/audio/cross_process_notification_unittest.cc new file mode 100644 index 0000000..9ff6196 --- /dev/null +++ b/media/audio/cross_process_notification_unittest.cc @@ -0,0 +1,458 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/compiler_specific.h" +#include "base/logging.h" +#include "base/shared_memory.h" +#include "base/stl_util.h" +#include "base/test/multiprocess_test.h" +#include "base/threading/platform_thread.h" +#include "media/audio/cross_process_notification.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "testing/multiprocess_func_list.h" + +#if defined(OS_POSIX) +#include <utility> // NOLINT +#endif + +namespace { + +// Initializes (ctor) and deletes (dtor) two vectors of pairs of +// CrossProcessNotification instances. +class NotificationsOwner { + public: + // Attempts to create up to |number_of_pairs| number of pairs. Call size() + // after construction to find out how many pairs were actually created. + explicit NotificationsOwner(size_t number_of_pairs) { + CreateMultiplePairs(number_of_pairs); + } + ~NotificationsOwner() { + STLDeleteElements(&a_); + STLDeleteElements(&b_); + } + + size_t size() const { + DCHECK_EQ(a_.size(), b_.size()); + return a_.size(); + } + + const CrossProcessNotification::Notifications& a() { return a_; } + const CrossProcessNotification::Notifications& b() { return b_; } + + private: + void CreateMultiplePairs(size_t count) { + a_.resize(count); + b_.resize(count); + size_t i = 0; + for (; i < count; ++i) { + a_[i] = new CrossProcessNotification(); + b_[i] = new CrossProcessNotification(); + if (!CrossProcessNotification::InitializePair(a_[i], b_[i])) { + LOG(WARNING) << "InitializePair failed at " << i; + delete a_[i]; + delete b_[i]; + break; + } + } + a_.resize(i); + b_.resize(i); + } + + CrossProcessNotification::Notifications a_; + CrossProcessNotification::Notifications b_; +}; + +// A simple thread that we'll run two instances of. Both threads get a pointer +// to the same |shared_data| and use a CrossProcessNotification to control when +// each thread can read/write. +class SingleNotifierWorker : public base::PlatformThread::Delegate { + public: + SingleNotifierWorker(size_t* shared_data, size_t repeats, + CrossProcessNotification* notifier) + : shared_data_(shared_data), repeats_(repeats), + notifier_(notifier) { + } + virtual ~SingleNotifierWorker() {} + + virtual void ThreadMain() OVERRIDE { + for (size_t i = 0; i < repeats_; ++i) { + notifier_->Wait(); + ++(*shared_data_); + notifier_->Signal(); + } + } + + private: + size_t* shared_data_; + size_t repeats_; + CrossProcessNotification* notifier_; + DISALLOW_COPY_AND_ASSIGN(SingleNotifierWorker); +}; + +// Similar to SingleNotifierWorker, except each instance of this class will +// have >1 instances of CrossProcessNotification to Wait/Signal and an equal +// amount of |shared_data| that the notifiers control access to. +class MultiNotifierWorker : public base::PlatformThread::Delegate { + public: + MultiNotifierWorker(size_t* shared_data, size_t repeats, + const CrossProcessNotification::Notifications* notifiers) + : shared_data_(shared_data), repeats_(repeats), + notifiers_(notifiers) { + } + virtual ~MultiNotifierWorker() {} + + virtual void ThreadMain() OVERRIDE { + CrossProcessNotification::WaitForMultiple waiter(notifiers_); + for (size_t i = 0; i < repeats_; ++i) { + int signaled = waiter.Wait(); + ++shared_data_[signaled]; + (*notifiers_)[signaled]->Signal(); + } + } + + private: + size_t* shared_data_; + size_t repeats_; + const CrossProcessNotification::Notifications* notifiers_; + size_t count_; + DISALLOW_COPY_AND_ASSIGN(MultiNotifierWorker); +}; + +// A fixed array of bool flags. Each flag uses 1 bit. Use sizeof(FlagArray) +// to determine how much memory you need. The number of flags will therefore +// be sizeof(FlagArray) * 8. +// We use 'struct' to signify that this structures represents compiler +// independent structured data. I.e. you must be able to map this class +// to a piece of shared memory of size sizeof(FlagArray) and be able to +// use the class. No vtables etc. +// TODO(tommi): Move this to its own header when we start using it for signaling +// audio devices. As is, it's just here for perf comparison against the +// "multiple notifiers" approach. +struct FlagArray { + public: + FlagArray() : flags_() {} + + bool is_set(size_t index) const { + return (flags_[index >> 5] & (1 << (index & 31))) ? true : false; + } + + void set(size_t index) { + flags_[index >> 5] |= (1U << (static_cast<uint32>(index) & 31)); + } + + void clear(size_t index) { + flags_[index >> 5] &= ~(1U << (static_cast<uint32>(index) & 31)); + } + + // Returns the number of flags that can be set/checked. + size_t size() const { return sizeof(flags_) * 8; } + + private: + // 256 * 32 = 8192 flags in 1KB. + uint32 flags_[256]; + DISALLOW_COPY_AND_ASSIGN(FlagArray); +}; + +class MultiNotifierWorkerFlagArray : public base::PlatformThread::Delegate { + public: + MultiNotifierWorkerFlagArray(size_t count, FlagArray* signals, + size_t* shared_data, size_t repeats, + CrossProcessNotification* notifier) + : count_(count), signals_(signals), shared_data_(shared_data), + repeats_(repeats), notifier_(notifier) { + } + virtual ~MultiNotifierWorkerFlagArray() {} + + virtual void ThreadMain() OVERRIDE { + for (size_t i = 0; i < repeats_; ++i) { + notifier_->Wait(); + for (size_t s = 0; s < count_; ++s) { + if (signals_->is_set(s)) { + ++shared_data_[s]; + // We don't clear the flag here but simply leave it signaled because + // we want the other thread to also increment this variable. + } + } + notifier_->Signal(); + } + } + + private: + size_t count_; + FlagArray* signals_; + size_t* shared_data_; + size_t repeats_; + CrossProcessNotification* notifier_; + DISALLOW_COPY_AND_ASSIGN(MultiNotifierWorkerFlagArray); +}; + +} // end namespace + +TEST(CrossProcessNotification, FlagArray) { + FlagArray flags; + EXPECT_GT(flags.size(), 1000U); + for (size_t i = 0; i < flags.size(); ++i) { + EXPECT_FALSE(flags.is_set(i)); + flags.set(i); + EXPECT_TRUE(flags.is_set(i)); + flags.clear(i); + EXPECT_FALSE(flags.is_set(i)); + } +} + +// Initializes two notifiers, signals the each one and make sure the others +// wait is satisfied. +TEST(CrossProcessNotification, Basic) { + CrossProcessNotification a, b; + ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b)); + EXPECT_TRUE(a.IsValid()); + EXPECT_TRUE(b.IsValid()); + + a.Signal(); + b.Wait(); + + b.Signal(); + a.Wait(); +} + +// Spins two worker threads, each with their own CrossProcessNotification +// that they use to read and write from a shared memory buffer. +TEST(CrossProcessNotification, TwoThreads) { + CrossProcessNotification a, b; + ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b)); + + size_t data = 0; + const size_t kRepeats = 10000; + SingleNotifierWorker worker1(&data, kRepeats, &a); + SingleNotifierWorker worker2(&data, kRepeats, &b); + base::PlatformThreadHandle thread1, thread2; + base::PlatformThread::Create(0, &worker1, &thread1); + base::PlatformThread::Create(0, &worker2, &thread2); + + // Start the first thread. They should ping pong a few times and take turns + // incrementing the shared variable and never step on each other's toes. + a.Signal(); + + base::PlatformThread::Join(thread1); + base::PlatformThread::Join(thread2); + + EXPECT_EQ(kRepeats * 2, data); +} + +// Uses a pair of threads to access up to 1000 pieces of synchronized shared +// data. On regular dev machines, the number of notifiers should be 1000, but on +// mac and linux bots, the number will be smaller due to the RLIMIT_NOFILE +// limit. Specifically, linux will have this limit at 1024 which means for this +// test that the max number of notifiers will be in the range 500-512. On Mac +// the limit is 256, so |count| will be ~120. Oh, and raising the limit via +// setrlimit() won't work. +TEST(CrossProcessNotification, ThousandNotifiersTwoThreads) { + const size_t kCount = 1000; + NotificationsOwner pairs(kCount); + size_t data[kCount] = {0}; + // We use a multiple of the count so that the division in the check below + // will be nice and round. + size_t repeats = pairs.size() * 1; + + MultiNotifierWorker worker_1(&data[0], repeats, &pairs.a()); + MultiNotifierWorker worker_2(&data[0], repeats, &pairs.b()); + base::PlatformThreadHandle thread_1, thread_2; + base::PlatformThread::Create(0, &worker_1, &thread_1); + base::PlatformThread::Create(0, &worker_2, &thread_2); + + for (size_t i = 0; i < pairs.size(); ++i) + pairs.a()[i]->Signal(); + + base::PlatformThread::Join(thread_1); + base::PlatformThread::Join(thread_2); + + size_t expected_total = pairs.size() * 2; + size_t total = 0; + for (size_t i = 0; i < pairs.size(); ++i) { + // The CrossProcessNotification::WaitForMultiple class should have ensured + // that all notifiers had the same quality of service. + EXPECT_EQ(expected_total / pairs.size(), data[i]); + total += data[i]; + } + EXPECT_EQ(expected_total, total); +} + +// Functionally equivalent (as far as the shared data goes) to the +// ThousandNotifiersTwoThreads test but uses a single pair of notifiers + +// FlagArray for the 1000 signals. This approach is significantly faster. +TEST(CrossProcessNotification, TwoNotifiersTwoThreads1000Signals) { + CrossProcessNotification a, b; + ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b)); + + const size_t kCount = 1000; + FlagArray signals; + ASSERT_GE(signals.size(), kCount); + size_t data[kCount] = {0}; + + // Since this algorithm checks all events each time the notifier is + // signaled, |repeat| doesn't mean the same thing here as it does in + // ThousandNotifiersTwoThreads. 1 repeat here is the same as kCount + // repeats in ThousandNotifiersTwoThreads. + size_t repeats = 1; + MultiNotifierWorkerFlagArray worker1(kCount, &signals, &data[0], repeats, &a); + MultiNotifierWorkerFlagArray worker2(kCount, &signals, &data[0], repeats, &b); + base::PlatformThreadHandle thread1, thread2; + base::PlatformThread::Create(0, &worker1, &thread1); + base::PlatformThread::Create(0, &worker2, &thread2); + + for (size_t i = 0; i < kCount; ++i) + signals.set(i); + a.Signal(); + + base::PlatformThread::Join(thread1); + base::PlatformThread::Join(thread2); + + size_t expected_total = kCount * 2; + size_t total = 0; + for (size_t i = 0; i < kCount; ++i) { + // Since for each signal, we process all signaled events, the shared data + // variables should all be equal. + EXPECT_EQ(expected_total / kCount, data[i]); + total += data[i]; + } + EXPECT_EQ(expected_total, total); +} + +// Test the maximum number of notifiers without spinning further wait +// threads on Windows. This test assumes we can always create 64 pairs and +// bails if we can't. +TEST(CrossProcessNotification, MultipleWaits64) { + const size_t kCount = 64; + NotificationsOwner pairs(kCount); + ASSERT_TRUE(pairs.size() == kCount); + + CrossProcessNotification::WaitForMultiple waiter(&pairs.b()); + for (size_t i = 0; i < kCount; ++i) { + pairs.a()[i]->Signal(); + int index = waiter.Wait(); + EXPECT_EQ(i, static_cast<size_t>(index)); + } +} + +// Tests waiting for more notifiers than the OS supports on one thread. +// The test will create at most 1000 pairs, but on mac/linux bots the actual +// number will be lower. See comment about the RLIMIT_NOFILE limit above for +// more details. +TEST(CrossProcessNotification, MultipleWaits1000) { + // A 1000 notifiers requires 16 threads on Windows, including the current + // one, to perform the wait operation. + const size_t kCount = 1000; + NotificationsOwner pairs(kCount); + + for (size_t i = 0; i < pairs.size(); ++i) { + pairs.a()[i]->Signal(); + // To disable the load distribution algorithm and force the extra worker + // thread(s) to catch the signaled event, we define the |waiter| inside + // the loop. + CrossProcessNotification::WaitForMultiple waiter(&pairs.b()); + int index = waiter.Wait(); + EXPECT_EQ(i, static_cast<size_t>(index)); + } +} + +class CrossProcessNotificationMultiProcessTest : public base::MultiProcessTest { + public: + static const char kSharedMemName[]; + static const size_t kSharedMemSize = 1024; + + protected: + virtual void SetUp() OVERRIDE { + base::MultiProcessTest::SetUp(); + } + + virtual void TearDown() OVERRIDE { + base::MultiProcessTest::TearDown(); + } +}; + +// static +const char CrossProcessNotificationMultiProcessTest::kSharedMemName[] = + "CrossProcessNotificationMultiProcessTest"; + +namespace { +// A very crude IPC mechanism that we use to set up the spawned child process +// and the parent process. +struct CrudeIpc { + uint8 ready; + CrossProcessNotification::IPCHandle handle_1; + CrossProcessNotification::IPCHandle handle_2; +}; +} // end namespace + +// The main routine of the child process. Waits for the parent process +// to copy handles over to the child and then uses a CrossProcessNotification to +// wait and signal to the parent process. +MULTIPROCESS_TEST_MAIN(CrossProcessNotificationChildMain) { + base::SharedMemory mem; + bool ok = mem.CreateNamed( + CrossProcessNotificationMultiProcessTest::kSharedMemName, + true, + CrossProcessNotificationMultiProcessTest::kSharedMemSize); + DCHECK(ok); + if (!ok) { + LOG(ERROR) << "Failed to open shared memory segment."; + return -1; + } + + mem.Map(CrossProcessNotificationMultiProcessTest::kSharedMemSize); + CrudeIpc* ipc = reinterpret_cast<CrudeIpc*>(mem.memory()); + + while (!ipc->ready) + base::PlatformThread::Sleep(10); + + CrossProcessNotification notifier(ipc->handle_1, ipc->handle_2); + notifier.Wait(); + notifier.Signal(); + + return 0; +} + +// Spawns a new process and hands a CrossProcessNotification instance to the +// new process. Once that's done, it waits for the child process to signal +// it's end and quits. +TEST_F(CrossProcessNotificationMultiProcessTest, Basic) { + base::SharedMemory mem; + mem.Delete(kSharedMemName); // In case a previous run was unsuccessful. + bool ok = mem.CreateNamed(kSharedMemName, false, kSharedMemSize); + ASSERT_TRUE(ok); + + ASSERT_TRUE(mem.Map(kSharedMemSize)); + + CrossProcessNotification a, b; + ASSERT_TRUE(CrossProcessNotification::InitializePair(&a, &b)); + EXPECT_TRUE(a.IsValid()); + EXPECT_TRUE(b.IsValid()); + + CrudeIpc* ipc = reinterpret_cast<CrudeIpc*>(mem.memory()); + ipc->ready = false; + +#if defined(OS_POSIX) + const int kPosixChildSocket = 20; + EXPECT_TRUE(b.ShareToProcess(NULL, &ipc->handle_1, &ipc->handle_2)); + base::FileHandleMappingVector fd_mapping_vec; + fd_mapping_vec.push_back(std::pair<int, int>(ipc->handle_1.fd, + kPosixChildSocket)); + ipc->handle_1.fd = kPosixChildSocket; + base::ProcessHandle process = SpawnChild("CrossProcessNotificationChildMain", + fd_mapping_vec, false); +#else + base::ProcessHandle process = SpawnChild("CrossProcessNotificationChildMain", + false); + EXPECT_TRUE(b.ShareToProcess(process, &ipc->handle_1, &ipc->handle_2)); +#endif + + ipc->ready = true; + + a.Signal(); + a.Wait(); + + int exit_code = -1; + base::WaitForExitCode(process, &exit_code); + EXPECT_EQ(0, exit_code); +} diff --git a/media/audio/cross_process_notification_win.cc b/media/audio/cross_process_notification_win.cc new file mode 100644 index 0000000..b454cbf --- /dev/null +++ b/media/audio/cross_process_notification_win.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/audio/cross_process_notification.h" + +#include "base/logging.h" +#include "base/memory/scoped_ptr.h" +#include "base/threading/platform_thread.h" +#include "base/win/scoped_handle.h" + +CrossProcessNotification::~CrossProcessNotification() {} + +CrossProcessNotification::CrossProcessNotification(IPCHandle handle_1, + IPCHandle handle_2) + : mine_(handle_1), other_(handle_2) { + DCHECK(IsValid()); +} + +void CrossProcessNotification::Signal() { + DCHECK(IsValid()); + DCHECK_EQ(::WaitForSingleObject(mine_, 0), static_cast<DWORD>(WAIT_TIMEOUT)) + << "Are you calling Signal() without calling Wait() first?"; + BOOL ok = ::SetEvent(mine_); + CHECK(ok); +} + +void CrossProcessNotification::Wait() { + DCHECK(IsValid()); + DWORD wait = ::WaitForSingleObject(other_, INFINITE); + DCHECK_EQ(wait, WAIT_OBJECT_0); + BOOL ok = ::ResetEvent(other_); + CHECK(ok); +} + +bool CrossProcessNotification::IsValid() const { + return mine_.IsValid() && other_.IsValid(); +} + +bool CrossProcessNotification::ShareToProcess(base::ProcessHandle process, + IPCHandle* handle_1, + IPCHandle* handle_2) { + DCHECK(IsValid()); + HANDLE our_process = ::GetCurrentProcess(); + if (!::DuplicateHandle(our_process, mine_, process, handle_1, 0, FALSE, + DUPLICATE_SAME_ACCESS)) { + return false; + } + + if (!::DuplicateHandle(our_process, other_, process, handle_2, 0, FALSE, + DUPLICATE_SAME_ACCESS)) { + // In case we're sharing to ourselves, we can close the handle, but + // if the target process is a different process, we do nothing. + if (process == our_process) + ::CloseHandle(*handle_1); + *handle_1 = NULL; + return false; + } + + return true; +} + +// static +bool CrossProcessNotification::InitializePair(CrossProcessNotification* a, + CrossProcessNotification* b) { + DCHECK(!a->IsValid()); + DCHECK(!b->IsValid()); + + bool success = false; + + // Create two manually resettable events and give each party a handle + // to both events. + HANDLE event_a = ::CreateEvent(NULL, TRUE, FALSE, NULL); + HANDLE event_b = ::CreateEvent(NULL, TRUE, FALSE, NULL); + if (event_a && event_b) { + a->mine_.Set(event_a); + a->other_.Set(event_b); + success = a->ShareToProcess(GetCurrentProcess(), &event_a, &event_b); + if (success) { + b->mine_.Set(event_b); + b->other_.Set(event_a); + } else { + a->mine_.Close(); + a->other_.Close(); + } + } else { + if (event_a) + ::CloseHandle(event_a); + if (event_b) + ::CloseHandle(event_b); + } + + DCHECK(!success || a->IsValid()); + DCHECK(!success || b->IsValid()); + + return success; +} + +namespace { +class ExtraWaitThread : public base::PlatformThread::Delegate { + public: + ExtraWaitThread(HANDLE stop, HANDLE* events, size_t count, + int* signaled_event) + : stop_(stop), events_(events), count_(count), + signaled_event_(signaled_event) { + *signaled_event_ = -1; + } + virtual ~ExtraWaitThread() {} + + virtual void ThreadMain() OVERRIDE { + // Store the |stop_| event as the first event. + HANDLE events[MAXIMUM_WAIT_OBJECTS] = { stop_ }; + HANDLE next_thread = NULL; + DWORD event_count = MAXIMUM_WAIT_OBJECTS; + int thread_signaled_event = -1; + scoped_ptr<ExtraWaitThread> extra_wait_thread; + if (count_ > (MAXIMUM_WAIT_OBJECTS - 1)) { + std::copy(&events_[0], &events_[MAXIMUM_WAIT_OBJECTS - 2], &events[1]); + + extra_wait_thread.reset(new ExtraWaitThread(stop_, + &events_[MAXIMUM_WAIT_OBJECTS - 2], + count_ - (MAXIMUM_WAIT_OBJECTS - 2), + &thread_signaled_event)); + base::PlatformThread::Create(0, extra_wait_thread.get(), &next_thread); + + event_count = MAXIMUM_WAIT_OBJECTS; + events[MAXIMUM_WAIT_OBJECTS - 1] = next_thread; + } else { + std::copy(&events_[0], &events_[count_], &events[1]); + event_count = count_ + 1; + } + + DWORD wait = ::WaitForMultipleObjects(event_count, &events[0], FALSE, + INFINITE); + if (wait >= WAIT_OBJECT_0 && wait < (WAIT_OBJECT_0 + event_count)) { + wait -= WAIT_OBJECT_0; + if (wait == 0) { + // The stop event was signaled. Check if it was signaled by a + // sub thread. In case our sub thread had to spin another thread (and + // so on), we must wait for ours to exit before we can check the + // propagated event offset. + if (next_thread) { + base::PlatformThread::Join(next_thread); + next_thread = NULL; + } + if (thread_signaled_event != -1) + *signaled_event_ = thread_signaled_event + (MAXIMUM_WAIT_OBJECTS - 2); + } else if (events[wait] == next_thread) { + NOTREACHED(); + } else { + *signaled_event_ = static_cast<int>(wait); + SetEvent(stop_); + } + } else { + NOTREACHED(); + } + + if (next_thread) + base::PlatformThread::Join(next_thread); + } + + private: + HANDLE stop_; + HANDLE* events_; + size_t count_; + int* signaled_event_; + DISALLOW_COPY_AND_ASSIGN(ExtraWaitThread); +}; +} // end namespace + +// static +int CrossProcessNotification::WaitMultiple(const Notifications& notifications, + size_t wait_offset) { + DCHECK_LT(wait_offset, notifications.size()); + + for (size_t i = 0; i < notifications.size(); ++i) { + DCHECK(notifications[i]->IsValid()); + } + + // TODO(tommi): Should we wait in an alertable state so that we can be + // canceled via an APC? + scoped_array<HANDLE> handles(new HANDLE[notifications.size()]); + + // Because of the way WaitForMultipleObjects works, we do a little trick here. + // When multiple events are signaled, WaitForMultipleObjects will return the + // index of the first signaled item (lowest). This means that if we always + // pass the array the same way to WaitForMultipleObjects, the objects that + // come first, have higher priority. In times of heavy load, this will cause + // elements at the back to become DOS-ed. + // So, we store the location of the item that was last signaled. Then we split + // up the array and move everything higher than the last signaled index to the + // front and the rest to the back (meaning that the last signaled item will + // become the last element in the list). + // Assuming equally busy events, this approach distributes the priority + // evenly. + + size_t index = 0; + for (size_t i = wait_offset; i < notifications.size(); ++i) + handles[index++] = notifications[i]->other_; + + for (size_t i = 0; i < wait_offset; ++i) + handles[index++] = notifications[i]->other_; + DCHECK_EQ(index, notifications.size()); + + DWORD wait = WAIT_FAILED; + bool wait_failed = false; + if (notifications.size() <= MAXIMUM_WAIT_OBJECTS) { + wait = ::WaitForMultipleObjects(notifications.size(), &handles[0], FALSE, + INFINITE); + wait_failed = wait < WAIT_OBJECT_0 || + wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS); + } else { + // Used to stop the other wait threads when an event has been signaled. + base::win::ScopedHandle stop(::CreateEvent(NULL, TRUE, FALSE, NULL)); + + // Create the first thread and pass a pointer to all handles >63 + // to the thread + 'stop'. Then implement the thread so that it checks + // if the number of handles is > 63. If so, spawns a new thread and + // passes >62 handles to that thread and waits for the 62 handles + stop + + // next thread. etc etc. + + // Create a list of threads so that each thread waits on at most 62 events + // including one event for when a child thread signals completion and one + // event for when all of the threads must be stopped (due to some event + // being signaled). + + int thread_signaled_event = -1; + ExtraWaitThread wait_thread(stop, &handles[MAXIMUM_WAIT_OBJECTS - 1], + notifications.size() - (MAXIMUM_WAIT_OBJECTS - 1), + &thread_signaled_event); + base::PlatformThreadHandle thread; + base::PlatformThread::Create(0, &wait_thread, &thread); + HANDLE events[MAXIMUM_WAIT_OBJECTS]; + std::copy(&handles[0], &handles[MAXIMUM_WAIT_OBJECTS - 1], &events[0]); + events[MAXIMUM_WAIT_OBJECTS - 1] = thread; + wait = ::WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &events[0], FALSE, + INFINITE); + wait_failed = wait < WAIT_OBJECT_0 || + wait >= (WAIT_OBJECT_0 + MAXIMUM_WAIT_OBJECTS); + if (wait == WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 1)) { + if (thread_signaled_event < 0) { + wait_failed = true; + NOTREACHED(); + } else { + wait = WAIT_OBJECT_0 + (MAXIMUM_WAIT_OBJECTS - 2) + + thread_signaled_event; + } + } else { + ::SetEvent(stop); + } + base::PlatformThread::Join(thread); + } + + int ret = -1; + if (!wait_failed) { + // Subtract to be politically correct (WAIT_OBJECT_0 is actually 0). + wait -= WAIT_OBJECT_0; + BOOL ok = ::ResetEvent(handles[wait]); + CHECK(ok); + ret = (wait + wait_offset) % notifications.size(); + DCHECK_EQ(handles[wait], notifications[ret]->other_.Get()); + } else { + NOTREACHED(); + } + + CHECK_NE(ret, -1); + return ret; +} |