From 1e9499c21c23b52391d952572bd9059df532efcb Mon Sep 17 00:00:00 2001 From: "jabdelmalek@google.com" Date: Tue, 6 Apr 2010 20:33:36 +0000 Subject: Allow synchronous messages to be sent from threads other than the main thread. This simplifies code that needs to do this (i.e. webkit db and file threads). BUG=23423 Review URL: http://codereview.chromium.org/1601005 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@43752 0039d316-1c4b-4281-b951-d872f2087c98 --- .../renderer_host/database_dispatcher_host.cc | 77 +++++++------ .../renderer_host/database_dispatcher_host.h | 16 +-- chrome/common/child_thread.cc | 9 +- chrome/common/child_thread.h | 11 ++ chrome/common/database_util.cc | 78 ++++++------- chrome/common/db_message_filter.cc | 55 +-------- chrome/common/db_message_filter.h | 124 +------------------- chrome/common/render_messages.h | 34 ------ chrome/common/render_messages_internal.h | 46 +++----- chrome/renderer/renderer_webkitclient_impl.cc | 1 - chrome/worker/worker_webkitclient_impl.cc | 1 - ipc/ipc.gypi | 2 + ipc/ipc_sync_channel.h | 21 +--- ipc/ipc_sync_channel_unittest.cc | 65 ++++++++++- ipc/ipc_sync_message.cc | 10 +- ipc/ipc_sync_message.h | 17 ++- ipc/ipc_sync_message_filter.cc | 126 +++++++++++++++++++++ ipc/ipc_sync_message_filter.h | 79 +++++++++++++ 18 files changed, 425 insertions(+), 347 deletions(-) create mode 100644 ipc/ipc_sync_message_filter.cc create mode 100644 ipc/ipc_sync_message_filter.h diff --git a/chrome/browser/renderer_host/database_dispatcher_host.cc b/chrome/browser/renderer_host/database_dispatcher_host.cc index f649dfb..201b5c1 100644 --- a/chrome/browser/renderer_host/database_dispatcher_host.cc +++ b/chrome/browser/renderer_host/database_dispatcher_host.cc @@ -90,12 +90,14 @@ bool DatabaseDispatcherHost::OnMessageReceived( *message_was_ok = true; bool handled = true; IPC_BEGIN_MESSAGE_MAP_EX(DatabaseDispatcherHost, message, *message_was_ok) - IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseOpenFile, OnDatabaseOpenFile) - IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseDeleteFile, OnDatabaseDeleteFile) - IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseGetFileAttributes, - OnDatabaseGetFileAttributes) - IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseGetFileSize, - OnDatabaseGetFileSize) + IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseOpenFile, + OnDatabaseOpenFile) + IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseDeleteFile, + OnDatabaseDeleteFile) + IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseGetFileAttributes, + OnDatabaseGetFileAttributes) + IPC_MESSAGE_HANDLER_DELAY_REPLY(ViewHostMsg_DatabaseGetFileSize, + OnDatabaseGetFileSize) IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseOpened, OnDatabaseOpened) IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseModified, OnDatabaseModified) IPC_MESSAGE_HANDLER(ViewHostMsg_DatabaseClosed, OnDatabaseClosed) @@ -129,7 +131,7 @@ void DatabaseDispatcherHost::Send(IPC::Message* message) { void DatabaseDispatcherHost::OnDatabaseOpenFile(const string16& vfs_file_name, int desired_flags, - int32 message_id) { + IPC::Message* reply_msg) { if (!observer_added_) { observer_added_ = true; ChromeThread::PostTask( @@ -143,19 +145,7 @@ void DatabaseDispatcherHost::OnDatabaseOpenFile(const string16& vfs_file_name, &DatabaseDispatcherHost::DatabaseOpenFile, vfs_file_name, desired_flags, - message_id)); -} - -static void SetOpenFileResponseParams( - ViewMsg_DatabaseOpenFileResponse_Params* params, - base::PlatformFile file_handle, - base::PlatformFile dir_handle) { -#if defined(OS_WIN) - params->file_handle = file_handle; -#elif defined(OS_POSIX) - params->file_handle = base::FileDescriptor(file_handle, true); - params->dir_handle = base::FileDescriptor(dir_handle, true); -#endif + reply_msg)); } // Scheduled by the IO thread on the file thread. @@ -164,7 +154,7 @@ static void SetOpenFileResponseParams( // corresponding renderer process with the file handle. void DatabaseDispatcherHost::DatabaseOpenFile(const string16& vfs_file_name, int desired_flags, - int32 message_id) { + IPC::Message* reply_msg) { DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE)); base::PlatformFile target_handle = base::kInvalidPlatformFileValue; base::PlatformFile target_dir_handle = base::kInvalidPlatformFileValue; @@ -186,21 +176,28 @@ void DatabaseDispatcherHost::DatabaseOpenFile(const string16& vfs_file_name, } } - ViewMsg_DatabaseOpenFileResponse_Params response_params; - SetOpenFileResponseParams(&response_params, target_handle, target_dir_handle); - Send(new ViewMsg_DatabaseOpenFileResponse(message_id, response_params)); + ViewHostMsg_DatabaseOpenFile::WriteReplyParams( + reply_msg, +#if defined(OS_WIN) + target_handle +#elif defined(OS_POSIX) + base::FileDescriptor(target_handle, true), + base::FileDescriptor(target_dir_handle, true) +#endif + ); + Send(reply_msg); } void DatabaseDispatcherHost::OnDatabaseDeleteFile(const string16& vfs_file_name, const bool& sync_dir, - int32 message_id) { + IPC::Message* reply_msg) { ChromeThread::PostTask( ChromeThread::FILE, FROM_HERE, NewRunnableMethod(this, &DatabaseDispatcherHost::DatabaseDeleteFile, vfs_file_name, sync_dir, - message_id, + reply_msg, kNumDeleteRetries)); } @@ -210,7 +207,7 @@ void DatabaseDispatcherHost::OnDatabaseDeleteFile(const string16& vfs_file_name, // corresponding renderer process with the error code. void DatabaseDispatcherHost::DatabaseDeleteFile(const string16& vfs_file_name, bool sync_dir, - int32 message_id, + IPC::Message* reply_msg, int reschedule_count) { DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE)); @@ -229,25 +226,26 @@ void DatabaseDispatcherHost::DatabaseDeleteFile(const string16& vfs_file_name, &DatabaseDispatcherHost::DatabaseDeleteFile, vfs_file_name, sync_dir, - message_id, + reply_msg, reschedule_count - 1), kDelayDeleteRetryMs); return; } } - Send(new ViewMsg_DatabaseDeleteFileResponse(message_id, error_code)); + ViewHostMsg_DatabaseDeleteFile::WriteReplyParams(reply_msg, error_code); + Send(reply_msg); } void DatabaseDispatcherHost::OnDatabaseGetFileAttributes( const string16& vfs_file_name, - int32 message_id) { + IPC::Message* reply_msg) { ChromeThread::PostTask( ChromeThread::FILE, FROM_HERE, NewRunnableMethod(this, &DatabaseDispatcherHost::DatabaseGetFileAttributes, vfs_file_name, - message_id)); + reply_msg)); } // Scheduled by the IO thread on the file thread. @@ -256,24 +254,27 @@ void DatabaseDispatcherHost::OnDatabaseGetFileAttributes( // corresponding renderer process. void DatabaseDispatcherHost::DatabaseGetFileAttributes( const string16& vfs_file_name, - int32 message_id) { + IPC::Message* reply_msg) { DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE)); int32 attributes = -1; FilePath db_file = DatabaseUtil::GetFullFilePathForVfsFile(db_tracker_, vfs_file_name); if (!db_file.empty()) attributes = VfsBackend::GetFileAttributes(db_file); - Send(new ViewMsg_DatabaseGetFileAttributesResponse(message_id, attributes)); + + ViewHostMsg_DatabaseGetFileAttributes::WriteReplyParams( + reply_msg, attributes); + Send(reply_msg); } void DatabaseDispatcherHost::OnDatabaseGetFileSize( - const string16& vfs_file_name, int32 message_id) { + const string16& vfs_file_name, IPC::Message* reply_msg) { ChromeThread::PostTask( ChromeThread::FILE, FROM_HERE, NewRunnableMethod(this, &DatabaseDispatcherHost::DatabaseGetFileSize, vfs_file_name, - message_id)); + reply_msg)); } // Scheduled by the IO thread on the file thread. @@ -281,14 +282,16 @@ void DatabaseDispatcherHost::OnDatabaseGetFileSize( // on the IO thread's message loop to send an IPC back to // the corresponding renderer process. void DatabaseDispatcherHost::DatabaseGetFileSize(const string16& vfs_file_name, - int32 message_id) { + IPC::Message* reply_msg) { DCHECK(ChromeThread::CurrentlyOn(ChromeThread::FILE)); int64 size = 0; FilePath db_file = DatabaseUtil::GetFullFilePathForVfsFile(db_tracker_, vfs_file_name); if (!db_file.empty()) size = VfsBackend::GetFileSize(db_file); - Send(new ViewMsg_DatabaseGetFileSizeResponse(message_id, size)); + + ViewHostMsg_DatabaseGetFileSize::WriteReplyParams(reply_msg, size); + Send(reply_msg); } void DatabaseDispatcherHost::OnDatabaseOpened(const string16& origin_identifier, diff --git a/chrome/browser/renderer_host/database_dispatcher_host.h b/chrome/browser/renderer_host/database_dispatcher_host.h index 748e538..023e6d1 100644 --- a/chrome/browser/renderer_host/database_dispatcher_host.h +++ b/chrome/browser/renderer_host/database_dispatcher_host.h @@ -34,14 +34,14 @@ class DatabaseDispatcherHost // VFS message handlers (IO thread) void OnDatabaseOpenFile(const string16& vfs_file_name, int desired_flags, - int32 message_id); + IPC::Message* reply_msg); void OnDatabaseDeleteFile(const string16& vfs_file_name, const bool& sync_dir, - int32 message_id); + IPC::Message* reply_msg); void OnDatabaseGetFileAttributes(const string16& vfs_file_name, - int32 message_id); + IPC::Message* reply_msg); void OnDatabaseGetFileSize(const string16& vfs_file_name, - int32 message_id); + IPC::Message* reply_msg); // Database tracker message handlers (IO thread) void OnDatabaseOpened(const string16& origin_identifier, @@ -83,15 +83,15 @@ class DatabaseDispatcherHost // VFS message handlers (file thread) void DatabaseOpenFile(const string16& vfs_file_name, int desired_flags, - int32 message_id); + IPC::Message* reply_msg); void DatabaseDeleteFile(const string16& vfs_file_name, bool sync_dir, - int32 message_id, + IPC::Message* reply_msg, int reschedule_count); void DatabaseGetFileAttributes(const string16& vfs_file_name, - int32 message_id); + IPC::Message* reply_msg); void DatabaseGetFileSize(const string16& vfs_file_name, - int32 message_id); + IPC::Message* reply_msg); // Database tracker message handlers (file thread) void DatabaseOpened(const string16& origin_identifier, diff --git a/chrome/common/child_thread.cc b/chrome/common/child_thread.cc index 877a3af..91d276e 100644 --- a/chrome/common/child_thread.cc +++ b/chrome/common/child_thread.cc @@ -13,6 +13,7 @@ #include "chrome/common/socket_stream_dispatcher.h" #include "ipc/ipc_logging.h" #include "ipc/ipc_message.h" +#include "ipc/ipc_sync_message_filter.h" #include "ipc/ipc_switches.h" #include "webkit/glue/webkit_glue.h" @@ -49,6 +50,10 @@ void ChildThread::Init() { resource_dispatcher_.reset(new ResourceDispatcher(this)); socket_stream_dispatcher_.reset(new SocketStreamDispatcher()); + sync_message_filter_ = + new IPC::SyncMessageFilter(ChildProcess::current()->GetShutDownEvent()); + channel_->AddFilter(sync_message_filter_.get()); + // When running in unit tests, there is already a NotificationService object. // Since only one can exist at a time per thread, check first. if (!NotificationService::current()) @@ -60,6 +65,8 @@ ChildThread::~ChildThread() { IPC::Logging::current()->SetIPCSender(NULL); #endif + channel_->RemoveFilter(sync_message_filter_.get()); + // The ChannelProxy object caches a pointer to the IPC thread, so need to // reset it as it's not guaranteed to outlive this object. // NOTE: this also has the side-effect of not closing the main IPC channel to @@ -126,7 +133,7 @@ void ChildThread::OnMessageReceived(const IPC::Message& msg) { #if defined(IPC_MESSAGE_LOG_ENABLED) IPC_MESSAGE_HANDLER(PluginProcessMsg_SetIPCLoggingEnabled, OnSetIPCLoggingEnabled) -#endif // IPC_MESSAGE_HANDLER +#endif IPC_MESSAGE_UNHANDLED(handled = false) IPC_END_MESSAGE_MAP() diff --git a/chrome/common/child_thread.h b/chrome/common/child_thread.h index ee05e11..72af5aa 100644 --- a/chrome/common/child_thread.h +++ b/chrome/common/child_thread.h @@ -15,6 +15,10 @@ class NotificationService; class SocketStreamDispatcher; +namespace IPC { +class SyncMessageFilter; +} + // The main thread of a child process derives from this class. class ChildThread : public IPC::Channel::Listener, public IPC::Message::Sender { @@ -49,6 +53,10 @@ class ChildThread : public IPC::Channel::Listener, return socket_stream_dispatcher_.get(); } + // Safe to call on any thread, as long as it's guaranteed that the thread's + // lifetime is less than the main thread. + IPC::SyncMessageFilter* sync_message_filter() { return sync_message_filter_; } + MessageLoop* message_loop() { return message_loop_; } // Returns the one child thread. @@ -84,6 +92,9 @@ class ChildThread : public IPC::Channel::Listener, std::string channel_name_; scoped_ptr channel_; + // Allows threads other than the main thread to send sync messages. + scoped_refptr sync_message_filter_; + // Implements message routing functionality to the consumers of ChildThread. MessageRouter router_; diff --git a/chrome/common/database_util.cc b/chrome/common/database_util.cc index 7a54724..f149f08 100644 --- a/chrome/common/database_util.cc +++ b/chrome/common/database_util.cc @@ -10,8 +10,9 @@ #include "third_party/sqlite/preprocessed/sqlite3.h" #endif -#include "chrome/common/db_message_filter.h" +#include "chrome/common/child_thread.h" #include "chrome/common/render_messages.h" +#include "ipc/ipc_sync_message_filter.h" #include "third_party/WebKit/WebKit/chromium/public/WebString.h" using WebKit::WebKitClient; @@ -20,58 +21,59 @@ using WebKit::WebString; WebKitClient::FileHandle DatabaseUtil::databaseOpenFile( const WebString& vfs_file_name, int desired_flags, WebKitClient::FileHandle* dir_handle) { - DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance(); - int message_id = db_message_filter->GetUniqueID(); - + IPC::PlatformFileForTransit file_handle; #if defined(OS_WIN) - ViewMsg_DatabaseOpenFileResponse_Params default_response = - { base::kInvalidPlatformFileValue }; + file_handle = base::kInvalidPlatformFileValue; #elif defined(OS_POSIX) - ViewMsg_DatabaseOpenFileResponse_Params default_response = - { base::FileDescriptor(base::kInvalidPlatformFileValue, true), - base::FileDescriptor(base::kInvalidPlatformFileValue, true) }; + file_handle = + base::FileDescriptor(base::kInvalidPlatformFileValue, true); + base::FileDescriptor dir_handle_rv = + base::FileDescriptor(base::kInvalidPlatformFileValue, true); #endif - ViewMsg_DatabaseOpenFileResponse_Params result = - db_message_filter->SendAndWait( - new ViewHostMsg_DatabaseOpenFile( - vfs_file_name, desired_flags, message_id), - message_id, default_response); + scoped_refptr filter = + ChildThread::current()->sync_message_filter(); + + filter->Send(new ViewHostMsg_DatabaseOpenFile( + vfs_file_name, + desired_flags, + &file_handle +#if defined(OS_POSIX) + , &dir_handle_rv +#endif + )); #if defined(OS_WIN) - if (dir_handle) - *dir_handle = base::kInvalidPlatformFileValue; - return result.file_handle; + return file_handle; #elif defined(OS_POSIX) if (dir_handle) - *dir_handle = result.dir_handle.fd; - return result.file_handle.fd; + *dir_handle = dir_handle_rv.fd; + return file_handle.fd; #endif } int DatabaseUtil::databaseDeleteFile( const WebString& vfs_file_name, bool sync_dir) { - DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance(); - int message_id = db_message_filter->GetUniqueID(); - return db_message_filter->SendAndWait( - new ViewHostMsg_DatabaseDeleteFile(vfs_file_name, sync_dir, message_id), - message_id, SQLITE_IOERR_DELETE); + int rv = SQLITE_IOERR_DELETE; + scoped_refptr filter = + ChildThread::current()->sync_message_filter(); + filter->Send(new ViewHostMsg_DatabaseDeleteFile( + vfs_file_name, sync_dir, &rv)); + return rv; } -long DatabaseUtil::databaseGetFileAttributes( - const WebString& vfs_file_name) { - DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance(); - int message_id = db_message_filter->GetUniqueID(); - return db_message_filter->SendAndWait( - new ViewHostMsg_DatabaseGetFileAttributes(vfs_file_name, message_id), - message_id, -1L); +long DatabaseUtil::databaseGetFileAttributes(const WebString& vfs_file_name) { + int32 rv = -1; + scoped_refptr filter = + ChildThread::current()->sync_message_filter(); + filter->Send(new ViewHostMsg_DatabaseGetFileAttributes(vfs_file_name, &rv)); + return rv; } -long long DatabaseUtil::databaseGetFileSize( - const WebString& vfs_file_name) { - DBMessageFilter* db_message_filter = DBMessageFilter::GetInstance(); - int message_id = db_message_filter->GetUniqueID(); - return db_message_filter->SendAndWait( - new ViewHostMsg_DatabaseGetFileSize(vfs_file_name, message_id), - message_id, 0LL); +long long DatabaseUtil::databaseGetFileSize(const WebString& vfs_file_name) { + int64 rv = 0LL; + scoped_refptr filter = + ChildThread::current()->sync_message_filter(); + filter->Send(new ViewHostMsg_DatabaseGetFileSize(vfs_file_name, &rv)); + return rv; } diff --git a/chrome/common/db_message_filter.cc b/chrome/common/db_message_filter.cc index 39c4120..8e3bbc6 100644 --- a/chrome/common/db_message_filter.cc +++ b/chrome/common/db_message_filter.cc @@ -1,69 +1,18 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "chrome/common/db_message_filter.h" -#include "chrome/common/child_process.h" #include "chrome/common/render_messages.h" #include "third_party/WebKit/WebKit/chromium/public/WebDatabase.h" -DBMessageFilter* DBMessageFilter::instance_ = NULL; - -DBMessageFilter::DBMessageFilter() - : io_thread_message_loop_(ChildProcess::current()->io_message_loop()), - channel_(NULL), - shutdown_event_(ChildProcess::current()->GetShutDownEvent()), - messages_awaiting_replies_(new IDMap()), - unique_id_generator_(new base::AtomicSequenceNumber()) { - DCHECK(!instance_); - instance_ = this; -} - -int DBMessageFilter::GetUniqueID() { - return unique_id_generator_->GetNext(); -} - -static void SendMessageOnIOThread(IPC::Message* message, - IPC::Channel* channel, - Lock* channel_lock) { - AutoLock channel_auto_lock(*channel_lock); - if (channel) - channel->Send(message); - else - delete message; -} - -void DBMessageFilter::Send(IPC::Message* message) { - io_thread_message_loop_->PostTask(FROM_HERE, - NewRunnableFunction(SendMessageOnIOThread, message, channel_, - &channel_lock_)); -} - -void DBMessageFilter::OnFilterAdded(IPC::Channel* channel) { - AutoLock channel_auto_lock(channel_lock_); - channel_ = channel; -} - -void DBMessageFilter::OnChannelError() { - AutoLock channel_auto_lock(channel_lock_); - channel_ = NULL; -} - -void DBMessageFilter::OnChannelClosing() { - AutoLock channel_auto_lock(channel_lock_); - channel_ = NULL; +DBMessageFilter::DBMessageFilter() { } bool DBMessageFilter::OnMessageReceived(const IPC::Message& message) { bool handled = true; IPC_BEGIN_MESSAGE_MAP(DBMessageFilter, message) - IPC_MESSAGE_HANDLER(ViewMsg_DatabaseOpenFileResponse, - OnResponse) - IPC_MESSAGE_HANDLER(ViewMsg_DatabaseDeleteFileResponse, OnResponse) - IPC_MESSAGE_HANDLER(ViewMsg_DatabaseGetFileAttributesResponse, - OnResponse) - IPC_MESSAGE_HANDLER(ViewMsg_DatabaseGetFileSizeResponse, OnResponse) IPC_MESSAGE_HANDLER(ViewMsg_DatabaseUpdateSize, OnDatabaseUpdateSize) IPC_MESSAGE_HANDLER(ViewMsg_DatabaseCloseImmediately, OnDatabaseCloseImmediately) diff --git a/chrome/common/db_message_filter.h b/chrome/common/db_message_filter.h index 41967a0..1cb07d8 100644 --- a/chrome/common/db_message_filter.h +++ b/chrome/common/db_message_filter.h @@ -1,143 +1,27 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef CHROME_COMMON_DB_MESSAGE_FILTER_H_ #define CHROME_COMMON_DB_MESSAGE_FILTER_H_ -#include "base/atomic_sequence_num.h" -#include "base/id_map.h" -#include "base/lock.h" -#include "base/scoped_ptr.h" -#include "base/waitable_event.h" #include "ipc/ipc_channel_proxy.h" -class Lock; -class MessageLoop; - -namespace IPC { -class Channel; -} - -// A thread-safe message filter used to send IPCs from DB threads and process -// replies from the browser process. -// -// This class should not be instantianted anywhere but RenderThread::Init(). It -// is meant to be a singleton in each renderer process. To access the singleton, -// use GetInstance(). +// Receives database messages from the browser process and processes them on the +// IO thread. class DBMessageFilter : public IPC::ChannelProxy::MessageFilter { public: - // Returns the DBMessageFilter singleton created in this renderer process. - static DBMessageFilter* GetInstance() { return instance_; } - - // Creates a new DBMessageFilter instance. DBMessageFilter(); - // Returns a unique ID for use when calling the SendAndWait() method. - virtual int GetUniqueID(); - - // Posts a task to the IO thread to send |message| to the browser. - virtual void Send(IPC::Message* message); - - // Sends |message| and blocks the current thread. Returns the result from the - // reply message, or |default_result| if the renderer process is being - // destroyed or the message could not be sent. - template - ResultType SendAndWait(IPC::Message* message, - int message_id, - ResultType default_result) { - ResultType result = default_result; - base::WaitableEvent waitable_event(false, false); - DBMessageState state = - { reinterpret_cast(&result), &waitable_event }; - { - AutoLock msgs_awaiting_replies_autolock(messages_awaiting_replies_lock_); - messages_awaiting_replies_->AddWithID(&state, message_id); - } - - Send(message); - - base::WaitableEvent* events[2] = { shutdown_event_, &waitable_event }; - base::WaitableEvent::WaitMany(events, 2); - - // Locking on messages_awaiting_replies_ guarantees that either the IO - // thread won't enter OnResponse(), or if it's already in OnResponse(), - // then WaitableEvent::Signal() will get a chance to do all its work - // before waitable_event is deleted. - AutoLock msgs_awaiting_replies_autolock(messages_awaiting_replies_lock_); - messages_awaiting_replies_->Remove(message_id); - return result; - } - - // Processes incoming message |message| from the browser process. - virtual bool OnMessageReceived(const IPC::Message& message); - private: - // The state we store for each message we send. - struct DBMessageState { - intptr_t result_address_; - base::WaitableEvent* waitable_event_; - }; - - // This is a RefCounted class, do not allow anybody to destroy it directly. - virtual ~DBMessageFilter() { instance_ = NULL; } - - // Invoked when this filter is added to |channel|. - virtual void OnFilterAdded(IPC::Channel* channel); - - // Called when the channel encounters a problem. The filter should clean up - // its internal data and not accept any more messages. - virtual void OnChannelError(); - - // Called when the channel is closing. The filter should clean up its internal - // and not accept any more messages. - virtual void OnChannelClosing(); - - // Processes the reply to a sync DB request. - template - void OnResponse(int32 message_id, ResultType result) { - AutoLock msgs_awaiting_replies_autolock(messages_awaiting_replies_lock_); - DBMessageState *state = messages_awaiting_replies_->Lookup(message_id); - if (state) { - *reinterpret_cast(state->result_address_) = result; - state->waitable_event_->Signal(); - } - } + virtual bool OnMessageReceived(const IPC::Message& message); - // Processes IPCs that indicate a change in the size of a DB file. void OnDatabaseUpdateSize(const string16& origin_identifier, const string16& database_name, int64 database_size, int64 space_available); - - // Processes IPCs that ask for a DB to be closed immediately. void OnDatabaseCloseImmediately(const string16& origin_identifier, const string16& database_name); - - // The message loop for the IO thread. - MessageLoop* io_thread_message_loop_; - - // The channel to which this filter was added. - IPC::Channel* channel_; - - // A lock around the channel. - Lock channel_lock_; - - // The shutdown event. - base::WaitableEvent* shutdown_event_; - - // The list of messages awaiting replies. For each such message we store a - // DBMessageState instance. - scoped_ptr > messages_awaiting_replies_; - - // The lock for 'messages_awaiting_replies_'. - Lock messages_awaiting_replies_lock_; - - // A thread-safe unique number generator. - scoped_ptr unique_id_generator_; - - // The singleton. - static DBMessageFilter* instance_; }; #endif // CHROME_COMMON_DB_MESSAGE_FILTER_H_ diff --git a/chrome/common/render_messages.h b/chrome/common/render_messages.h index a2db04f..211297a 100644 --- a/chrome/common/render_messages.h +++ b/chrome/common/render_messages.h @@ -422,13 +422,6 @@ struct ViewMsg_PrintPages_Params { std::vector pages; }; -struct ViewMsg_DatabaseOpenFileResponse_Params { - IPC::PlatformFileForTransit file_handle; // DB file handle -#if defined(OS_POSIX) - base::FileDescriptor dir_handle; // DB directory handle -#endif -}; - // Parameters to describe a rendered page. struct ViewHostMsg_DidPrintPage_Params { // A shared memory handle to the EMF data. This data can be quite large so a @@ -2038,33 +2031,6 @@ struct ParamTraits { }; template <> -struct ParamTraits { - typedef ViewMsg_DatabaseOpenFileResponse_Params param_type; - static void Write(Message* m, const param_type& p) { - WriteParam(m, p.file_handle); -#if defined(OS_POSIX) - WriteParam(m, p.dir_handle); -#endif - } - static bool Read(const Message* m, void** iter, param_type* p) { - bool ret = ReadParam(m, iter, &p->file_handle); -#if defined(OS_POSIX) - ret = ret && ReadParam(m, iter, &p->dir_handle); -#endif - return ret; - } - static void Log(const param_type& p, std::wstring* l) { - l->append(L"("); - LogParam(p.file_handle, l); -#if defined(OS_POSIX) - l->append(L", "); - LogParam(p.dir_handle, l); -#endif - l->append(L")"); - } -}; - -template <> struct ParamTraits { typedef appcache::Status param_type; static void Write(Message* m, const param_type& p) { diff --git a/chrome/common/render_messages_internal.h b/chrome/common/render_messages_internal.h index 5730588..b1222f1 100644 --- a/chrome/common/render_messages_internal.h +++ b/chrome/common/render_messages_internal.h @@ -821,26 +821,6 @@ IPC_BEGIN_MESSAGES(View) IPC_MESSAGE_ROUTED1(ViewMsg_ExecuteCode, ViewMsg_ExecuteCode_Params) - // Returns a file handle - IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseOpenFileResponse, - int32 /* the ID of the message we're replying to */, - ViewMsg_DatabaseOpenFileResponse_Params) - - // Returns a SQLite error code - IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseDeleteFileResponse, - int32 /* the ID of the message we're replying to */, - int /* SQLite error code */) - - // Returns the attributes of a file - IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseGetFileAttributesResponse, - int32 /* the ID of the message we're replying to */, - int32 /* the attributes for the given DB file */) - - // Returns the size of a file - IPC_MESSAGE_CONTROL2(ViewMsg_DatabaseGetFileSizeResponse, - int32 /* the ID of the message we're replying to */, - int64 /* the size of the given DB file */) - // Notifies the child process of the new database size IPC_MESSAGE_CONTROL4(ViewMsg_DatabaseUpdateSize, string16 /* the origin */, @@ -2126,27 +2106,35 @@ IPC_BEGIN_MESSAGES(ViewHost) unsigned long /* estimated size */, bool /* result */) - // Asks the browser process to open a DB file with the given name - IPC_MESSAGE_CONTROL3(ViewHostMsg_DatabaseOpenFile, + // Asks the browser process to open a DB file with the given name. +#if defined (OS_WIN) + IPC_SYNC_MESSAGE_CONTROL2_1(ViewHostMsg_DatabaseOpenFile, + string16 /* vfs file name */, + int /* desired flags */, + IPC::PlatformFileForTransit /* file_handle */) +#elif defined(OS_POSIX) + IPC_SYNC_MESSAGE_CONTROL2_2(ViewHostMsg_DatabaseOpenFile, string16 /* vfs file name */, int /* desired flags */, - int32 /* a unique message ID */) + IPC::PlatformFileForTransit /* file_handle */, + base::FileDescriptor /* dir_handle */) +#endif // Asks the browser process to delete a DB file - IPC_MESSAGE_CONTROL3(ViewHostMsg_DatabaseDeleteFile, + IPC_SYNC_MESSAGE_CONTROL2_1(ViewHostMsg_DatabaseDeleteFile, string16 /* vfs file name */, bool /* whether or not to sync the directory */, - int32 /* a unique message ID */) + int /* SQLite error code */) // Asks the browser process to return the attributes of a DB file - IPC_MESSAGE_CONTROL2(ViewHostMsg_DatabaseGetFileAttributes, + IPC_SYNC_MESSAGE_CONTROL1_1(ViewHostMsg_DatabaseGetFileAttributes, string16 /* vfs file name */, - int32 /* a unique message ID */) + int32 /* the attributes for the given DB file */) // Asks the browser process to return the size of a DB file - IPC_MESSAGE_CONTROL2(ViewHostMsg_DatabaseGetFileSize, + IPC_SYNC_MESSAGE_CONTROL1_1(ViewHostMsg_DatabaseGetFileSize, string16 /* vfs file name */, - int32 /* a unique message ID */) + int64 /* the size of the given DB file */) // Notifies the browser process that a new database has been opened IPC_MESSAGE_CONTROL4(ViewHostMsg_DatabaseOpened, diff --git a/chrome/renderer/renderer_webkitclient_impl.cc b/chrome/renderer/renderer_webkitclient_impl.cc index bbe6786..70bccc9 100644 --- a/chrome/renderer/renderer_webkitclient_impl.cc +++ b/chrome/renderer/renderer_webkitclient_impl.cc @@ -16,7 +16,6 @@ #include "chrome/common/appcache/appcache_dispatcher.h" #include "chrome/common/chrome_switches.h" #include "chrome/common/database_util.h" -#include "chrome/common/db_message_filter.h" #include "chrome/common/render_messages.h" #include "chrome/common/webmessageportchannel_impl.h" #include "chrome/plugin/npobject_util.h" diff --git a/chrome/worker/worker_webkitclient_impl.cc b/chrome/worker/worker_webkitclient_impl.cc index 8f3249f..7160b4b 100644 --- a/chrome/worker/worker_webkitclient_impl.cc +++ b/chrome/worker/worker_webkitclient_impl.cc @@ -6,7 +6,6 @@ #include "base/logging.h" #include "chrome/common/database_util.h" -#include "chrome/common/db_message_filter.h" #include "chrome/common/render_messages.h" #include "chrome/common/webmessageportchannel_impl.h" #include "chrome/worker/worker_thread.h" diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi index fb14f1f..52b27bf 100644 --- a/ipc/ipc.gypi +++ b/ipc/ipc.gypi @@ -36,6 +36,8 @@ 'ipc_sync_channel.h', 'ipc_sync_message.cc', 'ipc_sync_message.h', + 'ipc_sync_message_filter.cc', + 'ipc_sync_message_filter.h', ], 'include_dirs': [ '..', diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h index cb88f47..e66dd92 100644 --- a/ipc/ipc_sync_channel.h +++ b/ipc/ipc_sync_channel.h @@ -13,6 +13,7 @@ #include "base/ref_counted.h" #include "base/waitable_event_watcher.h" #include "ipc/ipc_channel_proxy.h" +#include "ipc/ipc_sync_message.h" namespace base { class WaitableEvent; @@ -23,8 +24,8 @@ namespace IPC { class SyncMessage; class MessageReplyDeserializer; -// This is similar to IPC::ChannelProxy, with the added feature of supporting -// sending synchronous messages. +// This is similar to ChannelProxy, with the added feature of supporting sending +// synchronous messages. // Note that care must be taken that the lifetime of the ipc_thread argument // is more than this object. If the message loop goes away while this object // is running and it's used to send a message, then it will use the invalid @@ -63,7 +64,7 @@ class SyncChannel : public ChannelProxy, // Adds information about an outgoing sync message to the context so that // we know how to deserialize the reply. - void Push(IPC::SyncMessage* sync_msg); + void Push(SyncMessage* sync_msg); // Cleanly remove the top deserializer (and throw it away). Returns the // result of the Send call for that message. @@ -96,7 +97,7 @@ class SyncChannel : public ChannelProxy, private: ~SyncContext(); - // IPC::ChannelProxy methods that we override. + // ChannelProxy methods that we override. // Called on the listener thread. virtual void Clear(); @@ -113,18 +114,6 @@ class SyncChannel : public ChannelProxy, // WaitableEventWatcher::Delegate implementation. virtual void OnWaitableEventSignaled(base::WaitableEvent* arg); - // When sending a synchronous message, this structure contains an object - // that knows how to deserialize the response. - struct PendingSyncMsg { - PendingSyncMsg(int id, IPC::MessageReplyDeserializer* d, - base::WaitableEvent* e) : - id(id), deserializer(d), done_event(e), send_result(false) { } - int id; - IPC::MessageReplyDeserializer* deserializer; - base::WaitableEvent* done_event; - bool send_result; - }; - typedef std::deque PendingSyncMessageQueue; PendingSyncMessageQueue deserializers_; Lock deserializers_lock_; diff --git a/ipc/ipc_sync_channel_unittest.cc b/ipc/ipc_sync_channel_unittest.cc index c9a5c88..6033c21 100644 --- a/ipc/ipc_sync_channel_unittest.cc +++ b/ipc/ipc_sync_channel_unittest.cc @@ -18,6 +18,7 @@ #include "base/waitable_event.h" #include "ipc/ipc_message.h" #include "ipc/ipc_sync_channel.h" +#include "ipc/ipc_sync_message_filter.h" #include "testing/gtest/include/gtest/gtest.h" @@ -120,12 +121,14 @@ class Worker : public Channel::Listener, public Message::Sender { } Channel::Mode mode() { return mode_; } WaitableEvent* done_event() { return done_.get(); } + WaitableEvent* shutdown_event() { return &shutdown_event_; } void ResetChannel() { channel_.reset(); } - - protected: // Derived classes need to call this when they've completed their part of // the test. void Done() { done_->Signal(); } + + protected: + IPC::SyncChannel* channel() { return channel_.get(); } // Functions for dervied classes to implement if they wish. virtual void Run() { } virtual void OnAnswer(int* answer) { NOTREACHED(); } @@ -1054,3 +1057,61 @@ TEST_F(IPCSyncChannelTest, DoneEventRace) { workers.push_back(new SimpleClient()); RunTest(workers); } + +//----------------------------------------------------------------------------- + +namespace { + +class TestSyncMessageFilter : public IPC::SyncMessageFilter { + public: + TestSyncMessageFilter(base::WaitableEvent* shutdown_event, Worker* worker) + : SyncMessageFilter(shutdown_event), + worker_(worker), + thread_("helper_thread") { + base::Thread::Options options; + options.message_loop_type = MessageLoop::TYPE_DEFAULT; + thread_.StartWithOptions(options); + } + + virtual void OnFilterAdded(Channel* channel) { + SyncMessageFilter::OnFilterAdded(channel); + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( + this, &TestSyncMessageFilter::SendMessageOnHelperThread)); + } + + void SendMessageOnHelperThread() { + int answer = 0; + bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer)); + DCHECK(result); + DCHECK_EQ(answer, 42); + + worker_->Done(); + } + + Worker* worker_; + base::Thread thread_; +}; + +class SyncMessageFilterServer : public Worker { + public: + SyncMessageFilterServer() + : Worker(Channel::MODE_SERVER, "sync_message_filter_server") { + filter_ = new TestSyncMessageFilter(shutdown_event(), this); + } + + void Run() { + channel()->AddFilter(filter_.get()); + } + + scoped_refptr filter_; +}; + +} // namespace + +// Tests basic synchronous call +TEST_F(IPCSyncChannelTest, SyncMessageFilter) { + std::vector workers; + workers.push_back(new SyncMessageFilterServer()); + workers.push_back(new SimpleClient()); + RunTest(workers); +} diff --git a/ipc/ipc_sync_message.cc b/ipc/ipc_sync_message.cc index 23f3d16..8ae65fa 100644 --- a/ipc/ipc_sync_message.cc +++ b/ipc/ipc_sync_message.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -9,16 +9,18 @@ #endif #include +#include "base/atomic_sequence_num.h" #include "base/logging.h" #include "base/waitable_event.h" #include "ipc/ipc_sync_message.h" namespace IPC { -uint32 SyncMessage::next_id_ = 0; #define kSyncMessageHeaderSize 4 -base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true); +static base::AtomicSequenceNumber g_next_id(base::LINKER_INITIALIZED); + +static base::WaitableEvent* dummy_event = new base::WaitableEvent(true, true); SyncMessage::SyncMessage( int32 routing_id, @@ -34,7 +36,7 @@ SyncMessage::SyncMessage( // Add synchronous message data before the message payload. SyncHeader header; - header.message_id = ++next_id_; + header.message_id = g_next_id.GetNext(); WriteSyncHeader(this, header); } diff --git a/ipc/ipc_sync_message.h b/ipc/ipc_sync_message.h index 48e9f99..ea6387a4 100644 --- a/ipc/ipc_sync_message.h +++ b/ipc/ipc_sync_message.h @@ -1,4 +1,4 @@ -// Copyright (c) 2009 The Chromium Authors. All rights reserved. +// Copyright (c) 2010 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -77,8 +77,6 @@ class SyncMessage : public Message { MessageReplyDeserializer* deserializer_; base::WaitableEvent* pump_messages_event_; - - static uint32 next_id_; // for generation of unique ids }; // Used to deserialize parameters from a reply to a synchronous message @@ -92,6 +90,19 @@ class MessageReplyDeserializer { virtual bool SerializeOutputParameters(const Message& msg, void* iter) = 0; }; +// When sending a synchronous message, this structure contains an object +// that knows how to deserialize the response. +struct PendingSyncMsg { + PendingSyncMsg(int id, + MessageReplyDeserializer* d, + base::WaitableEvent* e) + : id(id), deserializer(d), done_event(e), send_result(false) { } + int id; + MessageReplyDeserializer* deserializer; + base::WaitableEvent* done_event; + bool send_result; +}; + } // namespace IPC #endif // IPC_IPC_SYNC_MESSAGE_H_ diff --git a/ipc/ipc_sync_message_filter.cc b/ipc/ipc_sync_message_filter.cc new file mode 100644 index 0000000..db3f86e --- /dev/null +++ b/ipc/ipc_sync_message_filter.cc @@ -0,0 +1,126 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/ipc_sync_message_filter.h" + +#include "base/logging.h" +#include "base/message_loop.h" +#include "ipc/ipc_sync_message.h" + +namespace IPC { + +SyncMessageFilter::SyncMessageFilter(base::WaitableEvent* shutdown_event) + : channel_(NULL), + listener_loop_(MessageLoop::current()), + io_loop_(NULL), + shutdown_event_(shutdown_event) { +} + +SyncMessageFilter::~SyncMessageFilter() { +} + +bool SyncMessageFilter::Send(Message* message) { + { + AutoLock auto_lock(lock_); + if (!io_loop_) { + delete message; + return false; + } + } + + if (!message->is_sync()) { + io_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message)); + return true; + } + + base::WaitableEvent done_event(true, false); + PendingSyncMsg pending_message( + SyncMessage::GetMessageId(*message), + reinterpret_cast(message)->GetReplyDeserializer(), + &done_event); + + { + AutoLock auto_lock(lock_); + // Can't use this class on the main thread or else it can lead to deadlocks. + // Also by definition, can't use this on IO thread since we're blocking it. + DCHECK(MessageLoop::current() != listener_loop_); + DCHECK(MessageLoop::current() != io_loop_); + pending_sync_messages_[MessageLoop::current()] = &pending_message; + } + + io_loop_->PostTask( + FROM_HERE, + NewRunnableMethod(this, &SyncMessageFilter::SendOnIOThread, message)); + + base::WaitableEvent* events[2] = { shutdown_event_, &done_event }; + base::WaitableEvent::WaitMany(events, 2); + + { + AutoLock auto_lock(lock_); + delete pending_message.deserializer; + pending_sync_messages_.erase(MessageLoop::current()); + } + + return pending_message.send_result; +} + +void SyncMessageFilter::SendOnIOThread(Message* message) { + if (channel_) { + channel_->Send(message); + return; + } + + if (message->is_sync()) { + // We don't know which thread sent it, but it doesn't matter, just signal + // them all. + SignalAllEvents(); + } + + delete message; +} + +void SyncMessageFilter::SignalAllEvents() { + AutoLock auto_lock(lock_); + for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); + iter != pending_sync_messages_.end(); ++iter) { + iter->second->done_event->Signal(); + } +} + +void SyncMessageFilter::OnFilterAdded(Channel* channel) { + channel_ = channel; + AutoLock auto_lock(lock_); + io_loop_ = MessageLoop::current(); +} + +void SyncMessageFilter::OnChannelError() { + channel_ = NULL; + SignalAllEvents(); +} + +void SyncMessageFilter::OnChannelClosing() { + channel_ = NULL; + SignalAllEvents(); +} + +bool SyncMessageFilter::OnMessageReceived(const Message& message) { + AutoLock auto_lock(lock_); + for (PendingSyncMessages::iterator iter = pending_sync_messages_.begin(); + iter != pending_sync_messages_.end(); ++iter) { + if (SyncMessage::IsMessageReplyTo(message, iter->second->id)) { + if (!message.is_reply_error()) { + iter->second->send_result = + iter->second->deserializer->SerializeOutputParameters(message); + } + iter->second->done_event->Signal(); + return true; + } + } + + return false; +} + +} // namespace IPC diff --git a/ipc/ipc_sync_message_filter.h b/ipc/ipc_sync_message_filter.h new file mode 100644 index 0000000..71d24c1 --- /dev/null +++ b/ipc/ipc_sync_message_filter.h @@ -0,0 +1,79 @@ +// Copyright (c) 2010 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_SYNC_MESSAGE_FILTER_H_ +#define IPC_IPC_SYNC_MESSAGE_FILTER_H_ + +#include "base/basictypes.h" +#include "base/hash_tables.h" +#include "base/lock.h" +#include "base/ref_counted.h" +#include "base/waitable_event.h" +#include "ipc/ipc_channel_proxy.h" +#include "ipc/ipc_sync_message.h" + +class MessageLoop; + +#if defined(COMPILER_GCC) +// Allows us to use MessageLoop in a hash_map with gcc (MSVC is okay without +// specifying this). +namespace __gnu_cxx { +template<> +struct hash { + size_t operator()(MessageLoop* message_loop) const { + return reinterpret_cast(message_loop); + } +}; +} +#endif + +namespace IPC { + +class MessageReplyDeserializer; + +// This MessageFilter allows sending synchronous IPC messages from a thread +// other than the listener thread associated with the SyncChannel. It does not +// support fancy features that SyncChannel does, such as handling recursion or +// receiving messages while waiting for a response. Note that this object can +// be used to send simultaneous synchronous messages from different threads. +class SyncMessageFilter : public ChannelProxy::MessageFilter, + public Message::Sender { + public: + explicit SyncMessageFilter(base::WaitableEvent* shutdown_event); + virtual ~SyncMessageFilter(); + + // Message::Sender implementation. + virtual bool Send(Message* message); + + // ChannelProxy::MessageFilter implementation. + virtual void OnFilterAdded(Channel* channel); + virtual void OnChannelError(); + virtual void OnChannelClosing(); + virtual bool OnMessageReceived(const Message& message); + + private: + void SendOnIOThread(Message* message); + // Signal all the pending sends as done, used in an error condition. + void SignalAllEvents(); + + // The channel to which this filter was added. + Channel* channel_; + + MessageLoop* listener_loop_; // The process's main thread. + MessageLoop* io_loop_; // The message loop where the Channel lives. + + typedef base::hash_map PendingSyncMessages; + PendingSyncMessages pending_sync_messages_; + + // Locks data members above. + Lock lock_; + + base::WaitableEvent* shutdown_event_; + + DISALLOW_COPY_AND_ASSIGN(SyncMessageFilter); +}; + +} // namespace IPC + +#endif // IPC_IPC_SYNC_MESSAGE_FILTER_H_ -- cgit v1.1