diff options
-rw-r--r-- | base/sync_socket.h | 14 | ||||
-rw-r--r-- | base/sync_socket_posix.cc | 25 | ||||
-rw-r--r-- | base/sync_socket_win.cc | 38 | ||||
-rw-r--r-- | content/browser/renderer_host/media/audio_sync_reader.cc | 4 | ||||
-rw-r--r-- | content/browser/renderer_host/media/audio_sync_reader.h | 6 | ||||
-rw-r--r-- | ipc/sync_socket_unittest.cc | 78 |
6 files changed, 132 insertions, 33 deletions
diff --git a/base/sync_socket.h b/base/sync_socket.h index 9bf8836..d5bfb72 100644 --- a/base/sync_socket.h +++ b/base/sync_socket.h @@ -7,7 +7,7 @@ #pragma once // A socket abstraction used for sending and receiving plain -// data. Because they are blocking, they can be used to perform +// data. Because the receiving is blocking, they can be used to perform // rudimentary cross-process synchronization with low latency. #include "base/basictypes.h" @@ -77,8 +77,8 @@ class BASE_EXPORT SyncSocket { }; // Derives from SyncSocket and adds support for shutting down the socket from -// another thread while a blocking Receive or Send is being done from the thread -// that owns the socket. +// another thread while a blocking Receive or Send is being done from the +// thread that owns the socket. class BASE_EXPORT CancelableSyncSocket : public SyncSocket { public: CancelableSyncSocket(); @@ -102,10 +102,16 @@ class BASE_EXPORT CancelableSyncSocket : public SyncSocket { // supported on <Vista. So, for Windows only, we override these // SyncSocket methods in order to support shutting down the 'socket'. virtual bool Close() OVERRIDE; - virtual size_t Send(const void* buffer, size_t length) OVERRIDE; virtual size_t Receive(void* buffer, size_t length) OVERRIDE; #endif + // Send() is overridden to catch cases where the remote end is not responding + // and we fill the local socket buffer. When the buffer is full, this + // implementation of Send() will not block indefinitely as + // SyncSocket::Send will, but instead return 0, as no bytes could be sent. + // Note that the socket will not be closed in this case. + virtual size_t Send(const void* buffer, size_t length) OVERRIDE; + private: #if defined(OS_WIN) WaitableEvent shutdown_event_; diff --git a/base/sync_socket_posix.cc b/base/sync_socket_posix.cc index c5dca75..257916d 100644 --- a/base/sync_socket_posix.cc +++ b/base/sync_socket_posix.cc @@ -6,10 +6,11 @@ #include <errno.h> #include <limits.h> +#include <fcntl.h> #include <stdio.h> -#include <sys/types.h> #include <sys/ioctl.h> #include <sys/socket.h> +#include <sys/types.h> #if defined(OS_SOLARIS) #include <sys/filio.h> @@ -95,7 +96,8 @@ size_t SyncSocket::Send(const void* buffer, size_t length) { DCHECK_LE(length, kMaxMessageLength); const char* charbuffer = static_cast<const char*>(buffer); int len = file_util::WriteFileDescriptor(handle_, charbuffer, length); - return static_cast<size_t>(len); + + return (len == -1) ? 0 : static_cast<size_t>(len); } size_t SyncSocket::Receive(void* buffer, size_t length) { @@ -124,6 +126,25 @@ bool CancelableSyncSocket::Shutdown() { return HANDLE_EINTR(shutdown(handle(), SHUT_RDWR)) >= 0; } +size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { + long flags = 0; + flags = fcntl(handle_, F_GETFL, NULL); + if (flags != -1 && (flags & O_NONBLOCK) == 0) { + // Set the socket to non-blocking mode for sending if its original mode + // is blocking. + fcntl(handle_, F_SETFL, flags | O_NONBLOCK); + } + + size_t len = SyncSocket::Send(buffer, length); + + if (flags != -1 && (flags & O_NONBLOCK) == 0) { + // Restore the original flags. + fcntl(handle_, F_SETFL, flags); + } + + return len; +} + // static bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a, CancelableSyncSocket* socket_b) { diff --git a/base/sync_socket_win.cc b/base/sync_socket_win.cc index c6fb1ce..4fcd572 100644 --- a/base/sync_socket_win.cc +++ b/base/sync_socket_win.cc @@ -114,7 +114,8 @@ size_t CancelableFileOperation(Function operation, HANDLE file, BufferType* buffer, size_t length, base::WaitableEvent* io_event, base::WaitableEvent* cancel_event, - CancelableSyncSocket* socket) { + CancelableSyncSocket* socket, + DWORD timeout_in_ms) { // The buffer must be byte size or the length check won't make much sense. COMPILE_ASSERT(sizeof(buffer[0]) == sizeof(char), incorrect_buffer_type); DCHECK_LE(length, kMaxMessageLength); @@ -131,24 +132,38 @@ size_t CancelableFileOperation(Function operation, HANDLE file, &len, &ol); if (!ok) { if (::GetLastError() == ERROR_IO_PENDING) { - base::WaitableEvent* events[] = { io_event, cancel_event }; - size_t signaled = WaitableEvent::WaitMany(events, arraysize(events)); - if (signaled == 1) { + HANDLE events[] = { io_event->handle(), cancel_event->handle() }; + int wait_result = WaitForMultipleObjects( + arraysize(events), events, FALSE, timeout_in_ms); + if (wait_result == (WAIT_OBJECT_0 + 0)) { + GetOverlappedResult(file, &ol, &len, TRUE); + } else if (wait_result == (WAIT_OBJECT_0 + 1)) { VLOG(1) << "Shutdown was signaled. Closing socket."; CancelIo(file); socket->Close(); count = 0; break; } else { - GetOverlappedResult(file, &ol, &len, TRUE); + // Timeout happened. + DCHECK_EQ(WAIT_TIMEOUT, wait_result); + if (!CancelIo(file)){ + DLOG(WARNING) << "CancelIo() failed"; + } + break; } } else { - return (0 < count) ? count : 0; + break; } } + count += len; + + // Quit the operation if we can't write/read anymore. + if (len != chunk) + break; } - return count; + + return (count > 0) ? count : 0; } } // namespace @@ -234,15 +249,16 @@ bool CancelableSyncSocket::Close() { } size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { - return CancelableFileOperation(&WriteFile, handle_, - reinterpret_cast<const char*>(buffer), length, &file_operation_, - &shutdown_event_, this); + static const DWORD kWaitTimeOutInMs = 500; + return CancelableFileOperation( + &WriteFile, handle_, reinterpret_cast<const char*>(buffer), + length, &file_operation_, &shutdown_event_, this, kWaitTimeOutInMs); } size_t CancelableSyncSocket::Receive(void* buffer, size_t length) { return CancelableFileOperation(&ReadFile, handle_, reinterpret_cast<char*>(buffer), length, &file_operation_, - &shutdown_event_, this); + &shutdown_event_, this, INFINITE); } // static diff --git a/content/browser/renderer_host/media/audio_sync_reader.cc b/content/browser/renderer_host/media/audio_sync_reader.cc index 7b6664b..de121df 100644 --- a/content/browser/renderer_host/media/audio_sync_reader.cc +++ b/content/browser/renderer_host/media/audio_sync_reader.cc @@ -36,7 +36,7 @@ void AudioSyncReader::UpdatePendingBytes(uint32 bytes) { shared_memory_, media::PacketSizeSizeInBytes(shared_memory_->created_size())); } - base::AutoLock auto_lock(lock_); + if (socket_.get()) { socket_->Send(&bytes, sizeof(bytes)); } @@ -84,10 +84,8 @@ uint32 AudioSyncReader::Read(void* data, uint32 size) { } void AudioSyncReader::Close() { - base::AutoLock auto_lock(lock_); if (socket_.get()) { socket_->Close(); - socket_.reset(NULL); } } diff --git a/content/browser/renderer_host/media/audio_sync_reader.h b/content/browser/renderer_host/media/audio_sync_reader.h index 60a2879..8aef113 100644 --- a/content/browser/renderer_host/media/audio_sync_reader.h +++ b/content/browser/renderer_host/media/audio_sync_reader.h @@ -52,12 +52,6 @@ class AudioSyncReader : public media::AudioOutputController::SyncReader { // PrepareForeignSocketHandle() is called and ran successfully. scoped_ptr<base::CancelableSyncSocket> foreign_socket_; - // Protect socket_ access by lock to prevent race condition when audio - // controller thread closes the reader and hardware audio thread is reading - // data. This way we know that socket would not be deleted while we are - // writing data to it. - base::Lock lock_; - DISALLOW_COPY_AND_ASSIGN(AudioSyncReader); }; diff --git a/ipc/sync_socket_unittest.cc b/ipc/sync_socket_unittest.cc index dc50525..6aa3330 100644 --- a/ipc/sync_socket_unittest.cc +++ b/ipc/sync_socket_unittest.cc @@ -89,8 +89,8 @@ class SyncSocketServerListener : public IPC::Channel::Listener { void SetHandle(base::SyncSocket::Handle handle) { base::SyncSocket sync_socket(handle); - EXPECT_EQ(sync_socket.Send(static_cast<const void*>(kHelloString), - kHelloStringLength), kHelloStringLength); + EXPECT_EQ(sync_socket.Send(kHelloString, kHelloStringLength), + kHelloStringLength); IPC::Message* msg = new MsgClassResponse(kHelloString); EXPECT_TRUE(chan_->Send(msg)); } @@ -206,11 +206,15 @@ TEST_F(SyncSocketTest, SanityTest) { base::CloseProcessHandle(server_process); } -static void BlockingRead(base::SyncSocket* socket, size_t* received) { + +// A blocking read operation that will block the thread until it receives +// |length| bytes of packets or Shutdown() is called on another thread. +static void BlockingRead(base::SyncSocket* socket, char* buf, + size_t length, size_t* received) { + DCHECK(buf != NULL); // Notify the parent thread that we're up and running. socket->Send(kHelloString, kHelloStringLength); - char buf[0xff]; // Won't ever be filled. - *received = socket->Receive(buf, arraysize(buf)); + *received = socket->Receive(buf, length); } // Tests that we can safely end a blocking Receive operation on one thread @@ -223,14 +227,15 @@ TEST_F(SyncSocketTest, DisconnectTest) { worker.Start(); // Try to do a blocking read from one of the sockets on the worker thread. + char buf[0xff]; size_t received = 1U; // Initialize to an unexpected value. worker.message_loop()->PostTask(FROM_HERE, - base::Bind(&BlockingRead, &pair[0], &received)); + base::Bind(&BlockingRead, &pair[0], &buf[0], arraysize(buf), &received)); // Wait for the worker thread to say hello. char hello[kHelloStringLength] = {0}; pair[1].Receive(&hello[0], sizeof(hello)); - VLOG(1) << "Received: " << hello; + EXPECT_EQ(0, strcmp(hello, kHelloString)); // Give the worker a chance to start Receive(). base::PlatformThread::YieldCurrentThread(); @@ -242,3 +247,62 @@ TEST_F(SyncSocketTest, DisconnectTest) { EXPECT_EQ(0U, received); } + +// Tests that read is a blocking operation. +TEST_F(SyncSocketTest, BlockingReceiveTest) { + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + base::Thread worker("BlockingThread"); + worker.Start(); + + // Try to do a blocking read from one of the sockets on the worker thread. + char buf[kHelloStringLength] = {0}; + size_t received = 1U; // Initialize to an unexpected value. + worker.message_loop()->PostTask(FROM_HERE, + base::Bind(&BlockingRead, &pair[0], &buf[0], + kHelloStringLength, &received)); + + // Wait for the worker thread to say hello. + char hello[kHelloStringLength] = {0}; + pair[1].Receive(&hello[0], sizeof(hello)); + EXPECT_EQ(0, strcmp(hello, kHelloString)); + // Give the worker a chance to start Receive(). + base::PlatformThread::YieldCurrentThread(); + + // Send a message to the socket on the blocking thead, it should free the + // socket from Receive(). + pair[1].Send(kHelloString, kHelloStringLength); + worker.Stop(); + + // Verify the socket has received the message. + EXPECT_TRUE(strcmp(buf, kHelloString) == 0); + EXPECT_EQ(kHelloStringLength, received); +} + +// Tests that the write operation is non-blocking and returns immediately +// when there is insufficient space in the socket's buffer. +TEST_F(SyncSocketTest, NonBlockingWriteTest) { + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + // Fill up the buffer for one of the socket, Send() should not block the + // thread even when the buffer is full. + while (pair[0].Send(kHelloString, kHelloStringLength) != 0) {} + + // Data should be avialble on another socket. + size_t bytes_in_buffer = pair[1].Peek(); + EXPECT_NE(bytes_in_buffer, 0U); + + // No more data can be written to the buffer since socket has been full, + // verify that the amount of avialble data on another socket is unchanged. + EXPECT_EQ(0U, pair[0].Send(kHelloString, kHelloStringLength)); + EXPECT_EQ(bytes_in_buffer, pair[1].Peek()); + + // Read from another socket to free some space for a new write. + char hello[kHelloStringLength] = {0}; + pair[1].Receive(&hello[0], sizeof(hello)); + + // Should be able to write more data to the buffer now. + EXPECT_EQ(kHelloStringLength, pair[0].Send(kHelloString, kHelloStringLength)); +} |