diff options
author | tommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-25 12:04:17 +0000 |
---|---|---|
committer | tommi@chromium.org <tommi@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-01-25 12:04:17 +0000 |
commit | 532e9bd6d2f7e0e6a8e6e6c29bc62e5a3c839adf (patch) | |
tree | 1e0762047fbae23fe10f547d8d76318597786de8 | |
parent | 049ed6d1eb9c8e7028d894350728be8d11170bca (diff) | |
download | chromium_src-532e9bd6d2f7e0e6a8e6e6c29bc62e5a3c839adf.zip chromium_src-532e9bd6d2f7e0e6a8e6e6c29bc62e5a3c839adf.tar.gz chromium_src-532e9bd6d2f7e0e6a8e6e6c29bc62e5a3c839adf.tar.bz2 |
Implement support for a cancelable SyncSocket.
Currently, blocking SyncSocket operations can not be unblocked from other threads, but this is now supported by using the CancelableSyncSocket class.
The implementation on Mac and Linux is very simple and basically consists of adding a call to shutdown().
On Windows however things are a tiny bit more complex since we use named pipes with synchronous IO and canceling synchronous IO is simply not possible on XP and arguably tricky on Vista+. So, what we do instead is to use asynchronous IO in a synchronous fashion to support the SyncSocket semantics and as well as allowing the connection to be correctly shut down from another thread.
Review URL: https://chromiumcodereview.appspot.com/8965053
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@119051 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/sync_socket.h | 65 | ||||
-rw-r--r-- | base/sync_socket_posix.cc | 63 | ||||
-rw-r--r-- | base/sync_socket_win.cc | 207 | ||||
-rw-r--r-- | content/browser/renderer_host/media/audio_input_sync_writer.cc | 12 | ||||
-rw-r--r-- | content/browser/renderer_host/media/audio_sync_reader.cc | 12 | ||||
-rw-r--r-- | content/renderer/pepper_plugin_delegate_impl.cc | 12 | ||||
-rw-r--r-- | ipc/sync_socket_unittest.cc | 61 | ||||
-rw-r--r-- | media/audio/win/audio_output_win_unittest.cc | 10 |
8 files changed, 313 insertions, 129 deletions
diff --git a/base/sync_socket.h b/base/sync_socket.h index 87b6a97..9bf8836 100644 --- a/base/sync_socket.h +++ b/base/sync_socket.h @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -17,6 +17,8 @@ #include <sys/types.h> #include "base/base_export.h" +#include "base/compiler_specific.h" +#include "base/synchronization/waitable_event.h" namespace base { @@ -29,17 +31,19 @@ class BASE_EXPORT SyncSocket { #endif static const Handle kInvalidHandle; + SyncSocket(); + // Creates a SyncSocket from a Handle. Used in transport. - explicit SyncSocket(Handle handle) : handle_(handle) { } - ~SyncSocket() { Close(); } + explicit SyncSocket(Handle handle) : handle_(handle) {} + virtual ~SyncSocket(); - // Creates an unnamed pair of connected sockets. - // pair is a pointer to an array of two SyncSockets in which connected socket - // descriptors are returned. Returns true on success, false on failure. - static bool CreatePair(SyncSocket* pair[2]); + // Initializes and connects a pair of sockets. + // |socket_a| and |socket_b| must not hold a valid handle. Upon successful + // return, the sockets will both be valid and connected. + static bool CreatePair(SyncSocket* socket_a, SyncSocket* socket_b); // Closes the SyncSocket. Returns true on success, false on failure. - bool Close(); + virtual bool Close(); // Sends the message to the remote peer of the SyncSocket. // Note it is not safe to send messages from the same socket handle by @@ -47,13 +51,13 @@ class BASE_EXPORT SyncSocket { // buffer is a pointer to the data to send. // length is the length of the data to send (must be non-zero). // Returns the number of bytes sent, or 0 upon failure. - size_t Send(const void* buffer, size_t length); + virtual size_t Send(const void* buffer, size_t length); // Receives a message from an SyncSocket. // buffer is a pointer to the buffer to receive data. // length is the number of bytes of data to receive (must be non-zero). // Returns the number of bytes received, or 0 upon failure. - size_t Receive(void* buffer, size_t length); + virtual size_t Receive(void* buffer, size_t length); // Returns the number of bytes available. If non-zero, Receive() will not // not block when called. NOTE: Some implementations cannot reliably @@ -65,12 +69,51 @@ class BASE_EXPORT SyncSocket { // processes. Handle handle() const { return handle_; } - private: + protected: Handle handle_; + private: DISALLOW_COPY_AND_ASSIGN(SyncSocket); }; +// Derives from SyncSocket and adds support for shutting down the socket from +// another thread while a blocking Receive or Send is being done from the thread +// that owns the socket. +class BASE_EXPORT CancelableSyncSocket : public SyncSocket { + public: + CancelableSyncSocket(); + explicit CancelableSyncSocket(Handle handle); + virtual ~CancelableSyncSocket() {} + + // Initializes a pair of cancelable sockets. See documentation for + // SyncSocket::CreatePair for more details. + static bool CreatePair(CancelableSyncSocket* socket_a, + CancelableSyncSocket* socket_b); + + // A way to shut down a socket even if another thread is currently performing + // a blocking Receive or Send. + bool Shutdown(); + +#if defined(OS_WIN) + // Since the Linux and Mac implementations actually use a socket, shutting + // them down from another thread is pretty simple - we can just call + // shutdown(). However, the Windows implementation relies on named pipes + // and there isn't a way to cancel a blocking synchronous Read that is + // supported on <Vista. So, for Windows only, we override these + // SyncSocket methods in order to support shutting down the 'socket'. + virtual bool Close() OVERRIDE; + virtual size_t Send(const void* buffer, size_t length) OVERRIDE; + virtual size_t Receive(void* buffer, size_t length) OVERRIDE; +#endif + + private: +#if defined(OS_WIN) + WaitableEvent shutdown_event_; + WaitableEvent file_operation_; +#endif + DISALLOW_COPY_AND_ASSIGN(CancelableSyncSocket); +}; + } // namespace base #endif // BASE_SYNC_SOCKET_H_ diff --git a/base/sync_socket_posix.cc b/base/sync_socket_posix.cc index c486cb5..c5dca75 100644 --- a/base/sync_socket_posix.cc +++ b/base/sync_socket_posix.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -30,25 +30,26 @@ const size_t kMaxMessageLength = static_cast<size_t>(INT_MAX); const SyncSocket::Handle SyncSocket::kInvalidHandle = -1; -bool SyncSocket::CreatePair(SyncSocket* pair[2]) { - Handle handles[2] = { kInvalidHandle, kInvalidHandle }; - SyncSocket* tmp_sockets[2] = { NULL, NULL }; +SyncSocket::SyncSocket() : handle_(kInvalidHandle) {} + +SyncSocket::~SyncSocket() { + Close(); +} + +// static +bool SyncSocket::CreatePair(SyncSocket* socket_a, SyncSocket* socket_b) { + DCHECK(socket_a != socket_b); + DCHECK(socket_a->handle_ == kInvalidHandle); + DCHECK(socket_b->handle_ == kInvalidHandle); + #if defined(OS_MACOSX) int nosigpipe = 1; #endif // defined(OS_MACOSX) - // Create the two SyncSocket objects first to avoid ugly cleanup issues. - tmp_sockets[0] = new SyncSocket(kInvalidHandle); - if (tmp_sockets[0] == NULL) { - goto cleanup; - } - tmp_sockets[1] = new SyncSocket(kInvalidHandle); - if (tmp_sockets[1] == NULL) { - goto cleanup; - } - if (socketpair(AF_UNIX, SOCK_STREAM, 0, handles) != 0) { + Handle handles[2] = { kInvalidHandle, kInvalidHandle }; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, handles) != 0) goto cleanup; - } + #if defined(OS_MACOSX) // On OSX an attempt to read or write to a closed socket may generate a // SIGPIPE rather than returning -1. setsockopt will shut this off. @@ -59,11 +60,11 @@ bool SyncSocket::CreatePair(SyncSocket* pair[2]) { goto cleanup; } #endif + // Copy the handles out for successful return. - tmp_sockets[0]->handle_ = handles[0]; - pair[0] = tmp_sockets[0]; - tmp_sockets[1]->handle_ = handles[1]; - pair[1] = tmp_sockets[1]; + socket_a->handle_ = handles[0]; + socket_b->handle_ = handles[1]; + return true; cleanup: @@ -75,8 +76,7 @@ bool SyncSocket::CreatePair(SyncSocket* pair[2]) { if (HANDLE_EINTR(close(handles[1])) < 0) DPLOG(ERROR) << "close"; } - delete tmp_sockets[0]; - delete tmp_sockets[1]; + return false; } @@ -101,11 +101,9 @@ size_t SyncSocket::Send(const void* buffer, size_t length) { size_t SyncSocket::Receive(void* buffer, size_t length) { DCHECK_LE(length, kMaxMessageLength); char* charbuffer = static_cast<char*>(buffer); - if (file_util::ReadFromFD(handle_, charbuffer, length)) { + if (file_util::ReadFromFD(handle_, charbuffer, length)) return length; - } else { - return -1; - } + return 0; } size_t SyncSocket::Peek() { @@ -117,4 +115,19 @@ size_t SyncSocket::Peek() { return (size_t) number_chars; } +CancelableSyncSocket::CancelableSyncSocket() {} +CancelableSyncSocket::CancelableSyncSocket(Handle handle) + : SyncSocket(handle) { +} + +bool CancelableSyncSocket::Shutdown() { + return HANDLE_EINTR(shutdown(handle(), SHUT_RDWR)) >= 0; +} + +// static +bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a, + CancelableSyncSocket* socket_b) { + return SyncSocket::CreatePair(socket_a, socket_b); +} + } // namespace base diff --git a/base/sync_socket_win.cc b/base/sync_socket_win.cc index 032e04f7..359790c 100644 --- a/base/sync_socket_win.cc +++ b/base/sync_socket_win.cc @@ -3,14 +3,14 @@ // found in the LICENSE file. #include "base/sync_socket.h" -#include <limits.h> -#include <stdio.h> -#include <windows.h> -#include <sys/types.h> + #include "base/logging.h" +#include "base/win/scoped_handle.h" namespace base { +using win::ScopedHandle; + namespace { // IMPORTANT: do not change how this name is generated because it will break // in sandboxed scenarios as we might have by-name policies that allow pipe @@ -26,85 +26,148 @@ const int kOutBufferSize = 4096; const int kInBufferSize = 4096; const int kDefaultTimeoutMilliSeconds = 1000; -} // namespace - -const SyncSocket::Handle SyncSocket::kInvalidHandle = INVALID_HANDLE_VALUE; - -bool SyncSocket::CreatePair(SyncSocket* pair[2]) { - Handle handles[2]; - SyncSocket* tmp_sockets[2]; - - // Create the two SyncSocket objects first to avoid ugly cleanup issues. - tmp_sockets[0] = new SyncSocket(kInvalidHandle); - if (tmp_sockets[0] == NULL) { - return false; - } - tmp_sockets[1] = new SyncSocket(kInvalidHandle); - if (tmp_sockets[1] == NULL) { - delete tmp_sockets[0]; - return false; - } +bool CreatePairImpl(HANDLE* socket_a, HANDLE* socket_b, bool overlapped) { + DCHECK(socket_a != socket_b); + DCHECK(*socket_a == SyncSocket::kInvalidHandle); + DCHECK(*socket_b == SyncSocket::kInvalidHandle); wchar_t name[kPipePathMax]; + ScopedHandle handle_a; + DWORD flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE; + if (overlapped) + flags |= FILE_FLAG_OVERLAPPED; + do { unsigned int rnd_name; if (rand_s(&rnd_name) != 0) return false; + swprintf(name, kPipePathMax, kPipeNameFormat, GetCurrentProcessId(), GetCurrentThreadId(), rnd_name); - handles[0] = CreateNamedPipeW( + + handle_a.Set(CreateNamedPipeW( name, - PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE, + flags, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, 1, kOutBufferSize, kInBufferSize, kDefaultTimeoutMilliSeconds, - NULL); - } while ((handles[0] == INVALID_HANDLE_VALUE) && + NULL)); + } while (!handle_a.IsValid() && (GetLastError() == ERROR_PIPE_BUSY)); - if (handles[0] == INVALID_HANDLE_VALUE) { + if (!handle_a.IsValid()) { NOTREACHED(); return false; } - // The SECURITY_ANONYMOUS flag means that the server side (pair[0]) cannot - // impersonate the client (pair[1]). This allows us not to care which side + + // The SECURITY_ANONYMOUS flag means that the server side (handle_a) cannot + // impersonate the client (handle_b). This allows us not to care which side // ends up in which side of a privilege boundary. - handles[1] = CreateFileW(name, - GENERIC_READ | GENERIC_WRITE, - 0, // no sharing. - NULL, // default security attributes. - OPEN_EXISTING, // opens existing pipe. - SECURITY_SQOS_PRESENT | SECURITY_ANONYMOUS, - NULL); // no template file. - if (handles[1] == INVALID_HANDLE_VALUE) { - CloseHandle(handles[0]); + flags = SECURITY_SQOS_PRESENT | SECURITY_ANONYMOUS; + if (overlapped) + flags |= FILE_FLAG_OVERLAPPED; + + ScopedHandle handle_b(CreateFileW(name, + GENERIC_READ | GENERIC_WRITE, + 0, // no sharing. + NULL, // default security attributes. + OPEN_EXISTING, // opens existing pipe. + flags, + NULL)); // no template file. + if (!handle_b.IsValid()) { + DPLOG(ERROR) << "CreateFileW failed"; return false; } - if (ConnectNamedPipe(handles[0], NULL) == FALSE) { + + if (!ConnectNamedPipe(handle_a, NULL)) { DWORD error = GetLastError(); if (error != ERROR_PIPE_CONNECTED) { - CloseHandle(handles[0]); - CloseHandle(handles[1]); + DPLOG(ERROR) << "ConnectNamedPipe failed"; return false; } } - // Copy the handles out for successful return. - tmp_sockets[0]->handle_ = handles[0]; - pair[0] = tmp_sockets[0]; - tmp_sockets[1]->handle_ = handles[1]; - pair[1] = tmp_sockets[1]; + + *socket_a = handle_a.Take(); + *socket_b = handle_b.Take(); + return true; } +// Inline helper to avoid having the cast everywhere. +DWORD GetNextChunkSize(size_t current_pos, size_t max_size) { + // The following statement is for 64 bit portability. + return static_cast<DWORD>(((max_size - current_pos) <= UINT_MAX) ? + (max_size - current_pos) : UINT_MAX); +} + +// Template function that supports calling ReadFile or WriteFile in an +// overlapped fashion and waits for IO completion. The function also waits +// on an event that can be used to cancel the operation. If the operation +// is cancelled, the function returns and closes the relevant socket object. +template <typename BufferType, typename Function> +size_t CancelableFileOperation(Function operation, HANDLE file, + BufferType* buffer, size_t length, + base::WaitableEvent* io_event, + base::WaitableEvent* cancel_event, + CancelableSyncSocket* socket) { + // The buffer must be byte size or the length check won't make much sense. + COMPILE_ASSERT(sizeof(buffer[0]) == sizeof(char), incorrect_buffer_type); + DCHECK_LE(length, kMaxMessageLength); + + OVERLAPPED ol = {0}; + ol.hEvent = io_event->handle(); + size_t count = 0; + while (count < length) { + DWORD chunk = GetNextChunkSize(count, length); + // This is either the ReadFile or WriteFile call depending on whether + // we're receiving or sending data. + DWORD len; + BOOL ok = operation(file, static_cast<BufferType*>(buffer) + count, chunk, + &len, &ol); + if (!ok) { + if (::GetLastError() == ERROR_IO_PENDING) { + base::WaitableEvent* events[] = { io_event, cancel_event }; + size_t signaled = WaitableEvent::WaitMany(events, arraysize(events)); + if (signaled == 1) { + VLOG(1) << "Shutdown was signaled. Closing socket."; + socket->Close(); + break; + } else { + GetOverlappedResult(file, &ol, &len, TRUE); + } + } else { + return (0 < count) ? count : 0; + } + } + count += len; + } + return count; +} + +} // namespace + +const SyncSocket::Handle SyncSocket::kInvalidHandle = INVALID_HANDLE_VALUE; + +SyncSocket::SyncSocket() : handle_(kInvalidHandle) {} + +SyncSocket::~SyncSocket() { + Close(); +} + +// static +bool SyncSocket::CreatePair(SyncSocket* socket_a, SyncSocket* socket_b) { + return CreatePairImpl(&socket_a->handle_, &socket_b->handle_, false); +} + bool SyncSocket::Close() { - if (handle_ == kInvalidHandle) { + if (handle_ == kInvalidHandle) return false; - } + BOOL retval = CloseHandle(handle_); handle_ = kInvalidHandle; return retval ? true : false; @@ -115,9 +178,7 @@ size_t SyncSocket::Send(const void* buffer, size_t length) { size_t count = 0; while (count < length) { DWORD len; - // The following statement is for 64 bit portability. - DWORD chunk = static_cast<DWORD>( - ((length - count) <= UINT_MAX) ? (length - count) : UINT_MAX); + DWORD chunk = GetNextChunkSize(count, length); if (WriteFile(handle_, static_cast<const char*>(buffer) + count, chunk, &len, NULL) == FALSE) { return (0 < count) ? count : 0; @@ -132,8 +193,7 @@ size_t SyncSocket::Receive(void* buffer, size_t length) { size_t count = 0; while (count < length) { DWORD len; - DWORD chunk = static_cast<DWORD>( - ((length - count) <= UINT_MAX) ? (length - count) : UINT_MAX); + DWORD chunk = GetNextChunkSize(count, length); if (ReadFile(handle_, static_cast<char*>(buffer) + count, chunk, &len, NULL) == FALSE) { return (0 < count) ? count : 0; @@ -149,4 +209,45 @@ size_t SyncSocket::Peek() { return available; } +CancelableSyncSocket::CancelableSyncSocket() + : shutdown_event_(true, false), file_operation_(true, false) { +} + +CancelableSyncSocket::CancelableSyncSocket(Handle handle) + : SyncSocket(handle), shutdown_event_(true, false), + file_operation_(true, false) { +} + +bool CancelableSyncSocket::Shutdown() { + // This doesn't shut down the pipe immediately, but subsequent Receive or Send + // methods will fail straight away. + shutdown_event_.Signal(); + return true; +} + +bool CancelableSyncSocket::Close() { + bool ret = SyncSocket::Close(); + shutdown_event_.Reset(); + return ret; +} + +size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { + return CancelableFileOperation(&WriteFile, handle_, + reinterpret_cast<const char*>(buffer), length, &file_operation_, + &shutdown_event_, this); +} + +size_t CancelableSyncSocket::Receive(void* buffer, size_t length) { + return CancelableFileOperation(&ReadFile, handle_, + reinterpret_cast<char*>(buffer), length, &file_operation_, + &shutdown_event_, this); +} + +// static +bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a, + CancelableSyncSocket* socket_b) { + return CreatePairImpl(&socket_a->handle_, &socket_b->handle_, true); +} + + } // namespace base diff --git a/content/browser/renderer_host/media/audio_input_sync_writer.cc b/content/browser/renderer_host/media/audio_input_sync_writer.cc index 8add5b5..5a8870c 100644 --- a/content/browser/renderer_host/media/audio_input_sync_writer.cc +++ b/content/browser/renderer_host/media/audio_input_sync_writer.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -31,13 +31,9 @@ void AudioInputSyncWriter::Close() { } bool AudioInputSyncWriter::Init() { - base::SyncSocket* sockets[2] = {0}; - if (base::SyncSocket::CreatePair(sockets)) { - socket_.reset(sockets[0]); - foreign_socket_.reset(sockets[1]); - return true; - } - return false; + socket_.reset(new base::SyncSocket()); + foreign_socket_.reset(new base::SyncSocket()); + return base::SyncSocket::CreatePair(socket_.get(), foreign_socket_.get()); } #if defined(OS_WIN) diff --git a/content/browser/renderer_host/media/audio_sync_reader.cc b/content/browser/renderer_host/media/audio_sync_reader.cc index 7e21cf1..c06345a 100644 --- a/content/browser/renderer_host/media/audio_sync_reader.cc +++ b/content/browser/renderer_host/media/audio_sync_reader.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -92,13 +92,9 @@ void AudioSyncReader::Close() { } bool AudioSyncReader::Init() { - base::SyncSocket* sockets[2] = {0}; - if (base::SyncSocket::CreatePair(sockets)) { - socket_.reset(sockets[0]); - foreign_socket_.reset(sockets[1]); - return true; - } - return false; + socket_.reset(new base::SyncSocket()); + foreign_socket_.reset(new base::SyncSocket()); + return base::SyncSocket::CreatePair(socket_.get(), foreign_socket_.get()); } #if defined(OS_WIN) diff --git a/content/renderer/pepper_plugin_delegate_impl.cc b/content/renderer/pepper_plugin_delegate_impl.cc index 8bc1fb8..ffc7fc1 100644 --- a/content/renderer/pepper_plugin_delegate_impl.cc +++ b/content/renderer/pepper_plugin_delegate_impl.cc @@ -820,13 +820,11 @@ void PpapiBrokerImpl::ConnectPluginToBroker( base::SyncSocket::Handle plugin_handle = base::kInvalidPlatformFileValue; int32_t result = PP_OK; - base::SyncSocket* sockets[2] = {0}; - if (base::SyncSocket::CreatePair(sockets)) { - // The socket objects will be deleted when this function exits, closing the - // handles. Any uses of the socket must duplicate them. - scoped_ptr<base::SyncSocket> broker_socket(sockets[0]); - scoped_ptr<base::SyncSocket> plugin_socket(sockets[1]); - + // The socket objects will be deleted when this function exits, closing the + // handles. Any uses of the socket must duplicate them. + scoped_ptr<base::SyncSocket> broker_socket(new base::SyncSocket()); + scoped_ptr<base::SyncSocket> plugin_socket(new base::SyncSocket()); + if (base::SyncSocket::CreatePair(broker_socket.get(), plugin_socket.get())) { result = dispatcher_->SendHandleToBroker(client->pp_instance(), broker_socket->handle()); diff --git a/ipc/sync_socket_unittest.cc b/ipc/sync_socket_unittest.cc index 79cf6c3..dc50525 100644 --- a/ipc/sync_socket_unittest.cc +++ b/ipc/sync_socket_unittest.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -8,8 +8,10 @@ #include <string> #include <sstream> +#include "base/bind.h" #include "base/message_loop.h" #include "base/process_util.h" +#include "base/threading/thread.h" #include "ipc/ipc_channel_proxy.h" #include "ipc/ipc_tests.h" #include "testing/gtest/include/gtest/gtest.h" @@ -52,7 +54,7 @@ const size_t kHelloStringLength = arraysize(kHelloString); // messages from the client. class SyncSocketServerListener : public IPC::Channel::Listener { public: - SyncSocketServerListener() : chan_(NULL) { + SyncSocketServerListener() : chan_(NULL) { } void Init(IPC::Channel* chan) { @@ -171,25 +173,25 @@ TEST_F(SyncSocketTest, SanityTest) { base::ProcessHandle server_process = SpawnChild(SYNC_SOCKET_SERVER, &chan); ASSERT_TRUE(server_process); // Create a pair of SyncSockets. - base::SyncSocket* pair[2]; - base::SyncSocket::CreatePair(pair); + base::SyncSocket pair[2]; + base::SyncSocket::CreatePair(&pair[0], &pair[1]); // Immediately after creation there should be no pending bytes. - EXPECT_EQ(0U, pair[0]->Peek()); - EXPECT_EQ(0U, pair[1]->Peek()); + EXPECT_EQ(0U, pair[0].Peek()); + EXPECT_EQ(0U, pair[1].Peek()); base::SyncSocket::Handle target_handle; // Connect the channel and listener. ASSERT_TRUE(chan.Connect()); - listener.Init(pair[0], &chan); + listener.Init(&pair[0], &chan); #if defined(OS_WIN) // On windows we need to duplicate the handle into the server process. - BOOL retval = DuplicateHandle(GetCurrentProcess(), pair[1]->handle(), + BOOL retval = DuplicateHandle(GetCurrentProcess(), pair[1].handle(), server_process, &target_handle, 0, FALSE, DUPLICATE_SAME_ACCESS); EXPECT_TRUE(retval); // Set up a message to pass the handle to the server. IPC::Message* msg = new MsgClassSetHandle(target_handle); #else - target_handle = pair[1]->handle(); + target_handle = pair[1].handle(); // Set up a message to pass the handle to the server. base::FileDescriptor filedesc(target_handle, false); IPC::Message* msg = new MsgClassSetHandle(filedesc); @@ -198,8 +200,45 @@ TEST_F(SyncSocketTest, SanityTest) { // Use the current thread as the I/O thread. MessageLoop::current()->Run(); // Shut down. - delete pair[0]; - delete pair[1]; + pair[0].Close(); + pair[1].Close(); EXPECT_TRUE(base::WaitForSingleProcess(server_process, 5000)); base::CloseProcessHandle(server_process); } + +static void BlockingRead(base::SyncSocket* socket, size_t* received) { + // Notify the parent thread that we're up and running. + socket->Send(kHelloString, kHelloStringLength); + char buf[0xff]; // Won't ever be filled. + *received = socket->Receive(buf, arraysize(buf)); +} + +// Tests that we can safely end a blocking Receive operation on one thread +// from another thread by disconnecting (but not closing) the socket. +TEST_F(SyncSocketTest, DisconnectTest) { + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + base::Thread worker("BlockingThread"); + worker.Start(); + + // Try to do a blocking read from one of the sockets on the worker thread. + size_t received = 1U; // Initialize to an unexpected value. + worker.message_loop()->PostTask(FROM_HERE, + base::Bind(&BlockingRead, &pair[0], &received)); + + // Wait for the worker thread to say hello. + char hello[kHelloStringLength] = {0}; + pair[1].Receive(&hello[0], sizeof(hello)); + VLOG(1) << "Received: " << hello; + // Give the worker a chance to start Receive(). + base::PlatformThread::YieldCurrentThread(); + + // Now shut down the socket that the thread is issuing a blocking read on + // which should cause Receive to return with an error. + pair[0].Shutdown(); + + worker.Stop(); + + EXPECT_EQ(0U, received); +} diff --git a/media/audio/win/audio_output_win_unittest.cc b/media/audio/win/audio_output_win_unittest.cc index 4d5a13c..8a0b31f 100644 --- a/media/audio/win/audio_output_win_unittest.cc +++ b/media/audio/win/audio_output_win_unittest.cc @@ -636,7 +636,6 @@ class SyncSocketSource : public AudioOutputStream::AudioSourceCallback { : socket_(socket) {} ~SyncSocketSource() { - delete socket_; } // AudioSourceCallback::OnMoreData implementation: @@ -716,16 +715,16 @@ TEST(WinAudioTest, SyncSocketBasic) { ASSERT_TRUE(oas->Open()); - base::SyncSocket* sockets[2]; - ASSERT_TRUE(base::SyncSocket::CreatePair(sockets)); + base::SyncSocket sockets[2]; + ASSERT_TRUE(base::SyncSocket::CreatePair(&sockets[0], &sockets[1])); - SyncSocketSource source(sockets[0]); + SyncSocketSource source(&sockets[0]); SyncThreadContext thread_context; thread_context.sample_rate = sample_rate; thread_context.sine_freq = 200.0; thread_context.packet_size_bytes = kSamples20ms * 2; - thread_context.socket = sockets[1]; + thread_context.socket = &sockets[1]; HANDLE thread = ::CreateThread(NULL, 0, SyncSocketThread, &thread_context, 0, NULL); @@ -734,7 +733,6 @@ TEST(WinAudioTest, SyncSocketBasic) { ::WaitForSingleObject(thread, INFINITE); ::CloseHandle(thread); - delete sockets[1]; oas->Stop(); oas->Close(); |