diff options
author | tommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-06-07 12:45:51 +0000 |
---|---|---|
committer | tommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-06-07 12:45:51 +0000 |
commit | a0e3f0fe89bf3d1a21ea6b078c1c4bcab353069c (patch) | |
tree | f2e9027dbceedaaaed995ddd546a07f425b1d648 /media | |
parent | c57ccd8e0dce78d664c79b320079487c8094198a (diff) | |
download | chromium_src-a0e3f0fe89bf3d1a21ea6b078c1c4bcab353069c.zip chromium_src-a0e3f0fe89bf3d1a21ea6b078c1c4bcab353069c.tar.gz chromium_src-a0e3f0fe89bf3d1a21ea6b078c1c4bcab353069c.tar.bz2 |
Add support to be able to asynchronously read from a CancelableSyncSocket
on a TYPE_IO message loop thread. This makes it easy to share a thread
that uses a message loop (e.g. for IPC and other things) and not require
a separate thread to read from the socket.
TEST=Run media_unittests. (--gtest_filter=AsyncSocketIoHandlerTest.*)
Review URL: https://chromiumcodereview.appspot.com/10540047
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@140994 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'media')
-rw-r--r-- | media/audio/async_socket_io_handler.h | 108 | ||||
-rw-r--r-- | media/audio/async_socket_io_handler_posix.cc | 89 | ||||
-rw-r--r-- | media/audio/async_socket_io_handler_unittest.cc | 80 | ||||
-rw-r--r-- | media/audio/async_socket_io_handler_win.cc | 74 | ||||
-rw-r--r-- | media/media.gyp | 20 |
5 files changed, 363 insertions, 8 deletions
diff --git a/media/audio/async_socket_io_handler.h b/media/audio/async_socket_io_handler.h new file mode 100644 index 0000000..463b2ca --- /dev/null +++ b/media/audio/async_socket_io_handler.h @@ -0,0 +1,108 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef MEDIA_AUDIO_ASYNC_SOCKET_IO_HANDLER_H_ +#define MEDIA_AUDIO_ASYNC_SOCKET_IO_HANDLER_H_ + +#include "base/message_loop.h" +#include "base/sync_socket.h" +#include "base/threading/non_thread_safe.h" +#include "media/base/media_export.h" + +namespace media { + +// The message loop callback interface is different based on platforms. +#if defined(OS_WIN) +typedef MessageLoopForIO::IOHandler MessageLoopIOHandler; +#elif defined(OS_POSIX) +typedef MessageLoopForIO::Watcher MessageLoopIOHandler; +#endif + +// Extends the CancelableSyncSocket class to allow reading from a socket +// asynchronously on a TYPE_IO message loop thread. This makes it easy to share +// a thread that uses a message loop (e.g. for IPC and other things) and not +// require a separate thread to read from the socket. +// +// Example usage (also see the unit tests): +// +// class SocketReader { +// public: +// SocketReader(base::CancelableSyncSocket* socket) +// : socket_(socket), buffer_() { +// io_handler.Initialize(socket_->handle()); +// } +// +// void AsyncRead() { +// CHECK(io_handler.Read(&buffer_[0], sizeof(buffer_), +// base::Bind(&SocketReader::OnDataAvailable, +// base::Unretained(this))); +// } +// +// private: +// void OnDataAvailable(int bytes_read) { +// ProcessData(&buffer_[0], bytes_read); +// } +// +// media::AsyncSocketIoHandler io_handler; +// base::CancelableSyncSocket* socket_; +// char buffer_[kBufferSize]; +// }; +// +class MEDIA_EXPORT AsyncSocketIoHandler + : public base::NonThreadSafe, + public MessageLoopIOHandler { + public: + AsyncSocketIoHandler(); + virtual ~AsyncSocketIoHandler(); + + // Initializes the AsyncSocketIoHandler by hooking it up to the current + // thread's message loop (must be TYPE_IO), to do async reads from the socket + // on the current thread. + bool Initialize(base::SyncSocket::Handle socket); + + // Type definition for the callback. The parameter tells how many + // bytes were read and is 0 if an error occurred. + typedef base::Callback<void(int)> ReadCompleteCallback; + + // Attempts to read from the socket. The return value will be |false| + // if an error occurred and |true| if data was read or a pending read + // was issued. Regardless of async or sync operation, the callback will + // be called when data is available. + bool Read(char* buffer, int buffer_len, + const ReadCompleteCallback& callback); + + private: +#if defined(OS_WIN) + // Implementation of IOHandler on Windows. + virtual void OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, + DWORD error) OVERRIDE; +#elif defined(OS_POSIX) + // Implementation of MessageLoopForIO::Watcher. + virtual void OnFileCanWriteWithoutBlocking(int socket) OVERRIDE {} + virtual void OnFileCanReadWithoutBlocking(int socket) OVERRIDE; + + void EnsureWatchingSocket(); +#endif + + base::SyncSocket::Handle socket_; +#if defined(OS_WIN) + MessageLoopForIO::IOContext* context_; +#elif defined(OS_POSIX) + MessageLoopForIO::FileDescriptorWatcher socket_watcher_; + // |pending_buffer_| and |pending_buffer_len_| are valid only between + // Read() and OnFileCanReadWithoutBlocking(). + char* pending_buffer_; + int pending_buffer_len_; + // |true| iff the message loop is watching the socket for IO events. + bool is_watching_; +#endif + ReadCompleteCallback read_complete_; + + DISALLOW_COPY_AND_ASSIGN(AsyncSocketIoHandler); +}; + +} // namespace media. + +#endif // MEDIA_AUDIO_ASYNC_SOCKET_IO_HANDLER_H_ diff --git a/media/audio/async_socket_io_handler_posix.cc b/media/audio/async_socket_io_handler_posix.cc new file mode 100644 index 0000000..c4d3169 --- /dev/null +++ b/media/audio/async_socket_io_handler_posix.cc @@ -0,0 +1,89 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/audio/async_socket_io_handler.h" + +#include <fcntl.h> +#include "base/eintr_wrapper.h" + +namespace media { + +AsyncSocketIoHandler::AsyncSocketIoHandler() + : socket_(base::SyncSocket::kInvalidHandle), + is_watching_(false) {} + +AsyncSocketIoHandler::~AsyncSocketIoHandler() { + DCHECK(CalledOnValidThread()); +} + +void AsyncSocketIoHandler::OnFileCanReadWithoutBlocking(int socket) { + DCHECK(CalledOnValidThread()); + DCHECK_EQ(socket, socket_); + if (!read_complete_.is_null()) { + int bytes_read = HANDLE_EINTR(read(socket_, pending_buffer_, + pending_buffer_len_)); + DCHECK_GT(bytes_read, 0); + read_complete_.Run(bytes_read > 0 ? bytes_read : 0); + read_complete_.Reset(); + } else { + // We're getting notifications that we can read from the socket while + // we're not waiting for data. In order to not starve the message loop, + // let's stop watching the fd and restart the watch when Read() is called. + is_watching_ = false; + socket_watcher_.StopWatchingFileDescriptor(); + } +} + +bool AsyncSocketIoHandler::Read(char* buffer, int buffer_len, + const ReadCompleteCallback& callback) { + DCHECK(CalledOnValidThread()); + DCHECK(read_complete_.is_null()); + + EnsureWatchingSocket(); + + int bytes_read = HANDLE_EINTR(read(socket_, buffer, buffer_len)); + if (bytes_read < 0) { + if (errno == EAGAIN) { + read_complete_ = callback; + pending_buffer_ = buffer; + pending_buffer_len_ = buffer_len; + } else { + NOTREACHED() << "read(): " << errno; + return false; + } + } else { + callback.Run(bytes_read); + } + return true; +} + +bool AsyncSocketIoHandler::Initialize(base::SyncSocket::Handle socket) { + DCHECK_EQ(socket_, base::SyncSocket::kInvalidHandle); + + DetachFromThread(); + + socket_ = socket; + + // SyncSocket is blocking by default, so let's convert it to non-blocking. + int value = fcntl(socket, F_GETFL); + if (!(value & O_NONBLOCK)) { + // Set the socket to be non-blocking so we can do async reads. + if (fcntl(socket, F_SETFL, O_NONBLOCK) == -1) { + NOTREACHED(); + return false; + } + } + + return true; +} + +void AsyncSocketIoHandler::EnsureWatchingSocket() { + DCHECK(CalledOnValidThread()); + if (!is_watching_ && socket_ != base::SyncSocket::kInvalidHandle) { + is_watching_ = MessageLoopForIO::current()->WatchFileDescriptor( + socket_, true, MessageLoopForIO::WATCH_READ, &socket_watcher_, this); + } +} + +} // namespace media. diff --git a/media/audio/async_socket_io_handler_unittest.cc b/media/audio/async_socket_io_handler_unittest.cc new file mode 100644 index 0000000..c9caa9f --- /dev/null +++ b/media/audio/async_socket_io_handler_unittest.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/audio/async_socket_io_handler.h" + +#include "base/bind.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace { +const char kAsyncSocketIoTestString[] = "Hello, AsyncSocketIoHandler"; +const size_t kAsyncSocketIoTestStringLength = + arraysize(kAsyncSocketIoTestString); + +class TestSocketReader { + public: + TestSocketReader(base::CancelableSyncSocket* socket, bool quit_on_read) + : socket_(socket), buffer_(), quit_on_read_(quit_on_read) { + io_handler.Initialize(socket_->handle()); + } + ~TestSocketReader() {} + + bool IssueRead() { + return io_handler.Read(&buffer_[0], sizeof(buffer_), + base::Bind(&TestSocketReader::OnRead, + base::Unretained(this))); + } + + const char* buffer() const { return &buffer_[0]; } + + private: + void OnRead(int bytes_read) { + EXPECT_GT(bytes_read, 0); + if (quit_on_read_) + MessageLoop::current()->Quit(); + } + + media::AsyncSocketIoHandler io_handler; + base::CancelableSyncSocket* socket_; // Ownership lies outside the class. + char buffer_[kAsyncSocketIoTestStringLength]; + bool quit_on_read_; +}; + +} // end namespace. + +// Tests doing a pending read from a socket and use an IO handler to get +// notified of data. +TEST(AsyncSocketIoHandlerTest, AsynchronousReadWithMessageLoop) { + MessageLoopForIO loop; + + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + TestSocketReader reader(&pair[0], true); + EXPECT_TRUE(reader.IssueRead()); + + pair[1].Send(kAsyncSocketIoTestString, kAsyncSocketIoTestStringLength); + MessageLoop::current()->Run(); + EXPECT_EQ(strcmp(reader.buffer(), kAsyncSocketIoTestString), 0); +} + +// Tests doing a read from a socket when we know that there is data in the +// socket. Here we want to make sure that any async 'can read' notifications +// won't trip us off and that the synchronous case works as well. +TEST(AsyncSocketIoHandlerTest, SynchronousReadWithMessageLoop) { + MessageLoopForIO loop; + + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + TestSocketReader reader(&pair[0], false); + + pair[1].Send(kAsyncSocketIoTestString, kAsyncSocketIoTestStringLength); + MessageLoop::current()->PostDelayedTask(FROM_HERE, MessageLoop::QuitClosure(), + base::TimeDelta::FromMilliseconds(100)); + MessageLoop::current()->Run(); + + EXPECT_TRUE(reader.IssueRead()); + EXPECT_EQ(strcmp(reader.buffer(), kAsyncSocketIoTestString), 0); +} diff --git a/media/audio/async_socket_io_handler_win.cc b/media/audio/async_socket_io_handler_win.cc new file mode 100644 index 0000000..aad3823 --- /dev/null +++ b/media/audio/async_socket_io_handler_win.cc @@ -0,0 +1,74 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "media/audio/async_socket_io_handler.h" + +namespace media { + +AsyncSocketIoHandler::AsyncSocketIoHandler() + : socket_(base::SyncSocket::kInvalidHandle), + context_(NULL) {} + +AsyncSocketIoHandler::~AsyncSocketIoHandler() { + // We need to be deleted on the correct thread to avoid racing with the + // message loop thread. + DCHECK(CalledOnValidThread()); + + if (context_) { + if (!read_complete_.is_null()) { + // Make the context be deleted by the message pump when done. + context_->handler = NULL; + } else { + delete context_; + } + } +} + +// Implementation of IOHandler on Windows. +void AsyncSocketIoHandler::OnIOCompleted(MessageLoopForIO::IOContext* context, + DWORD bytes_transfered, + DWORD error) { + DCHECK(CalledOnValidThread()); + DCHECK_EQ(context_, context); + if (!read_complete_.is_null()) { + read_complete_.Run(error == ERROR_SUCCESS ? bytes_transfered : 0); + read_complete_.Reset(); + } +} + +bool AsyncSocketIoHandler::Read(char* buffer, int buffer_len, + const ReadCompleteCallback& callback) { + DCHECK(CalledOnValidThread()); + DCHECK(read_complete_.is_null()); + DCHECK_NE(socket_, base::SyncSocket::kInvalidHandle); + + read_complete_ = callback; + + DWORD bytes_read = 0; + BOOL ok = ::ReadFile(socket_, buffer, buffer_len, &bytes_read, + &context_->overlapped); + // The completion port will be signaled regardless of completing the read + // straight away or asynchronously (ERROR_IO_PENDING). OnIOCompleted() will + // be called regardless and we don't need to explicitly run the callback + // in the case where ok is FALSE and GLE==ERROR_IO_PENDING. + return ok || GetLastError() == ERROR_IO_PENDING; +} + +bool AsyncSocketIoHandler::Initialize(base::SyncSocket::Handle socket) { + DCHECK(!context_); + DCHECK_EQ(socket_, base::SyncSocket::kInvalidHandle); + + DetachFromThread(); + + socket_ = socket; + MessageLoopForIO::current()->RegisterIOHandler(socket, this); + + context_ = new MessageLoopForIO::IOContext(); + context_->handler = this; + memset(&context_->overlapped, 0, sizeof(context_->overlapped)); + + return true; +} + +} // namespace media. diff --git a/media/media.gyp b/media/media.gyp index f1ac284..2a8c6c6 100644 --- a/media/media.gyp +++ b/media/media.gyp @@ -30,6 +30,17 @@ '..', ], 'sources': [ + 'audio/android/audio_manager_android.cc', + 'audio/android/audio_manager_android.h', + 'audio/android/audio_track_output_android.cc', + 'audio/android/audio_track_output_android.h', + 'audio/android/opensles_input.cc', + 'audio/android/opensles_input.h', + 'audio/android/opensles_output.cc', + 'audio/android/opensles_output.h', + 'audio/async_socket_io_handler.h', + 'audio/async_socket_io_handler_posix.cc', + 'audio/async_socket_io_handler_win.cc', 'audio/audio_buffers_state.cc', 'audio/audio_buffers_state.h', 'audio/audio_io.h', @@ -57,14 +68,6 @@ 'audio/audio_parameters.h', 'audio/audio_util.cc', 'audio/audio_util.h', - 'audio/android/audio_manager_android.cc', - 'audio/android/audio_manager_android.h', - 'audio/android/audio_track_output_android.cc', - 'audio/android/audio_track_output_android.h', - 'audio/android/opensles_input.cc', - 'audio/android/opensles_input.h', - 'audio/android/opensles_output.cc', - 'audio/android/opensles_output.h', 'audio/cross_process_notification.cc', 'audio/cross_process_notification.h', 'audio/cross_process_notification_win.cc', @@ -641,6 +644,7 @@ '../ui/ui.gyp:ui', ], 'sources': [ + 'audio/async_socket_io_handler_unittest.cc', 'audio/audio_input_controller_unittest.cc', 'audio/audio_input_device_unittest.cc', 'audio/audio_input_unittest.cc', |