summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--base/sync_socket.h14
-rw-r--r--base/sync_socket_posix.cc25
-rw-r--r--base/sync_socket_win.cc38
-rw-r--r--content/browser/renderer_host/media/audio_sync_reader.cc4
-rw-r--r--content/browser/renderer_host/media/audio_sync_reader.h6
-rw-r--r--ipc/sync_socket_unittest.cc78
6 files changed, 132 insertions, 33 deletions
diff --git a/base/sync_socket.h b/base/sync_socket.h
index 9bf8836..d5bfb72 100644
--- a/base/sync_socket.h
+++ b/base/sync_socket.h
@@ -7,7 +7,7 @@
#pragma once
// A socket abstraction used for sending and receiving plain
-// data. Because they are blocking, they can be used to perform
+// data. Because the receiving is blocking, they can be used to perform
// rudimentary cross-process synchronization with low latency.
#include "base/basictypes.h"
@@ -77,8 +77,8 @@ class BASE_EXPORT SyncSocket {
};
// Derives from SyncSocket and adds support for shutting down the socket from
-// another thread while a blocking Receive or Send is being done from the thread
-// that owns the socket.
+// another thread while a blocking Receive or Send is being done from the
+// thread that owns the socket.
class BASE_EXPORT CancelableSyncSocket : public SyncSocket {
public:
CancelableSyncSocket();
@@ -102,10 +102,16 @@ class BASE_EXPORT CancelableSyncSocket : public SyncSocket {
// supported on <Vista. So, for Windows only, we override these
// SyncSocket methods in order to support shutting down the 'socket'.
virtual bool Close() OVERRIDE;
- virtual size_t Send(const void* buffer, size_t length) OVERRIDE;
virtual size_t Receive(void* buffer, size_t length) OVERRIDE;
#endif
+ // Send() is overridden to catch cases where the remote end is not responding
+ // and we fill the local socket buffer. When the buffer is full, this
+ // implementation of Send() will not block indefinitely as
+ // SyncSocket::Send will, but instead return 0, as no bytes could be sent.
+ // Note that the socket will not be closed in this case.
+ virtual size_t Send(const void* buffer, size_t length) OVERRIDE;
+
private:
#if defined(OS_WIN)
WaitableEvent shutdown_event_;
diff --git a/base/sync_socket_posix.cc b/base/sync_socket_posix.cc
index c5dca75..257916d 100644
--- a/base/sync_socket_posix.cc
+++ b/base/sync_socket_posix.cc
@@ -6,10 +6,11 @@
#include <errno.h>
#include <limits.h>
+#include <fcntl.h>
#include <stdio.h>
-#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <sys/types.h>
#if defined(OS_SOLARIS)
#include <sys/filio.h>
@@ -95,7 +96,8 @@ size_t SyncSocket::Send(const void* buffer, size_t length) {
DCHECK_LE(length, kMaxMessageLength);
const char* charbuffer = static_cast<const char*>(buffer);
int len = file_util::WriteFileDescriptor(handle_, charbuffer, length);
- return static_cast<size_t>(len);
+
+ return (len == -1) ? 0 : static_cast<size_t>(len);
}
size_t SyncSocket::Receive(void* buffer, size_t length) {
@@ -124,6 +126,25 @@ bool CancelableSyncSocket::Shutdown() {
return HANDLE_EINTR(shutdown(handle(), SHUT_RDWR)) >= 0;
}
+size_t CancelableSyncSocket::Send(const void* buffer, size_t length) {
+ long flags = 0;
+ flags = fcntl(handle_, F_GETFL, NULL);
+ if (flags != -1 && (flags & O_NONBLOCK) == 0) {
+ // Set the socket to non-blocking mode for sending if its original mode
+ // is blocking.
+ fcntl(handle_, F_SETFL, flags | O_NONBLOCK);
+ }
+
+ size_t len = SyncSocket::Send(buffer, length);
+
+ if (flags != -1 && (flags & O_NONBLOCK) == 0) {
+ // Restore the original flags.
+ fcntl(handle_, F_SETFL, flags);
+ }
+
+ return len;
+}
+
// static
bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a,
CancelableSyncSocket* socket_b) {
diff --git a/base/sync_socket_win.cc b/base/sync_socket_win.cc
index c6fb1ce..4fcd572 100644
--- a/base/sync_socket_win.cc
+++ b/base/sync_socket_win.cc
@@ -114,7 +114,8 @@ size_t CancelableFileOperation(Function operation, HANDLE file,
BufferType* buffer, size_t length,
base::WaitableEvent* io_event,
base::WaitableEvent* cancel_event,
- CancelableSyncSocket* socket) {
+ CancelableSyncSocket* socket,
+ DWORD timeout_in_ms) {
// The buffer must be byte size or the length check won't make much sense.
COMPILE_ASSERT(sizeof(buffer[0]) == sizeof(char), incorrect_buffer_type);
DCHECK_LE(length, kMaxMessageLength);
@@ -131,24 +132,38 @@ size_t CancelableFileOperation(Function operation, HANDLE file,
&len, &ol);
if (!ok) {
if (::GetLastError() == ERROR_IO_PENDING) {
- base::WaitableEvent* events[] = { io_event, cancel_event };
- size_t signaled = WaitableEvent::WaitMany(events, arraysize(events));
- if (signaled == 1) {
+ HANDLE events[] = { io_event->handle(), cancel_event->handle() };
+ int wait_result = WaitForMultipleObjects(
+ arraysize(events), events, FALSE, timeout_in_ms);
+ if (wait_result == (WAIT_OBJECT_0 + 0)) {
+ GetOverlappedResult(file, &ol, &len, TRUE);
+ } else if (wait_result == (WAIT_OBJECT_0 + 1)) {
VLOG(1) << "Shutdown was signaled. Closing socket.";
CancelIo(file);
socket->Close();
count = 0;
break;
} else {
- GetOverlappedResult(file, &ol, &len, TRUE);
+ // Timeout happened.
+ DCHECK_EQ(WAIT_TIMEOUT, wait_result);
+ if (!CancelIo(file)){
+ DLOG(WARNING) << "CancelIo() failed";
+ }
+ break;
}
} else {
- return (0 < count) ? count : 0;
+ break;
}
}
+
count += len;
+
+ // Quit the operation if we can't write/read anymore.
+ if (len != chunk)
+ break;
}
- return count;
+
+ return (count > 0) ? count : 0;
}
} // namespace
@@ -234,15 +249,16 @@ bool CancelableSyncSocket::Close() {
}
size_t CancelableSyncSocket::Send(const void* buffer, size_t length) {
- return CancelableFileOperation(&WriteFile, handle_,
- reinterpret_cast<const char*>(buffer), length, &file_operation_,
- &shutdown_event_, this);
+ static const DWORD kWaitTimeOutInMs = 500;
+ return CancelableFileOperation(
+ &WriteFile, handle_, reinterpret_cast<const char*>(buffer),
+ length, &file_operation_, &shutdown_event_, this, kWaitTimeOutInMs);
}
size_t CancelableSyncSocket::Receive(void* buffer, size_t length) {
return CancelableFileOperation(&ReadFile, handle_,
reinterpret_cast<char*>(buffer), length, &file_operation_,
- &shutdown_event_, this);
+ &shutdown_event_, this, INFINITE);
}
// static
diff --git a/content/browser/renderer_host/media/audio_sync_reader.cc b/content/browser/renderer_host/media/audio_sync_reader.cc
index 7b6664b..de121df 100644
--- a/content/browser/renderer_host/media/audio_sync_reader.cc
+++ b/content/browser/renderer_host/media/audio_sync_reader.cc
@@ -36,7 +36,7 @@ void AudioSyncReader::UpdatePendingBytes(uint32 bytes) {
shared_memory_,
media::PacketSizeSizeInBytes(shared_memory_->created_size()));
}
- base::AutoLock auto_lock(lock_);
+
if (socket_.get()) {
socket_->Send(&bytes, sizeof(bytes));
}
@@ -84,10 +84,8 @@ uint32 AudioSyncReader::Read(void* data, uint32 size) {
}
void AudioSyncReader::Close() {
- base::AutoLock auto_lock(lock_);
if (socket_.get()) {
socket_->Close();
- socket_.reset(NULL);
}
}
diff --git a/content/browser/renderer_host/media/audio_sync_reader.h b/content/browser/renderer_host/media/audio_sync_reader.h
index 60a2879..8aef113 100644
--- a/content/browser/renderer_host/media/audio_sync_reader.h
+++ b/content/browser/renderer_host/media/audio_sync_reader.h
@@ -52,12 +52,6 @@ class AudioSyncReader : public media::AudioOutputController::SyncReader {
// PrepareForeignSocketHandle() is called and ran successfully.
scoped_ptr<base::CancelableSyncSocket> foreign_socket_;
- // Protect socket_ access by lock to prevent race condition when audio
- // controller thread closes the reader and hardware audio thread is reading
- // data. This way we know that socket would not be deleted while we are
- // writing data to it.
- base::Lock lock_;
-
DISALLOW_COPY_AND_ASSIGN(AudioSyncReader);
};
diff --git a/ipc/sync_socket_unittest.cc b/ipc/sync_socket_unittest.cc
index dc50525..6aa3330 100644
--- a/ipc/sync_socket_unittest.cc
+++ b/ipc/sync_socket_unittest.cc
@@ -89,8 +89,8 @@ class SyncSocketServerListener : public IPC::Channel::Listener {
void SetHandle(base::SyncSocket::Handle handle) {
base::SyncSocket sync_socket(handle);
- EXPECT_EQ(sync_socket.Send(static_cast<const void*>(kHelloString),
- kHelloStringLength), kHelloStringLength);
+ EXPECT_EQ(sync_socket.Send(kHelloString, kHelloStringLength),
+ kHelloStringLength);
IPC::Message* msg = new MsgClassResponse(kHelloString);
EXPECT_TRUE(chan_->Send(msg));
}
@@ -206,11 +206,15 @@ TEST_F(SyncSocketTest, SanityTest) {
base::CloseProcessHandle(server_process);
}
-static void BlockingRead(base::SyncSocket* socket, size_t* received) {
+
+// A blocking read operation that will block the thread until it receives
+// |length| bytes of packets or Shutdown() is called on another thread.
+static void BlockingRead(base::SyncSocket* socket, char* buf,
+ size_t length, size_t* received) {
+ DCHECK(buf != NULL);
// Notify the parent thread that we're up and running.
socket->Send(kHelloString, kHelloStringLength);
- char buf[0xff]; // Won't ever be filled.
- *received = socket->Receive(buf, arraysize(buf));
+ *received = socket->Receive(buf, length);
}
// Tests that we can safely end a blocking Receive operation on one thread
@@ -223,14 +227,15 @@ TEST_F(SyncSocketTest, DisconnectTest) {
worker.Start();
// Try to do a blocking read from one of the sockets on the worker thread.
+ char buf[0xff];
size_t received = 1U; // Initialize to an unexpected value.
worker.message_loop()->PostTask(FROM_HERE,
- base::Bind(&BlockingRead, &pair[0], &received));
+ base::Bind(&BlockingRead, &pair[0], &buf[0], arraysize(buf), &received));
// Wait for the worker thread to say hello.
char hello[kHelloStringLength] = {0};
pair[1].Receive(&hello[0], sizeof(hello));
- VLOG(1) << "Received: " << hello;
+ EXPECT_EQ(0, strcmp(hello, kHelloString));
// Give the worker a chance to start Receive().
base::PlatformThread::YieldCurrentThread();
@@ -242,3 +247,62 @@ TEST_F(SyncSocketTest, DisconnectTest) {
EXPECT_EQ(0U, received);
}
+
+// Tests that read is a blocking operation.
+TEST_F(SyncSocketTest, BlockingReceiveTest) {
+ base::CancelableSyncSocket pair[2];
+ ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
+
+ base::Thread worker("BlockingThread");
+ worker.Start();
+
+ // Try to do a blocking read from one of the sockets on the worker thread.
+ char buf[kHelloStringLength] = {0};
+ size_t received = 1U; // Initialize to an unexpected value.
+ worker.message_loop()->PostTask(FROM_HERE,
+ base::Bind(&BlockingRead, &pair[0], &buf[0],
+ kHelloStringLength, &received));
+
+ // Wait for the worker thread to say hello.
+ char hello[kHelloStringLength] = {0};
+ pair[1].Receive(&hello[0], sizeof(hello));
+ EXPECT_EQ(0, strcmp(hello, kHelloString));
+ // Give the worker a chance to start Receive().
+ base::PlatformThread::YieldCurrentThread();
+
+ // Send a message to the socket on the blocking thead, it should free the
+ // socket from Receive().
+ pair[1].Send(kHelloString, kHelloStringLength);
+ worker.Stop();
+
+ // Verify the socket has received the message.
+ EXPECT_TRUE(strcmp(buf, kHelloString) == 0);
+ EXPECT_EQ(kHelloStringLength, received);
+}
+
+// Tests that the write operation is non-blocking and returns immediately
+// when there is insufficient space in the socket's buffer.
+TEST_F(SyncSocketTest, NonBlockingWriteTest) {
+ base::CancelableSyncSocket pair[2];
+ ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1]));
+
+ // Fill up the buffer for one of the socket, Send() should not block the
+ // thread even when the buffer is full.
+ while (pair[0].Send(kHelloString, kHelloStringLength) != 0) {}
+
+ // Data should be avialble on another socket.
+ size_t bytes_in_buffer = pair[1].Peek();
+ EXPECT_NE(bytes_in_buffer, 0U);
+
+ // No more data can be written to the buffer since socket has been full,
+ // verify that the amount of avialble data on another socket is unchanged.
+ EXPECT_EQ(0U, pair[0].Send(kHelloString, kHelloStringLength));
+ EXPECT_EQ(bytes_in_buffer, pair[1].Peek());
+
+ // Read from another socket to free some space for a new write.
+ char hello[kHelloStringLength] = {0};
+ pair[1].Receive(&hello[0], sizeof(hello));
+
+ // Should be able to write more data to the buffer now.
+ EXPECT_EQ(kHelloStringLength, pair[0].Send(kHelloString, kHelloStringLength));
+}