diff options
author | xians@chromium.org <xians@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-04-18 19:30:13 +0000 |
---|---|---|
committer | xians@chromium.org <xians@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-04-18 19:30:13 +0000 |
commit | 1fa64821c890d866c6b4e5463f5891ccfe1fc9f1 (patch) | |
tree | 48be2d823ca4833944be51e04d0e36f77f6b7123 | |
parent | f45c5f3d74273e913c62598fcbde35e20924ab50 (diff) | |
download | chromium_src-1fa64821c890d866c6b4e5463f5891ccfe1fc9f1.zip chromium_src-1fa64821c890d866c6b4e5463f5891ccfe1fc9f1.tar.gz chromium_src-1fa64821c890d866c6b4e5463f5891ccfe1fc9f1.tar.bz2 |
If we are using blocking write, when the renderer stop getting the data without notifying the browser, it will hang the browser. This happens with some plugins which use the sync sockets provided by the Pepper.
This patch change CancellableSyncSocket to be non-blocking on sending, so that we don't need to worry the whole browser hangs by one plugin application.
Also, we remove the lock in audio_sync_reader.cc since it is not really needed if we don't set the socket_ to NULL when calling Close(). By doing this we allow the user to close the socket while another thread is writing to the socket.
BUG=121152
TEST=ipc_tests
Review URL: http://codereview.chromium.org/10000004
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@132842 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | base/sync_socket.h | 14 | ||||
-rw-r--r-- | base/sync_socket_posix.cc | 25 | ||||
-rw-r--r-- | base/sync_socket_win.cc | 38 | ||||
-rw-r--r-- | content/browser/renderer_host/media/audio_sync_reader.cc | 4 | ||||
-rw-r--r-- | content/browser/renderer_host/media/audio_sync_reader.h | 6 | ||||
-rw-r--r-- | ipc/sync_socket_unittest.cc | 79 |
6 files changed, 133 insertions, 33 deletions
diff --git a/base/sync_socket.h b/base/sync_socket.h index 9bf8836..d5bfb72 100644 --- a/base/sync_socket.h +++ b/base/sync_socket.h @@ -7,7 +7,7 @@ #pragma once // A socket abstraction used for sending and receiving plain -// data. Because they are blocking, they can be used to perform +// data. Because the receiving is blocking, they can be used to perform // rudimentary cross-process synchronization with low latency. #include "base/basictypes.h" @@ -77,8 +77,8 @@ class BASE_EXPORT SyncSocket { }; // Derives from SyncSocket and adds support for shutting down the socket from -// another thread while a blocking Receive or Send is being done from the thread -// that owns the socket. +// another thread while a blocking Receive or Send is being done from the +// thread that owns the socket. class BASE_EXPORT CancelableSyncSocket : public SyncSocket { public: CancelableSyncSocket(); @@ -102,10 +102,16 @@ class BASE_EXPORT CancelableSyncSocket : public SyncSocket { // supported on <Vista. So, for Windows only, we override these // SyncSocket methods in order to support shutting down the 'socket'. virtual bool Close() OVERRIDE; - virtual size_t Send(const void* buffer, size_t length) OVERRIDE; virtual size_t Receive(void* buffer, size_t length) OVERRIDE; #endif + // Send() is overridden to catch cases where the remote end is not responding + // and we fill the local socket buffer. When the buffer is full, this + // implementation of Send() will not block indefinitely as + // SyncSocket::Send will, but instead return 0, as no bytes could be sent. + // Note that the socket will not be closed in this case. + virtual size_t Send(const void* buffer, size_t length) OVERRIDE; + private: #if defined(OS_WIN) WaitableEvent shutdown_event_; diff --git a/base/sync_socket_posix.cc b/base/sync_socket_posix.cc index c5dca75..257916d 100644 --- a/base/sync_socket_posix.cc +++ b/base/sync_socket_posix.cc @@ -6,10 +6,11 @@ #include <errno.h> #include <limits.h> +#include <fcntl.h> #include <stdio.h> -#include <sys/types.h> #include <sys/ioctl.h> #include <sys/socket.h> +#include <sys/types.h> #if defined(OS_SOLARIS) #include <sys/filio.h> @@ -95,7 +96,8 @@ size_t SyncSocket::Send(const void* buffer, size_t length) { DCHECK_LE(length, kMaxMessageLength); const char* charbuffer = static_cast<const char*>(buffer); int len = file_util::WriteFileDescriptor(handle_, charbuffer, length); - return static_cast<size_t>(len); + + return (len == -1) ? 0 : static_cast<size_t>(len); } size_t SyncSocket::Receive(void* buffer, size_t length) { @@ -124,6 +126,25 @@ bool CancelableSyncSocket::Shutdown() { return HANDLE_EINTR(shutdown(handle(), SHUT_RDWR)) >= 0; } +size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { + long flags = 0; + flags = fcntl(handle_, F_GETFL, NULL); + if (flags != -1 && (flags & O_NONBLOCK) == 0) { + // Set the socket to non-blocking mode for sending if its original mode + // is blocking. + fcntl(handle_, F_SETFL, flags | O_NONBLOCK); + } + + size_t len = SyncSocket::Send(buffer, length); + + if (flags != -1 && (flags & O_NONBLOCK) == 0) { + // Restore the original flags. + fcntl(handle_, F_SETFL, flags); + } + + return len; +} + // static bool CancelableSyncSocket::CreatePair(CancelableSyncSocket* socket_a, CancelableSyncSocket* socket_b) { diff --git a/base/sync_socket_win.cc b/base/sync_socket_win.cc index c6fb1ce..4fcd572 100644 --- a/base/sync_socket_win.cc +++ b/base/sync_socket_win.cc @@ -114,7 +114,8 @@ size_t CancelableFileOperation(Function operation, HANDLE file, BufferType* buffer, size_t length, base::WaitableEvent* io_event, base::WaitableEvent* cancel_event, - CancelableSyncSocket* socket) { + CancelableSyncSocket* socket, + DWORD timeout_in_ms) { // The buffer must be byte size or the length check won't make much sense. COMPILE_ASSERT(sizeof(buffer[0]) == sizeof(char), incorrect_buffer_type); DCHECK_LE(length, kMaxMessageLength); @@ -131,24 +132,38 @@ size_t CancelableFileOperation(Function operation, HANDLE file, &len, &ol); if (!ok) { if (::GetLastError() == ERROR_IO_PENDING) { - base::WaitableEvent* events[] = { io_event, cancel_event }; - size_t signaled = WaitableEvent::WaitMany(events, arraysize(events)); - if (signaled == 1) { + HANDLE events[] = { io_event->handle(), cancel_event->handle() }; + int wait_result = WaitForMultipleObjects( + arraysize(events), events, FALSE, timeout_in_ms); + if (wait_result == (WAIT_OBJECT_0 + 0)) { + GetOverlappedResult(file, &ol, &len, TRUE); + } else if (wait_result == (WAIT_OBJECT_0 + 1)) { VLOG(1) << "Shutdown was signaled. Closing socket."; CancelIo(file); socket->Close(); count = 0; break; } else { - GetOverlappedResult(file, &ol, &len, TRUE); + // Timeout happened. + DCHECK_EQ(WAIT_TIMEOUT, wait_result); + if (!CancelIo(file)){ + DLOG(WARNING) << "CancelIo() failed"; + } + break; } } else { - return (0 < count) ? count : 0; + break; } } + count += len; + + // Quit the operation if we can't write/read anymore. + if (len != chunk) + break; } - return count; + + return (count > 0) ? count : 0; } } // namespace @@ -234,15 +249,16 @@ bool CancelableSyncSocket::Close() { } size_t CancelableSyncSocket::Send(const void* buffer, size_t length) { - return CancelableFileOperation(&WriteFile, handle_, - reinterpret_cast<const char*>(buffer), length, &file_operation_, - &shutdown_event_, this); + static const DWORD kWaitTimeOutInMs = 500; + return CancelableFileOperation( + &WriteFile, handle_, reinterpret_cast<const char*>(buffer), + length, &file_operation_, &shutdown_event_, this, kWaitTimeOutInMs); } size_t CancelableSyncSocket::Receive(void* buffer, size_t length) { return CancelableFileOperation(&ReadFile, handle_, reinterpret_cast<char*>(buffer), length, &file_operation_, - &shutdown_event_, this); + &shutdown_event_, this, INFINITE); } // static diff --git a/content/browser/renderer_host/media/audio_sync_reader.cc b/content/browser/renderer_host/media/audio_sync_reader.cc index 7b6664b..de121df 100644 --- a/content/browser/renderer_host/media/audio_sync_reader.cc +++ b/content/browser/renderer_host/media/audio_sync_reader.cc @@ -36,7 +36,7 @@ void AudioSyncReader::UpdatePendingBytes(uint32 bytes) { shared_memory_, media::PacketSizeSizeInBytes(shared_memory_->created_size())); } - base::AutoLock auto_lock(lock_); + if (socket_.get()) { socket_->Send(&bytes, sizeof(bytes)); } @@ -84,10 +84,8 @@ uint32 AudioSyncReader::Read(void* data, uint32 size) { } void AudioSyncReader::Close() { - base::AutoLock auto_lock(lock_); if (socket_.get()) { socket_->Close(); - socket_.reset(NULL); } } diff --git a/content/browser/renderer_host/media/audio_sync_reader.h b/content/browser/renderer_host/media/audio_sync_reader.h index 60a2879..8aef113 100644 --- a/content/browser/renderer_host/media/audio_sync_reader.h +++ b/content/browser/renderer_host/media/audio_sync_reader.h @@ -52,12 +52,6 @@ class AudioSyncReader : public media::AudioOutputController::SyncReader { // PrepareForeignSocketHandle() is called and ran successfully. scoped_ptr<base::CancelableSyncSocket> foreign_socket_; - // Protect socket_ access by lock to prevent race condition when audio - // controller thread closes the reader and hardware audio thread is reading - // data. This way we know that socket would not be deleted while we are - // writing data to it. - base::Lock lock_; - DISALLOW_COPY_AND_ASSIGN(AudioSyncReader); }; diff --git a/ipc/sync_socket_unittest.cc b/ipc/sync_socket_unittest.cc index dc50525..0e11eca 100644 --- a/ipc/sync_socket_unittest.cc +++ b/ipc/sync_socket_unittest.cc @@ -89,8 +89,8 @@ class SyncSocketServerListener : public IPC::Channel::Listener { void SetHandle(base::SyncSocket::Handle handle) { base::SyncSocket sync_socket(handle); - EXPECT_EQ(sync_socket.Send(static_cast<const void*>(kHelloString), - kHelloStringLength), kHelloStringLength); + EXPECT_EQ(sync_socket.Send(kHelloString, kHelloStringLength), + kHelloStringLength); IPC::Message* msg = new MsgClassResponse(kHelloString); EXPECT_TRUE(chan_->Send(msg)); } @@ -206,11 +206,12 @@ TEST_F(SyncSocketTest, SanityTest) { base::CloseProcessHandle(server_process); } -static void BlockingRead(base::SyncSocket* socket, size_t* received) { +static void BlockingRead(base::SyncSocket* socket, char* buf, + size_t length, size_t* received) { + DCHECK(buf != NULL); // Notify the parent thread that we're up and running. socket->Send(kHelloString, kHelloStringLength); - char buf[0xff]; // Won't ever be filled. - *received = socket->Receive(buf, arraysize(buf)); + *received = socket->Receive(buf, length); } // Tests that we can safely end a blocking Receive operation on one thread @@ -223,14 +224,15 @@ TEST_F(SyncSocketTest, DisconnectTest) { worker.Start(); // Try to do a blocking read from one of the sockets on the worker thread. + char buf[0xff]; size_t received = 1U; // Initialize to an unexpected value. worker.message_loop()->PostTask(FROM_HERE, - base::Bind(&BlockingRead, &pair[0], &received)); + base::Bind(&BlockingRead, &pair[0], &buf[0], arraysize(buf), &received)); // Wait for the worker thread to say hello. char hello[kHelloStringLength] = {0}; pair[1].Receive(&hello[0], sizeof(hello)); - VLOG(1) << "Received: " << hello; + EXPECT_EQ(0, strcmp(hello, kHelloString)); // Give the worker a chance to start Receive(). base::PlatformThread::YieldCurrentThread(); @@ -242,3 +244,66 @@ TEST_F(SyncSocketTest, DisconnectTest) { EXPECT_EQ(0U, received); } + +// Tests that read is a blocking operation. +TEST_F(SyncSocketTest, BlockingReceiveTest) { + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + base::Thread worker("BlockingThread"); + worker.Start(); + + // Try to do a blocking read from one of the sockets on the worker thread. + char buf[kHelloStringLength] = {0}; + size_t received = 1U; // Initialize to an unexpected value. + worker.message_loop()->PostTask(FROM_HERE, + base::Bind(&BlockingRead, &pair[0], &buf[0], + kHelloStringLength, &received)); + + // Wait for the worker thread to say hello. + char hello[kHelloStringLength] = {0}; + pair[1].Receive(&hello[0], sizeof(hello)); + EXPECT_EQ(0, strcmp(hello, kHelloString)); + // Give the worker a chance to start Receive(). + base::PlatformThread::YieldCurrentThread(); + + // The socket on the blocking thread is currently blocked on Receive() and + // has got nothing. + EXPECT_EQ(1U, received); + + // Send a message to the socket on the blocking thead, it should free the + // socket from Receive(). + pair[1].Send(kHelloString, kHelloStringLength); + worker.Stop(); + + // Verify the socket has received the message. + EXPECT_TRUE(strcmp(buf, kHelloString) == 0); + EXPECT_EQ(kHelloStringLength, received); +} + +// Tests that the write operation is non-blocking and returns immediately +// when there is insufficient space in the socket's buffer. +TEST_F(SyncSocketTest, NonBlockingWriteTest) { + base::CancelableSyncSocket pair[2]; + ASSERT_TRUE(base::CancelableSyncSocket::CreatePair(&pair[0], &pair[1])); + + // Fill up the buffer for one of the socket, Send() should not block the + // thread even when the buffer is full. + while (pair[0].Send(kHelloString, kHelloStringLength) != 0) {} + + // Data should be avialble on another socket. + size_t bytes_in_buffer = pair[1].Peek(); + EXPECT_NE(bytes_in_buffer, 0U); + + // No more data can be written to the buffer since socket has been full, + // verify that the amount of avialble data on another socket is unchanged. + EXPECT_EQ(0U, pair[0].Send(kHelloString, kHelloStringLength)); + EXPECT_EQ(bytes_in_buffer, pair[1].Peek()); + + // Read from another socket to free some space for a new write. + char hello[kHelloStringLength] = {0}; + pair[1].Receive(&hello[0], sizeof(hello)); + + // Should be able to write more data to the buffer now. + EXPECT_EQ(kHelloStringLength, pair[0].Send(kHelloString, kHelloStringLength)); +} |