diff options
mode: <>2012-06-07 12:45:51 +0000 <>2012-06-07 12:45:51 +0000
commita0e3f0fe89bf3d1a21ea6b078c1c4bcab353069c (patch)
parentc57ccd8e0dce78d664c79b320079487c8094198a (diff)
Add support to be able to asynchronously read from a CancelableSyncSocket
on a TYPE_IO message loop thread. This makes it easy to share a thread that uses a message loop (e.g. for IPC and other things) and not require a separate thread to read from the socket. TEST=Run media_unittests. (--gtest_filter=AsyncSocketIoHandlerTest.*) Review URL: git-svn-id: svn:// 0039d316-1c4b-4281-b951-d872f2087c98
5 files changed, 363 insertions, 8 deletions
diff --git a/media/audio/async_socket_io_handler.h b/media/audio/async_socket_io_handler.h
new file mode 100644
index 0000000..463b2ca
--- /dev/null
+++ b/media/audio/async_socket_io_handler.h
@@ -0,0 +1,108 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "base/message_loop.h"
+#include "base/sync_socket.h"
+#include "base/threading/non_thread_safe.h"
+#include "media/base/media_export.h"
+namespace media {
+// The message loop callback interface is different based on platforms.
+#if defined(OS_WIN)
+typedef MessageLoopForIO::IOHandler MessageLoopIOHandler;
+#elif defined(OS_POSIX)
+typedef MessageLoopForIO::Watcher MessageLoopIOHandler;
+// Extends the CancelableSyncSocket class to allow reading from a socket
+// asynchronously on a TYPE_IO message loop thread. This makes it easy to share
+// a thread that uses a message loop (e.g. for IPC and other things) and not
+// require a separate thread to read from the socket.
+// Example usage (also see the unit tests):
+// class SocketReader {
+// public:
+// SocketReader(base::CancelableSyncSocket* socket)
+// : socket_(socket), buffer_() {
+// io_handler.Initialize(socket_->handle());
+// }
+// void AsyncRead() {
+// CHECK(io_handler.Read(&buffer_[0], sizeof(buffer_),
+// base::Bind(&SocketReader::OnDataAvailable,
+// base::Unretained(this)));
+// }
+// private:
+// void OnDataAvailable(int bytes_read) {
+// ProcessData(&buffer_[0], bytes_read);
+// }
+// media::AsyncSocketIoHandler io_handler;
+// base::CancelableSyncSocket* socket_;
+// char buffer_[kBufferSize];
+// };
+class MEDIA_EXPORT AsyncSocketIoHandler
+ : public base::NonThreadSafe,
+ public MessageLoopIOHandler {
+ public:
+ AsyncSocketIoHandler();
+ virtual ~AsyncSocketIoHandler();
+ // Initializes the AsyncSocketIoHandler by hooking it up to the current
+ // thread's message loop (must be TYPE_IO), to do async reads from the socket
+ // on the current thread.
+ bool Initialize(base::SyncSocket::Handle socket);
+ // Type definition for the callback. The parameter tells how many
+ // bytes were read and is 0 if an error occurred.
+ typedef base::Callback<void(int)> ReadCompleteCallback;
+ // Attempts to read from the socket. The return value will be |false|
+ // if an error occurred and |true| if data was read or a pending read
+ // was issued. Regardless of async or sync operation, the callback will
+ // be called when data is available.
+ bool Read(char* buffer, int buffer_len,
+ const ReadCompleteCallback& callback);
+ private:
+#if defined(OS_WIN)
+ // Implementation of IOHandler on Windows.
+ virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered,
+#elif defined(OS_POSIX)
+ // Implementation of MessageLoopForIO::Watcher.
+ virtual void OnFileCanWriteWithoutBlocking(int socket) OVERRIDE {}
+ virtual void OnFileCanReadWithoutBlocking(int socket) OVERRIDE;
+ void EnsureWatchingSocket();
+ base::SyncSocket::Handle socket_;
+#if defined(OS_WIN)
+ MessageLoopForIO::IOContext* context_;
+#elif defined(OS_POSIX)
+ MessageLoopForIO::FileDescriptorWatcher socket_watcher_;
+ // |pending_buffer_| and |pending_buffer_len_| are valid only between
+ // Read() and OnFileCanReadWithoutBlocking().
+ char* pending_buffer_;
+ int pending_buffer_len_;
+ // |true| iff the message loop is watching the socket for IO events.
+ bool is_watching_;
+ ReadCompleteCallback read_complete_;
+} // namespace media.
diff --git a/media/audio/ b/media/audio/
new file mode 100644
index 0000000..c4d3169
--- /dev/null
+++ b/media/audio/
@@ -0,0 +1,89 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "media/audio/async_socket_io_handler.h"
+#include <fcntl.h>
+#include "base/eintr_wrapper.h"
+namespace media {
+ : socket_(base::SyncSocket::kInvalidHandle),
+ is_watching_(false) {}
+AsyncSocketIoHandler::~AsyncSocketIoHandler() {
+ DCHECK(CalledOnValidThread());
+void AsyncSocketIoHandler::OnFileCanReadWithoutBlocking(int socket) {
+ DCHECK(CalledOnValidThread());
+ DCHECK_EQ(socket, socket_);
+ if (!read_complete_.is_null()) {
+ int bytes_read = HANDLE_EINTR(read(socket_, pending_buffer_,
+ pending_buffer_len_));
+ DCHECK_GT(bytes_read, 0);
+ read_complete_.Run(bytes_read > 0 ? bytes_read : 0);
+ read_complete_.Reset();
+ } else {
+ // We're getting notifications that we can read from the socket while
+ // we're not waiting for data. In order to not starve the message loop,
+ // let's stop watching the fd and restart the watch when Read() is called.
+ is_watching_ = false;
+ socket_watcher_.StopWatchingFileDescriptor();
+ }
+bool AsyncSocketIoHandler::Read(char* buffer, int buffer_len,
+ const ReadCompleteCallback& callback) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(read_complete_.is_null());
+ EnsureWatchingSocket();
+ int bytes_read = HANDLE_EINTR(read(socket_, buffer, buffer_len));
+ if (bytes_read < 0) {
+ if (errno == EAGAIN) {
+ read_complete_ = callback;
+ pending_buffer_ = buffer;
+ pending_buffer_len_ = buffer_len;
+ } else {
+ NOTREACHED() << "read(): " << errno;
+ return false;
+ }
+ } else {
+ callback.Run(bytes_read);
+ }
+ return true;
+bool AsyncSocketIoHandler::Initialize(base::SyncSocket::Handle socket) {
+ DCHECK_EQ(socket_, base::SyncSocket::kInvalidHandle);
+ DetachFromThread();
+ socket_ = socket;
+ // SyncSocket is blocking by default, so let's convert it to non-blocking.
+ int value = fcntl(socket, F_GETFL);
+ if (!(value & O_NONBLOCK)) {
+ // Set the socket to be non-blocking so we can do async reads.
+ if (fcntl(socket, F_SETFL, O_NONBLOCK) == -1) {
+ return false;
+ }
+ }
+ return true;
+void AsyncSocketIoHandler::EnsureWatchingSocket() {
+ DCHECK(CalledOnValidThread());
+ if (!is_watching_ && socket_ != base::SyncSocket::kInvalidHandle) {
+ is_watching_ = MessageLoopForIO::current()->WatchFileDescriptor(
+ socket_, true, MessageLoopForIO::WATCH_READ, &socket_watcher_, this);
+ }
+} // namespace media.
diff --git a/media/audio/ b/media/audio/
new file mode 100644
index 0000000..c9caa9f
--- /dev/null
+++ b/media/audio/
@@ -0,0 +1,80 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "media/audio/async_socket_io_handler.h"
+#include "base/bind.h"
+#include "testing/gtest/include/gtest/gtest.h"
+namespace {
+const char kAsyncSocketIoTestString[] = "Hello, AsyncSocketIoHandler";
+const size_t kAsyncSocketIoTestStringLength =
+ arraysize(kAsyncSocketIoTestString);
+class TestSocketReader {
+ public:
+ TestSocketReader(base::CancelableSyncSocket* socket, bool quit_on_read)
+ : socket_(socket), buffer_(), quit_on_read_(quit_on_read) {
+ io_handler.Initialize(socket_->handle());
+ }
+ ~TestSocketReader() {}
+ bool IssueRead() {
+ return io_handler.Read(&buffer_[0], sizeof(buffer_),
+ base::Bind(&TestSocketReader::OnRead,
+ base::Unretained(this)));
+ }
+ const char* buffer() const { return &buffer_[0]; }
+ private:
+ void OnRead(int bytes_read) {
+ EXPECT_GT(bytes_read, 0);
+ if (quit_on_read_)
+ MessageLoop::current()->Quit();
+ }
+ media::AsyncSocketIoHandler io_handler;
+ base::CancelableSyncSocket* socket_; // Ownership lies outside the class.
+ char buffer_[kAsyncSocketIoTestStringLength];
+ bool quit_on_read_;
+} // end namespace.
+// Tests doing a pending read from a socket and use an IO handler to get
+// notified of data.
+TEST(AsyncSocketIoHandlerTest, AsynchronousReadWithMessageLoop) {
+ MessageLoopForIO loop;
+ base::CancelableSyncSocket pair[2];
+ ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
+ TestSocketReader reader(&pair[0], true);
+ EXPECT_TRUE(reader.IssueRead());
+ pair[1].Send(kAsyncSocketIoTestString, kAsyncSocketIoTestStringLength);
+ MessageLoop::current()->Run();
+ EXPECT_EQ(strcmp(reader.buffer(), kAsyncSocketIoTestString), 0);
+// Tests doing a read from a socket when we know that there is data in the
+// socket. Here we want to make sure that any async 'can read' notifications
+// won't trip us off and that the synchronous case works as well.
+TEST(AsyncSocketIoHandlerTest, SynchronousReadWithMessageLoop) {
+ MessageLoopForIO loop;
+ base::CancelableSyncSocket pair[2];
+ ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
+ TestSocketReader reader(&pair[0], false);
+ pair[1].Send(kAsyncSocketIoTestString, kAsyncSocketIoTestStringLength);
+ MessageLoop::current()->PostDelayedTask(FROM_HERE, MessageLoop::QuitClosure(),
+ base::TimeDelta::FromMilliseconds(100));
+ MessageLoop::current()->Run();
+ EXPECT_TRUE(reader.IssueRead());
+ EXPECT_EQ(strcmp(reader.buffer(), kAsyncSocketIoTestString), 0);
diff --git a/media/audio/ b/media/audio/
new file mode 100644
index 0000000..aad3823
--- /dev/null
+++ b/media/audio/
@@ -0,0 +1,74 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#include "media/audio/async_socket_io_handler.h"
+namespace media {
+ : socket_(base::SyncSocket::kInvalidHandle),
+ context_(NULL) {}
+AsyncSocketIoHandler::~AsyncSocketIoHandler() {
+ // We need to be deleted on the correct thread to avoid racing with the
+ // message loop thread.
+ DCHECK(CalledOnValidThread());
+ if (context_) {
+ if (!read_complete_.is_null()) {
+ // Make the context be deleted by the message pump when done.
+ context_->handler = NULL;
+ } else {
+ delete context_;
+ }
+ }
+// Implementation of IOHandler on Windows.
+void AsyncSocketIoHandler::OnIOCompleted(MessageLoopForIO::IOContext* context,
+ DWORD bytes_transfered,
+ DWORD error) {
+ DCHECK(CalledOnValidThread());
+ DCHECK_EQ(context_, context);
+ if (!read_complete_.is_null()) {
+ read_complete_.Run(error == ERROR_SUCCESS ? bytes_transfered : 0);
+ read_complete_.Reset();
+ }
+bool AsyncSocketIoHandler::Read(char* buffer, int buffer_len,
+ const ReadCompleteCallback& callback) {
+ DCHECK(CalledOnValidThread());
+ DCHECK(read_complete_.is_null());
+ DCHECK_NE(socket_, base::SyncSocket::kInvalidHandle);
+ read_complete_ = callback;
+ DWORD bytes_read = 0;
+ BOOL ok = ::ReadFile(socket_, buffer, buffer_len, &bytes_read,
+ &context_->overlapped);
+ // The completion port will be signaled regardless of completing the read
+ // straight away or asynchronously (ERROR_IO_PENDING). OnIOCompleted() will
+ // be called regardless and we don't need to explicitly run the callback
+ // in the case where ok is FALSE and GLE==ERROR_IO_PENDING.
+ return ok || GetLastError() == ERROR_IO_PENDING;
+bool AsyncSocketIoHandler::Initialize(base::SyncSocket::Handle socket) {
+ DCHECK(!context_);
+ DCHECK_EQ(socket_, base::SyncSocket::kInvalidHandle);
+ DetachFromThread();
+ socket_ = socket;
+ MessageLoopForIO::current()->RegisterIOHandler(socket, this);
+ context_ = new MessageLoopForIO::IOContext();
+ context_->handler = this;
+ memset(&context_->overlapped, 0, sizeof(context_->overlapped));
+ return true;
+} // namespace media.
diff --git a/media/media.gyp b/media/media.gyp
index f1ac284..2a8c6c6 100644
--- a/media/media.gyp
+++ b/media/media.gyp
@@ -30,6 +30,17 @@
'sources': [
+ 'audio/android/',
+ 'audio/android/audio_manager_android.h',
+ 'audio/android/',
+ 'audio/android/audio_track_output_android.h',
+ 'audio/android/',
+ 'audio/android/opensles_input.h',
+ 'audio/android/',
+ 'audio/android/opensles_output.h',
+ 'audio/async_socket_io_handler.h',
+ 'audio/',
+ 'audio/',
@@ -57,14 +68,6 @@
- 'audio/android/',
- 'audio/android/audio_manager_android.h',
- 'audio/android/',
- 'audio/android/audio_track_output_android.h',
- 'audio/android/',
- 'audio/android/opensles_input.h',
- 'audio/android/',
- 'audio/android/opensles_output.h',
@@ -641,6 +644,7 @@
'sources': [
+ 'audio/',