diff options
author | satorux@chromium.org <satorux@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-17 20:58:12 +0000 |
---|---|---|
committer | satorux@chromium.org <satorux@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-17 20:58:12 +0000 |
commit | a510761104333ec7da3943399f059ff693839856 (patch) | |
tree | 2a9f81a60bccf4db3367ad1b8f71e9ea130efb7d | |
parent | 07f93af15169ee054552a376ac4953abd1346cb2 (diff) | |
download | chromium_src-a510761104333ec7da3943399f059ff693839856.zip chromium_src-a510761104333ec7da3943399f059ff693839856.tar.gz chromium_src-a510761104333ec7da3943399f059ff693839856.tar.bz2 |
Implement Bus and ObjectProxy classes for our D-Bus library.
ObjectProxy is used to access remote objects.
ExportedObject is used to export objects to other D-Bus
BUG=90036
TEST=run unit tests. The code is not yet used in Chrome.
Review URL: http://codereview.chromium.org/7491029
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@97204 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r-- | dbus/bus.cc | 613 | ||||
-rw-r--r-- | dbus/bus.h | 367 | ||||
-rw-r--r-- | dbus/dbus.gyp | 11 | ||||
-rw-r--r-- | dbus/end_to_end_async_unittest.cc | 192 | ||||
-rw-r--r-- | dbus/end_to_end_sync_unittest.cc | 104 | ||||
-rw-r--r-- | dbus/exported_object.cc | 259 | ||||
-rw-r--r-- | dbus/exported_object.h | 144 | ||||
-rw-r--r-- | dbus/object_proxy.cc | 197 | ||||
-rw-r--r-- | dbus/object_proxy.h | 116 | ||||
-rw-r--r-- | dbus/scoped_dbus_error.h | 34 | ||||
-rw-r--r-- | dbus/test_service.cc | 101 | ||||
-rw-r--r-- | dbus/test_service.h | 73 |
12 files changed, 2211 insertions, 0 deletions
diff --git a/dbus/bus.cc b/dbus/bus.cc new file mode 100644 index 0000000..2e8fb47 --- /dev/null +++ b/dbus/bus.cc @@ -0,0 +1,613 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// TODO(satorux): +// - Handle "disconnected" signal. +// - Add support for signal sending +// - Add support for signal monitoring +// - Collect metrics (ex. # of method calls, method call time, etc.) + +#include "dbus/bus.h" + +#include "base/bind.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/stl_util.h" +#include "base/threading/thread.h" +#include "base/threading/thread_restrictions.h" +#include "dbus/exported_object.h" +#include "dbus/object_proxy.h" +#include "dbus/scoped_dbus_error.h" + +namespace dbus { + +namespace { + +// The class is used for watching the file descriptor used for D-Bus +// communication. +class Watch : public base::MessagePumpLibevent::Watcher { + public: + Watch(DBusWatch* watch) + : raw_watch_(watch) { + dbus_watch_set_data(raw_watch_, this, NULL); + } + + ~Watch() { + dbus_watch_set_data(raw_watch_, NULL, NULL); + } + + // Returns true if the underlying file descriptor is ready to be watched. + bool IsReadyToBeWatched() { + return dbus_watch_get_enabled(raw_watch_); + } + + // Starts watching the underlying file descriptor. + void StartWatching() { + const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); + const int flags = dbus_watch_get_flags(raw_watch_); + + MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ; + if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) + mode = MessageLoopForIO::WATCH_READ_WRITE; + else if (flags & DBUS_WATCH_READABLE) + mode = MessageLoopForIO::WATCH_READ; + else if (flags & DBUS_WATCH_WRITABLE) + mode = MessageLoopForIO::WATCH_WRITE; + else + NOTREACHED(); + + const bool persistent = true; // Watch persistently. + const bool success = MessageLoopForIO::current()->WatchFileDescriptor( + file_descriptor, + persistent, + mode, + &file_descriptor_watcher_, + this); + CHECK(success) << "Unable to allocate memory"; + } + + // Stops watching the underlying file descriptor. + void StopWatching() { + file_descriptor_watcher_.StopWatchingFileDescriptor(); + } + + private: + // Implement MessagePumpLibevent::Watcher. + virtual void OnFileCanReadWithoutBlocking(int file_descriptor) { + const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); + CHECK(success) << "Unable to allocate memory"; + } + + // Implement MessagePumpLibevent::Watcher. + virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) { + const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); + CHECK(success) << "Unable to allocate memory"; + } + + DBusWatch* raw_watch_; + base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; +}; + +// The class is used for monitoring the timeout used for D-Bus method +// calls. +// +// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of +// the object is is alive when HandleTimeout() is called. It's unlikely +// but it may be possible that HandleTimeout() is called after +// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in +// Bus::OnRemoveTimeout(). +class Timeout : public base::RefCountedThreadSafe<Timeout> { + public: + Timeout(DBusTimeout* timeout) + : raw_timeout_(timeout), + monitoring_is_active_(false), + is_completed(false) { + dbus_timeout_set_data(raw_timeout_, this, NULL); + AddRef(); // Balanced on Complete(). + } + + // Returns true if the timeout is ready to be monitored. + bool IsReadyToBeMonitored() { + return dbus_timeout_get_enabled(raw_timeout_); + } + + // Starts monitoring the timeout. + void StartMonitoring(dbus::Bus* bus) { + bus->PostDelayedTaskToDBusThread(FROM_HERE, + base::Bind(&Timeout::HandleTimeout, + this), + GetIntervalInMs()); + monitoring_is_active_ = true; + } + + // Stops monitoring the timeout. + void StopMonitoring() { + // We cannot take back the delayed task we posted in + // StartMonitoring(), so we just mark the monitoring is inactive now. + monitoring_is_active_ = false; + } + + // Returns the interval in milliseconds. + int GetIntervalInMs() { + return dbus_timeout_get_interval(raw_timeout_); + } + + // Cleans up the raw_timeout and marks that timeout is completed. + // See the class comment above for why we are doing this. + void Complete() { + dbus_timeout_set_data(raw_timeout_, NULL, NULL); + is_completed = true; + Release(); + } + + private: + friend class base::RefCountedThreadSafe<Timeout>; + ~Timeout() { + } + + // Handles the timeout. + void HandleTimeout() { + // If the timeout is marked completed, we should do nothing. This can + // occur if this function is called after Bus::OnRemoveTimeout(). + if (is_completed) + return; + // Skip if monitoring is cancled. + if (!monitoring_is_active_) + return; + + const bool success = dbus_timeout_handle(raw_timeout_); + CHECK(success) << "Unable to allocate memory"; + } + + DBusTimeout* raw_timeout_; + bool monitoring_is_active_; + bool is_completed; +}; + +} // namespace + +Bus::Options::Options() + : bus_type(SESSION), + connection_type(PRIVATE), + dbus_thread(NULL) { +} + +Bus::Options::~Options() { +} + +Bus::Bus(const Options& options) + : bus_type_(options.bus_type), + connection_type_(options.connection_type), + dbus_thread_(options.dbus_thread), + connection_(NULL), + origin_loop_(MessageLoop::current()), + origin_thread_id_(base::PlatformThread::CurrentId()), + dbus_thread_id_(base::kInvalidThreadId), + async_operations_are_set_up_(false), + num_pending_watches_(0), + num_pending_timeouts_(0) { + if (dbus_thread_) { + dbus_thread_id_ = dbus_thread_->thread_id(); + DCHECK(dbus_thread_->IsRunning()) + << "The D-Bus thread should be running"; + DCHECK_EQ(MessageLoop::TYPE_IO, + dbus_thread_->message_loop()->type()) + << "The D-Bus thread should have an MessageLoopForIO attached"; + } + + // This is safe to call multiple times. + dbus_threads_init_default(); +} + +Bus::~Bus() { + DCHECK(!connection_); + DCHECK(owned_service_names_.empty()); + DCHECK_EQ(0, num_pending_watches_); + DCHECK_EQ(0, num_pending_timeouts_); +} + +ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, + const std::string& object_path) { + AssertOnOriginThread(); + + scoped_refptr<ObjectProxy> object_proxy = + new ObjectProxy(this, service_name, object_path); + object_proxies_.push_back(object_proxy); + + return object_proxy; +} + +ExportedObject* Bus::GetExportedObject(const std::string& service_name, + const std::string& object_path) { + AssertOnOriginThread(); + + scoped_refptr<ExportedObject> exported_object = + new ExportedObject(this, service_name, object_path); + exported_objects_.push_back(exported_object); + + return exported_object; +} + +bool Bus::Connect() { + // dbus_bus_get_private() and dbus_bus_get() are blocking calls. + AssertOnDBusThread(); + + // Check if it's already initialized. + if (connection_) + return true; + + ScopedDBusError error; + const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); + if (connection_type_ == PRIVATE) { + connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); + } else { + connection_ = dbus_bus_get(dbus_bus_type, error.get()); + } + if (!connection_) { + LOG(ERROR) << "Failed to connect to the bus: " + << (dbus_error_is_set(error.get()) ? error.message() : ""); + return false; + } + // We shouldn't exit on the disconnected signal. + dbus_connection_set_exit_on_disconnect(connection_, false); + + return true; +} + +void Bus::ShutdownAndBlock() { + AssertOnDBusThread(); + + // Unregister the exported objects. + for (size_t i = 0; i < exported_objects_.size(); ++i) { + exported_objects_[i]->Unregister(); + } + + // Release all service names. + for (std::set<std::string>::iterator iter = owned_service_names_.begin(); + iter != owned_service_names_.end();) { + // This is a bit tricky but we should increment the iter here as + // ReleaseOwnership() may remove |service_name| from the set. + const std::string& service_name = *iter++; + ReleaseOwnership(service_name); + } + if (!owned_service_names_.empty()) { + LOG(ERROR) << "Failed to release all service names. # of services left: " + << owned_service_names_.size(); + } + + // Private connection should be closed. + if (connection_ && connection_type_ == PRIVATE) { + dbus_connection_close(connection_); + } + // dbus_connection_close() won't unref. + dbus_connection_unref(connection_); + + connection_ = NULL; +} + +void Bus::Shutdown(OnShutdownCallback callback) { + AssertOnOriginThread(); + + PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal, + this, + callback)); +} + +bool Bus::RequestOwnership(const std::string& service_name) { + DCHECK(connection_); + // dbus_bus_request_name() is a blocking call. + AssertOnDBusThread(); + + // Check if we already own the service name. + if (owned_service_names_.find(service_name) != owned_service_names_.end()) { + return true; + } + + ScopedDBusError error; + const int result = dbus_bus_request_name(connection_, + service_name.c_str(), + DBUS_NAME_FLAG_DO_NOT_QUEUE, + error.get()); + if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { + LOG(ERROR) << "Failed to get the onwership of " << service_name << ": " + << (dbus_error_is_set(error.get()) ? error.message() : ""); + return false; + } + owned_service_names_.insert(service_name); + return true; +} + +bool Bus::ReleaseOwnership(const std::string& service_name) { + DCHECK(connection_); + // dbus_bus_request_name() is a blocking call. + AssertOnDBusThread(); + + // Check if we already own the service name. + std::set<std::string>::iterator found = + owned_service_names_.find(service_name); + if (found == owned_service_names_.end()) { + LOG(ERROR) << service_name << " is not owned by the bus"; + return false; + } + + ScopedDBusError error; + const int result = dbus_bus_release_name(connection_, service_name.c_str(), + error.get()); + if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { + owned_service_names_.erase(found); + return true; + } else { + LOG(ERROR) << "Failed to release the onwership of " << service_name << ": " + << (error.is_set() ? error.message() : ""); + return false; + } +} + +bool Bus::SetUpAsyncOperations() { + DCHECK(connection_); + AssertOnDBusThread(); + + if (async_operations_are_set_up_) + return true; + + // Process all the incoming data if any, so that OnDispatchStatus() will + // be called when the incoming data is ready. + ProcessAllIncomingDataIfAny(); + + bool success = dbus_connection_set_watch_functions(connection_, + &Bus::OnAddWatchThunk, + &Bus::OnRemoveWatchThunk, + &Bus::OnToggleWatchThunk, + this, + NULL); + CHECK(success) << "Unable to allocate memory"; + + // TODO(satorux): Timeout is not yet implemented. + success = dbus_connection_set_timeout_functions(connection_, + &Bus::OnAddTimeoutThunk, + &Bus::OnRemoveTimeoutThunk, + &Bus::OnToggleTimeoutThunk, + this, + NULL); + CHECK(success) << "Unable to allocate memory"; + + dbus_connection_set_dispatch_status_function( + connection_, + &Bus::OnDispatchStatusChangedThunk, + this, + NULL); + + async_operations_are_set_up_ = true; + + return true; +} + +DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, + int timeout_ms, + DBusError* error) { + DCHECK(connection_); + AssertOnDBusThread(); + + return dbus_connection_send_with_reply_and_block( + connection_, request, timeout_ms, error); +} + +void Bus::SendWithReply(DBusMessage* request, + DBusPendingCall** pending_call, + int timeout_ms) { + DCHECK(connection_); + AssertOnDBusThread(); + + const bool success = dbus_connection_send_with_reply( + connection_, request, pending_call, timeout_ms); + CHECK(success) << "Unable to allocate memory"; +} + +bool Bus::TryRegisterObjectPath(const std::string& object_path, + const DBusObjectPathVTable* vtable, + void* user_data, + DBusError* error) { + DCHECK(connection_); + AssertOnDBusThread(); + + return dbus_connection_try_register_object_path( + connection_, + object_path.c_str(), + vtable, + user_data, + error); +} + +void Bus::UnregisterObjectPath(const std::string& object_path) { + DCHECK(connection_); + AssertOnDBusThread(); + + const bool success = dbus_connection_unregister_object_path( + connection_, + object_path.c_str()); + CHECK(success) << "Unable to allocate memory"; +} + +void Bus::ShutdownInternal(OnShutdownCallback callback) { + AssertOnDBusThread(); + + ShutdownAndBlock(); + PostTaskToOriginThread(FROM_HERE, callback); +} + +void Bus::ProcessAllIncomingDataIfAny() { + AssertOnDBusThread(); + + // As mentioned at the class comment in .h file, connection_ can be NULL. + if (!connection_ || !dbus_connection_get_is_connected(connection_)) + return; + + if (dbus_connection_get_dispatch_status(connection_) == + DBUS_DISPATCH_DATA_REMAINS) { + while (dbus_connection_dispatch(connection_) == + DBUS_DISPATCH_DATA_REMAINS); + } +} + +void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, + const base::Closure& task) { + origin_loop_->PostTask(from_here, task); +} + +void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, + const base::Closure& task) { + if (dbus_thread_) + dbus_thread_->message_loop()->PostTask(from_here, task); + else + origin_loop_->PostTask(from_here, task); +} + +void Bus::PostDelayedTaskToDBusThread( + const tracked_objects::Location& from_here, + const base::Closure& task, + int delay_ms) { + if (dbus_thread_) + dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms); + else + origin_loop_->PostDelayedTask(from_here, task, delay_ms); +} + +bool Bus::HasDBusThread() { + return dbus_thread_ != NULL; +} + +void Bus::AssertOnOriginThread() { + DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); +} + +void Bus::AssertOnDBusThread() { + base::ThreadRestrictions::AssertIOAllowed(); + + if (dbus_thread_) { + DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId()); + } else { + AssertOnOriginThread(); + } +} + +dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { + AssertOnDBusThread(); + + // watch will be deleted when raw_watch is removed in OnRemoveWatch(). + Watch* watch = new Watch(raw_watch); + if (watch->IsReadyToBeWatched()) { + watch->StartWatching(); + } + ++num_pending_watches_; + return true; +} + +void Bus::OnRemoveWatch(DBusWatch* raw_watch) { + AssertOnDBusThread(); + + Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); + delete watch; + --num_pending_watches_; +} + +void Bus::OnToggleWatch(DBusWatch* raw_watch) { + AssertOnDBusThread(); + + Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); + if (watch->IsReadyToBeWatched()) { + watch->StartWatching(); + } else { + // It's safe to call this if StartWatching() wasn't called, per + // message_pump_libevent.h. + watch->StopWatching(); + } +} + +dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { + AssertOnDBusThread(); + + // timeout will be deleted when raw_timeout is removed in + // OnRemoveTimeoutThunk(). + Timeout* timeout = new Timeout(raw_timeout); + if (timeout->IsReadyToBeMonitored()) { + timeout->StartMonitoring(this); + } + ++num_pending_timeouts_; + return true; +} + +void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { + AssertOnDBusThread(); + + Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); + timeout->Complete(); + --num_pending_timeouts_; +} + +void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { + AssertOnDBusThread(); + + Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); + if (timeout->IsReadyToBeMonitored()) { + timeout->StartMonitoring(this); + } else { + timeout->StopMonitoring(); + } +} + +void Bus::OnDispatchStatusChanged(DBusConnection* connection, + DBusDispatchStatus status) { + DCHECK_EQ(connection, connection_); + AssertOnDBusThread(); + + if (!dbus_connection_get_is_connected(connection)) + return; + + // We cannot call ProcessAllIncomingDataIfAny() here, as calling + // dbus_connection_dispatch() inside DBusDispatchStatusFunction is + // prohibited by the D-Bus library. Hence, we post a task here instead. + // See comments for dbus_connection_set_dispatch_status_function(). + PostTaskToDBusThread(FROM_HERE, + base::Bind(&Bus::ProcessAllIncomingDataIfAny, + this)); +} + +dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnAddWatch(raw_watch); +} + +void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnRemoveWatch(raw_watch); +} + +void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnToggleWatch(raw_watch); +} + +dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnAddTimeout(raw_timeout); +} + +void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnRemoveTimeout(raw_timeout); +} + +void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnToggleTimeout(raw_timeout); +} + +void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, + DBusDispatchStatus status, + void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnDispatchStatusChanged(connection, status); +} + +} // namespace dbus diff --git a/dbus/bus.h b/dbus/bus.h new file mode 100644 index 0000000..4796c65 --- /dev/null +++ b/dbus/bus.h @@ -0,0 +1,367 @@ + +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DBUS_BUS_H_ +#define DBUS_BUS_H_ +#pragma once + +#include <set> +#include <string> +#include <dbus/dbus.h> + +#include "base/callback.h" +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "base/tracked_objects.h" + +class MessageLoop; + +namespace base { +class Thread; +} + +namespace dbus { + +class ExportedObject; +class ObjectProxy; + +// Bus is used to establish a connection with D-Bus, create object +// proxies, and export objects. +// +// For asynchronous operations such as an asynchronous method call, the +// bus object will use a message loop to monitor the underlying file +// descriptor used for D-Bus communication. By default, the bus will use +// the current thread's MessageLoopForIO. If |dbus_thread| option is +// specified, the bus will use the D-Bus thread's message loop. +// +// THREADING +// +// In the D-Bus library, we use the two threads: +// +// - The origin thread: the thread that created the Bus object. +// - The D-Bus thread: the thread supplifed by |dbus_thread| option. +// +// The origin thread is usually Chrome's UI thread. The D-Bus thread is +// usually a dedicated thread for the D-Bus library. +// +// BLOCKING CALLS +// +// Functions that issue blocking calls are marked "BLOCKING CALL" and +// these functions should be called in the D-Bus thread (if +// supplied). AssertOnDBusThread() is placed in these functions. +// +// Note that it's hard to tell if a libdbus function is actually blocking +// or not (ex. dbus_bus_request_name() internally calls +// dbus_connection_send_with_reply_and_block(), which is a blocking +// call). To err on the side, we consider all libdbus functions that deal +// with the connection to dbus-damoen to be blocking. +// +// EXAMPLE USAGE: +// +// Synchronous method call: +// +// dbus::Bus::Options options; +// // Set up the bus options here. +// ... +// dbus::Bus bus(options); +// +// dbus::ObjectProxy* object_proxy = +// bus.GetObjectProxy(service_name, object_path); +// +// dbus::MethodCall method_call(interface_name, method_name); +// dbus::Response response; +// bool success = +// object_proxy.CallMethodAndBlock(&method_call, timeout_ms, &response); +// +// Asynchronous method call: +// +// void OnResponse(dbus::Response* response) { +// // response is NULL if the method call failed. +// if (!response) +// return; +// } +// +// ... +// object_proxy.CallMethod(&method_call, timeout_ms, +// base::Bind(&OnResponse)); +// +// Exporting a method: +// +// Response* Echo(dbus::MethodCall* method_call) { +// // Do something with method_call. +// Response* response = Response::FromMethodCall(method_call); +// // Build response here. +// return response; +// } +// +// void OnExported(const std::string& interface_name, +// const std::string& object_path, +// bool success) { +// // success is true if the method was exported successfully. +// } +// +// ... +// dbus::ExportedObject* exported_object = +// bus.GetExportedObject(service_name, object_path); +// exported_object.ExportMethod(interface_name, method_name, +// base::Bind(&Echo), +// base::Bind(&OnExported)); +// +// WHY IS THIS A REF COUNTED OBJECT? +// +// Bus is a ref counted object, to ensure that |this| of the object is +// alive when callbacks referencing |this| are called. However, after +// Shutdown() is called, |connection_| can be NULL. Hence, calbacks should +// not rely on that |connection_| is alive. +class Bus : public base::RefCountedThreadSafe<Bus> { + public: + // Specifies the bus type. SESSION is used to communicate with per-user + // services like GNOME applications. SYSTEM is used to communicate with + // system-wide services like NetworkManager. + enum BusType { + SESSION = DBUS_BUS_SESSION, + SYSTEM = DBUS_BUS_SYSTEM, + }; + + // Specifies the connection type. PRIVATE should usually be used unless + // you are sure that SHARED is safe for you, which is unlikely the case + // in Chrome. + // + // PRIVATE gives you a private connection, that won't be shared with + // other Bus objects. + // + // SHARED gives you a connection shared among other Bus objects, which + // is unsafe if the connection is shared with multiple threads. + enum ConnectionType { + PRIVATE, + SHARED, + }; + + // Options used to create a Bus object. + struct Options { + Options(); + ~Options(); + + BusType bus_type; // SESSION by default. + ConnectionType connection_type; // PRIVATE by default. + // If the thread is set, the bus object will use the message loop + // attached to the thread to process asynchronous operations. + // + // The thread should meet the following requirements: + // 1) Already running. + // 2) Has a MessageLoopForIO. + // 3) Outlives the bus. + base::Thread* dbus_thread; // NULL by default. + }; + + // Called when shutdown is done. Used for Shutdown(). + typedef base::Callback<void ()> OnShutdownCallback; + + // Creates a Bus object. The actual connection will be established when + // Connect() is called. + explicit Bus(const Options& options); + + // Gets the object proxy for the given service name and the object path. + // The caller must not delete the returned object. The bus will own the + // object. Never returns NULL. + // + // The object proxy is used to call remote methods. + // + // |service_name| looks like "org.freedesktop.NetworkManager", and + // |object_path| looks like "/org/freedesktop/NetworkManager/Devices/0". + // + // Must be called in the origin thread. + virtual ObjectProxy* GetObjectProxy(const std::string& service_name, + const std::string& object_path); + + // Gets the exported object for the given service name and the object + // path. The caller must not delete the returned object. The bus will + // own the object. Never returns NULL. + // + // The exported object is used to export objects to other D-Bus clients. + // + // Must be called in the origin thread. + virtual ExportedObject* GetExportedObject(const std::string& service_name, + const std::string& object_path); + + // Shuts down the bus and blocks until it's done. More specifically, this + // function does the following: + // + // - Unregisters the object paths + // - Releases the service names + // - Closes the connection to dbus-daemon. + // + // BLOCKING CALL. + virtual void ShutdownAndBlock(); + + // Shuts down the bus in the D-Bus thread. |callback| will be called in + // the origin thread. + // + // Must be called in the origin thread. + virtual void Shutdown(OnShutdownCallback callback); + + // + // The public functions below are not intended to be used in client + // code. These are used to implement ObjectProxy and ExportedObject. + // + + // Connects the bus to the dbus-daemon. + // Returns true on success, or the bus is already connected. + // + // BLOCKING CALL. + virtual bool Connect(); + + // Requests the ownership of the given service name. + // Returns true on success, or the the service name is already obtained. + // + // BLOCKING CALL. + virtual bool RequestOwnership(const std::string& service_name); + + // Releases the ownership of the given service name. + // Returns true on success. + // + // BLOCKING CALL. + virtual bool ReleaseOwnership(const std::string& service_name); + + // Sets up async operations. + // Returns true on success, or it's already set up. + // This function needs to be called before starting async operations. + // + // BLOCKING CALL. + virtual bool SetUpAsyncOperations(); + + // Sends a message to the bus and blocks until the response is + // received. Used to implement synchronous method calls. + // + // BLOCKING CALL. + virtual DBusMessage* SendWithReplyAndBlock(DBusMessage* request, + int timeout_ms, + DBusError* error); + + // Requests to send a message to the bus. + // + // BLOCKING CALL. + virtual void SendWithReply(DBusMessage* request, + DBusPendingCall** pending_call, + int timeout_ms); + + // Tries to register the object path. + // + // BLOCKING CALL. + virtual bool TryRegisterObjectPath(const std::string& object_path, + const DBusObjectPathVTable* vtable, + void* user_data, + DBusError* error); + + // Unregister the object path. + // + // BLOCKING CALL. + virtual void UnregisterObjectPath(const std::string& object_path); + + // Posts the task to the message loop of the thread that created the bus. + virtual void PostTaskToOriginThread( + const tracked_objects::Location& from_here, + const base::Closure& task); + + // Posts the task to the message loop of the D-Bus thread. If D-Bus + // thread is not supplied, the message loop of the origin thread will be + // used. + virtual void PostTaskToDBusThread( + const tracked_objects::Location& from_here, + const base::Closure& task); + + // Posts the delayed task to the message loop of the D-Bus thread. If + // D-Bus thread is not supplied, the message loop of the origin thread + // will be used. + virtual void PostDelayedTaskToDBusThread( + const tracked_objects::Location& from_here, + const base::Closure& task, + int delay_ms); + + // Returns true if the bus has the D-Bus thread. + virtual bool HasDBusThread(); + + // Check whether the current thread is on the origin thread (the thread + // that created the bus). If not, DCHECK will fail. + virtual void AssertOnOriginThread(); + + // Check whether the current thread is on the D-Bus thread. If not, + // DCHECK will fail. If the D-Bus thread is not supplied, it calls + // AssertOnOriginThread(). + virtual void AssertOnDBusThread(); + + private: + friend class base::RefCountedThreadSafe<Bus>; + virtual ~Bus(); + + // Helper function used for Shutdown(). + void ShutdownInternal(OnShutdownCallback callback); + + // Processes the all incoming data to the connection, if any. + // + // BLOCKING CALL. + void ProcessAllIncomingDataIfAny(); + + // Called when a watch object is added. Used to start monitoring the + // file descriptor used for D-Bus communication. + dbus_bool_t OnAddWatch(DBusWatch* raw_watch); + + // Called when a watch object is removed. + void OnRemoveWatch(DBusWatch* raw_watch); + + // Called when the "enabled" status of |raw_watch| is toggled. + void OnToggleWatch(DBusWatch* raw_watch); + + // Called when a timeout object is added. Used to start monitoring + // timeout for method calls. + dbus_bool_t OnAddTimeout(DBusTimeout* raw_timeout); + + // Called when a timeout object is removed. + void OnRemoveTimeout(DBusTimeout* raw_timeout); + + // Called when the "enabled" status of |raw_timeout| is toggled. + void OnToggleTimeout(DBusTimeout* raw_timeout); + + // Called when the dispatch status (i.e. if any incoming data is + // available) is changed. + void OnDispatchStatusChanged(DBusConnection* connection, + DBusDispatchStatus status); + + // Callback helper functions. Redirects to the corresponding member function. + static dbus_bool_t OnAddWatchThunk(DBusWatch* raw_watch, void* data); + static void OnRemoveWatchThunk(DBusWatch* raw_watch, void* data); + static void OnToggleWatchThunk(DBusWatch* raw_watch, void* data); + static dbus_bool_t OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data); + static void OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data); + static void OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data); + static void OnDispatchStatusChangedThunk(DBusConnection* connection, + DBusDispatchStatus status, + void* data); + const BusType bus_type_; + const ConnectionType connection_type_; + base::Thread* dbus_thread_; + DBusConnection* connection_; + + MessageLoop* origin_loop_; + base::PlatformThreadId origin_thread_id_; + base::PlatformThreadId dbus_thread_id_; + + std::set<std::string> owned_service_names_; + std::vector<scoped_refptr<dbus::ObjectProxy> > object_proxies_; + std::vector<scoped_refptr<dbus::ExportedObject> > exported_objects_; + + bool async_operations_are_set_up_; + + // Counters to make sure that OnAddWatch()/OnRemoveWatch() and + // OnAddTimeout()/OnRemoveTimeou() are balanced. + int num_pending_watches_; + int num_pending_timeouts_; + + DISALLOW_COPY_AND_ASSIGN(Bus); +}; + +} // namespace dbus + +#endif // DBUS_BUS_H_ diff --git a/dbus/dbus.gyp b/dbus/dbus.gyp index c1fd9fe..a310931 100644 --- a/dbus/dbus.gyp +++ b/dbus/dbus.gyp @@ -15,8 +15,15 @@ '../build/linux/system.gyp:dbus', ], 'sources': [ + 'bus.cc', + 'bus.h', + 'exported_object.h', + 'exported_object.cc', 'message.cc', 'message.h', + 'object_proxy.cc', + 'object_proxy.h', + 'scoped_dbus_error.h', ], }, { @@ -31,6 +38,10 @@ 'sources': [ '../base/test/run_all_unittests.cc', 'message_unittest.cc', + 'end_to_end_async_unittest.cc', + 'end_to_end_sync_unittest.cc', + 'test_service.cc', + 'test_service.h', ], 'include_dirs': [ '..', diff --git a/dbus/end_to_end_async_unittest.cc b/dbus/end_to_end_async_unittest.cc new file mode 100644 index 0000000..c3689ad --- /dev/null +++ b/dbus/end_to_end_async_unittest.cc @@ -0,0 +1,192 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <algorithm> +#include <string> +#include <vector> + +#include "base/bind.h" +#include "base/memory/scoped_ptr.h" +#include "base/message_loop.h" +#include "base/stl_util.h" +#include "base/threading/thread.h" +#include "base/threading/thread_restrictions.h" +#include "dbus/bus.h" +#include "dbus/message.h" +#include "dbus/object_proxy.h" +#include "dbus/test_service.h" +#include "testing/gtest/include/gtest/gtest.h" + +// The end-to-end test exercises the asynchronos APIs in ObjectProxy and +// ExportedObject. +class EndToEndAsyncTest : public testing::Test { + public: + EndToEndAsyncTest() { + } + + void SetUp() { + // Make the main thread not to allow IO. + base::ThreadRestrictions::SetIOAllowed(false); + + // Start the test service; + test_service_.reset(new dbus::TestService); + test_service_->StartService(); + test_service_->WaitUntilServiceIsStarted(); + + // Start the D-Bus thread. + dbus_thread_.reset(new base::Thread("D-Bus Thread")); + base::Thread::Options thread_options; + thread_options.message_loop_type = MessageLoop::TYPE_IO; + dbus_thread_->StartWithOptions(thread_options); + + // Create the client. + dbus::Bus::Options bus_options; + bus_options.bus_type = dbus::Bus::SESSION; + bus_options.connection_type = dbus::Bus::PRIVATE; + bus_options.dbus_thread = dbus_thread_.get(); + bus_ = new dbus::Bus(bus_options); + object_proxy_ = bus_->GetObjectProxy("org.chromium.TestService", + "/org/chromium/TestObject"); + } + + void TearDown() { + bus_->Shutdown(base::Bind(&EndToEndAsyncTest::OnShutdown, + base::Unretained(this))); + // Wait until the bus is shutdown. OnShutdown() will be called in + // mesage_loop_. + message_loop_.Run(); + + // Reset to the default. + base::ThreadRestrictions::SetIOAllowed(true); + + // Stopping a thread is considred an IO operation, so do this after + // allowing IO. + test_service_->Stop(); + } + + protected: + // Calls the method asynchronosly. OnResponse() will be called once the + // response is received. + void CallMethod(dbus::MethodCall* method_call, + int timeout_ms) { + object_proxy_->CallMethod(method_call, + timeout_ms, + base::Bind(&EndToEndAsyncTest::OnResponse, + base::Unretained(this))); + } + + // Wait for the give number of responses. + void WaitForResponses(size_t num_responses) { + while (response_strings_.size() < num_responses) { + message_loop_.Run(); + } + } + + // Called when the response is received. + void OnResponse(dbus::Response* response) { + // |response| will be deleted on exit of the function. Copy the + // payload to |response_strings_|. + if (response) { + dbus::MessageReader reader(response); + std::string response_string; + ASSERT_TRUE(reader.PopString(&response_string)); + response_strings_.push_back(response_string); + } else { + response_strings_.push_back(""); + } + message_loop_.Quit(); + }; + + // Called when the shutdown is complete. + void OnShutdown() { + message_loop_.Quit(); + } + + MessageLoop message_loop_; + std::vector<std::string> response_strings_; + scoped_ptr<base::Thread> dbus_thread_; + scoped_refptr<dbus::Bus> bus_; + dbus::ObjectProxy* object_proxy_; + scoped_ptr<dbus::TestService> test_service_; +}; + +TEST_F(EndToEndAsyncTest, Echo) { + const char* kHello = "hello"; + + // Create the method call. + dbus::MethodCall method_call("org.chromium.TestInterface", "Echo"); + dbus::MessageWriter writer(&method_call); + writer.AppendString(kHello); + + // Call the method. + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + CallMethod(&method_call, timeout_ms); + + // Check the response. + WaitForResponses(1); + EXPECT_EQ(kHello, response_strings_[0]); +} + +// Call Echo method three times. +TEST_F(EndToEndAsyncTest, EchoThreeTimes) { + const char* kMessages[] = { "foo", "bar", "baz" }; + + for (size_t i = 0; i < arraysize(kMessages); ++i) { + // Create the method call. + dbus::MethodCall method_call("org.chromium.TestInterface", "Echo"); + dbus::MessageWriter writer(&method_call); + writer.AppendString(kMessages[i]); + + // Call the method. + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + CallMethod(&method_call, timeout_ms); + } + + // Check the responses. + WaitForResponses(3); + // Sort as the order of the returned messages is not deterministic. + std::sort(response_strings_.begin(), response_strings_.end()); + EXPECT_EQ("bar", response_strings_[0]); + EXPECT_EQ("baz", response_strings_[1]); + EXPECT_EQ("foo", response_strings_[2]); +} + +TEST_F(EndToEndAsyncTest, Timeout) { + const char* kHello = "hello"; + + // Create the method call. + dbus::MethodCall method_call("org.chromium.TestInterface", "SlowEcho"); + dbus::MessageWriter writer(&method_call); + writer.AppendString(kHello); + + // Call the method with timeout smaller than TestService::kSlowEchoSleepMs. + const int timeout_ms = dbus::TestService::kSlowEchoSleepMs / 10; + CallMethod(&method_call, timeout_ms); + WaitForResponses(1); + + // Should fail because of timeout. + ASSERT_EQ("", response_strings_[0]); +} + +TEST_F(EndToEndAsyncTest, NonexistentMethod) { + dbus::MethodCall method_call("org.chromium.TestInterface", "Nonexistent"); + + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + CallMethod(&method_call, timeout_ms); + WaitForResponses(1); + + // Should fail because the method is nonexistent. + ASSERT_EQ("", response_strings_[0]); +} + +TEST_F(EndToEndAsyncTest, BrokenMethod) { + dbus::MethodCall method_call("org.chromium.TestInterface", "BrokenMethod"); + + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + CallMethod(&method_call, timeout_ms); + WaitForResponses(1); + + // Should fail because the method is broken. + ASSERT_EQ("", response_strings_[0]); +} diff --git a/dbus/end_to_end_sync_unittest.cc b/dbus/end_to_end_sync_unittest.cc new file mode 100644 index 0000000..2f81878 --- /dev/null +++ b/dbus/end_to_end_sync_unittest.cc @@ -0,0 +1,104 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" +#include "dbus/bus.h" +#include "dbus/message.h" +#include "dbus/object_proxy.h" +#include "dbus/test_service.h" +#include "testing/gtest/include/gtest/gtest.h" + +// The end-to-end test exercises the synchronos APIs in ObjectProxy and +// ExportedObject. The test will launch a thread for the service side +// operations (i.e. ExportedObject side). +class EndToEndSyncTest : public testing::Test { + public: + EndToEndSyncTest() { + } + + void SetUp() { + // Start the test service; + test_service_.reset(new dbus::TestService); + test_service_->StartService(); + test_service_->WaitUntilServiceIsStarted(); + + // Create the client. + dbus::Bus::Options client_bus_options; + client_bus_options.bus_type = dbus::Bus::SESSION; + client_bus_options.connection_type = dbus::Bus::PRIVATE; + client_bus_ = new dbus::Bus(client_bus_options); + object_proxy_ = client_bus_->GetObjectProxy("org.chromium.TestService", + "/org/chromium/TestObject"); + } + + void TearDown() { + test_service_->Stop(); + client_bus_->ShutdownAndBlock(); + } + + protected: + scoped_ptr<dbus::TestService> test_service_; + scoped_refptr<dbus::Bus> client_bus_; + dbus::ObjectProxy* object_proxy_; +}; + +TEST_F(EndToEndSyncTest, Echo) { + const std::string kHello = "hello"; + + // Create the method call. + dbus::MethodCall method_call("org.chromium.TestInterface", "Echo"); + dbus::MessageWriter writer(&method_call); + writer.AppendString(kHello); + + // Call the method. + dbus::Response response; + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + const bool success = + object_proxy_->CallMethodAndBlock(&method_call, timeout_ms, &response); + ASSERT_TRUE(success); + + // Check the response. kHello should be echoed back. + dbus::MessageReader reader(&response); + std::string returned_message; + ASSERT_TRUE(reader.PopString(&returned_message)); + EXPECT_EQ(kHello, returned_message); +} + +TEST_F(EndToEndSyncTest, Timeout) { + const std::string kHello = "hello"; + + // Create the method call. + dbus::MethodCall method_call("org.chromium.TestInterface", "DelayedEcho"); + dbus::MessageWriter writer(&method_call); + writer.AppendString(kHello); + + // Call the method with timeout smaller than TestService::kSlowEchoSleepMs. + dbus::Response response; + const int timeout_ms = dbus::TestService::kSlowEchoSleepMs / 10; + const bool success = + object_proxy_->CallMethodAndBlock(&method_call, timeout_ms, &response); + // Should fail because of timeout. + ASSERT_FALSE(success); +} + +TEST_F(EndToEndSyncTest, NonexistentMethod) { + dbus::MethodCall method_call("org.chromium.TestInterface", "Nonexistent"); + + dbus::Response response; + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + const bool success = + object_proxy_->CallMethodAndBlock(&method_call, timeout_ms, &response); + ASSERT_FALSE(success); +} + +TEST_F(EndToEndSyncTest, BrokenMethod) { + dbus::MethodCall method_call("org.chromium.TestInterface", "BrokenMethod"); + + dbus::Response response; + const int timeout_ms = dbus::ObjectProxy::TIMEOUT_USE_DEFAULT; + const bool success = + object_proxy_->CallMethodAndBlock(&method_call, timeout_ms, &response); + ASSERT_FALSE(success); +} diff --git a/dbus/exported_object.cc b/dbus/exported_object.cc new file mode 100644 index 0000000..1793ed2 --- /dev/null +++ b/dbus/exported_object.cc @@ -0,0 +1,259 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "dbus/exported_object.h" + +#include "base/bind.h" +#include "base/logging.h" +#include "base/memory/ref_counted.h" +#include "base/message_loop.h" +#include "base/threading/thread_restrictions.h" +#include "base/time.h" +#include "dbus/bus.h" +#include "dbus/message.h" +#include "dbus/scoped_dbus_error.h" + +namespace dbus { + +namespace { + +// Gets the absolute method name by concatenating the interface name and +// the method name. Used for building keys for method_table_ in +// ExportedObject. +std::string GetAbsoluteMethodName( + const std::string& interface_name, + const std::string& method_name) { + return interface_name + "." + method_name; +} + +} // namespace + +ExportedObject::ExportedObject(Bus* bus, + const std::string& service_name, + const std::string& object_path) + : bus_(bus), + service_name_(service_name), + object_path_(object_path), + object_is_registered_(false), + method_is_called_(false), + response_from_method_(NULL), + on_method_is_called_(&method_is_called_lock_) { +} + +ExportedObject::~ExportedObject() { + DCHECK(!object_is_registered_); +} + +bool ExportedObject::ExportMethodAndBlock( + const std::string& interface_name, + const std::string& method_name, + MethodCallCallback method_call_callback) { + bus_->AssertOnDBusThread(); + + if (!bus_->Connect()) + return false; + if (!bus_->SetUpAsyncOperations()) + return false; + if (!bus_->RequestOwnership(service_name_)) + return false; + if (!Register()) + return false; + + const std::string absolute_method_name = + GetAbsoluteMethodName(interface_name, method_name); + if (method_table_.find(absolute_method_name) != method_table_.end()) { + LOG(ERROR) << absolute_method_name << " is already exported"; + return false; + } + method_table_[absolute_method_name] = method_call_callback; + + return true; +} + +void ExportedObject::ExportMethod(const std::string& interface_name, + const std::string& method_name, + MethodCallCallback method_call_callback, + OnExportedCallback on_exported_calback) { + bus_->AssertOnOriginThread(); + + base::Closure task = base::Bind(&ExportedObject::ExportMethodInternal, + this, + interface_name, + method_name, + method_call_callback, + on_exported_calback); + bus_->PostTaskToDBusThread(FROM_HERE, task); +} + +void ExportedObject::Unregister() { + bus_->AssertOnDBusThread(); + + if (!object_is_registered_) + return; + + bus_->UnregisterObjectPath(object_path_); + object_is_registered_ = false; +} + +void ExportedObject::ExportMethodInternal( + const std::string& interface_name, + const std::string& method_name, + MethodCallCallback method_call_callback, + OnExportedCallback on_exported_calback) { + bus_->AssertOnDBusThread(); + + const bool success = ExportMethodAndBlock(interface_name, + method_name, + method_call_callback); + bus_->PostTaskToOriginThread(FROM_HERE, + base::Bind(&ExportedObject::OnExported, + this, + on_exported_calback, + interface_name, + method_name, + success)); +} + +void ExportedObject::OnExported(OnExportedCallback on_exported_callback, + const std::string& interface_name, + const std::string& method_name, + bool success) { + bus_->AssertOnOriginThread(); + + on_exported_callback.Run(interface_name, method_name, success); +} + +bool ExportedObject::Register() { + bus_->AssertOnDBusThread(); + + if (object_is_registered_) + return true; + + ScopedDBusError error; + + DBusObjectPathVTable vtable = {}; + vtable.message_function = &ExportedObject::HandleMessageThunk; + vtable.unregister_function = &ExportedObject::OnUnregisteredThunk; + const bool success = bus_->TryRegisterObjectPath(object_path_, + &vtable, + this, + error.get()); + if (!success) { + LOG(ERROR) << "Failed to regiser the object: " << object_path_ << ": " + << (error.is_set() ? error.message() : ""); + return false; + } + + object_is_registered_ = true; + return true; +} + +DBusHandlerResult ExportedObject::HandleMessage( + DBusConnection* connection, + DBusMessage* raw_message) { + bus_->AssertOnDBusThread(); + DCHECK_EQ(DBUS_MESSAGE_TYPE_METHOD_CALL, dbus_message_get_type(raw_message)); + + // raw_message will be unrefed on exit of the function. Increment the + // reference so we can use it in MethodCall. + dbus_message_ref(raw_message); + scoped_ptr<MethodCall> method_call( + MethodCall::FromRawMessage(raw_message)); + const std::string interface = method_call->GetInterface(); + const std::string member = method_call->GetMember(); + + if (interface.empty()) { + // We don't support method calls without interface. + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + + // Check if we know about the method. + const std::string absolute_method_name = GetAbsoluteMethodName( + interface, member); + MethodTable::const_iterator iter = method_table_.find(absolute_method_name); + if (iter == method_table_.end()) { + // Don't know about the method. + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + + Response* response = NULL; + if (bus_->HasDBusThread()) { + response_from_method_ = NULL; + method_is_called_ = false; + // Post a task to run the method in the origin thread. + bus_->PostTaskToOriginThread(FROM_HERE, + base::Bind(&ExportedObject::RunMethod, + this, + iter->second, + method_call.get())); + // Wait until the method call is done. Blocking is not desirable but we + // should return the response to the dbus-daemon in the function, so we + // don't have a choice. We wait in the D-Bus thread, so it should be ok. + { + // We need a timeout here in case the method gets stuck. + const int kTimeoutSecs = 10; + const base::TimeDelta timeout( + base::TimeDelta::FromSeconds(kTimeoutSecs)); + const base::Time start_time = base::Time::Now(); + + base::AutoLock auto_lock(method_is_called_lock_); + while (!method_is_called_) { + on_method_is_called_.TimedWait(timeout); + CHECK(base::Time::Now() - start_time < timeout) + << "Method " << absolute_method_name << " timed out"; + } + } + response = response_from_method_; + } else { + // If the D-Bus thread is not used, just call the method directly. We + // don't need the complicated logic to wait for the method call to be + // complete. + response = iter->second.Run(method_call.get()); + } + + if (!response) { + // Something bad happend in the method call. + scoped_ptr<dbus::ErrorResponse> error_response( + ErrorResponse::FromMethodCall(method_call.get(), + DBUS_ERROR_FAILED, + "error occurred in " + member)); + dbus_connection_send(connection, error_response->raw_message(), NULL); + return DBUS_HANDLER_RESULT_HANDLED; + } + + // The method call was successful. + dbus_connection_send(connection, response->raw_message(), NULL); + delete response; + + return DBUS_HANDLER_RESULT_HANDLED; +} + +void ExportedObject::RunMethod(MethodCallCallback method_call_callback, + MethodCall* method_call) { + bus_->AssertOnOriginThread(); + + base::AutoLock auto_lock(method_is_called_lock_); + response_from_method_ = method_call_callback.Run(method_call); + method_is_called_ = true; + on_method_is_called_.Signal(); +} + +void ExportedObject::OnUnregistered(DBusConnection* connection) { +} + +DBusHandlerResult ExportedObject::HandleMessageThunk( + DBusConnection* connection, + DBusMessage* raw_message, + void* user_data) { + ExportedObject* self = reinterpret_cast<ExportedObject*>(user_data); + return self->HandleMessage(connection, raw_message); +} + +void ExportedObject::OnUnregisteredThunk(DBusConnection *connection, + void* user_data) { + ExportedObject* self = reinterpret_cast<ExportedObject*>(user_data); + return self->OnUnregistered(connection); +} + +} // namespace dbus diff --git a/dbus/exported_object.h b/dbus/exported_object.h new file mode 100644 index 0000000..8e03118 --- /dev/null +++ b/dbus/exported_object.h @@ -0,0 +1,144 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DBUS_EXPORTED_OBJECT_H_ +#define DBUS_EXPORTED_OBJECT_H_ +#pragma once + +#include <string> +#include <map> +#include <utility> + +#include <dbus/dbus.h> + +#include "base/callback.h" +#include "base/memory/ref_counted.h" +#include "base/synchronization/condition_variable.h" +#include "base/threading/platform_thread.h" + +class MessageLoop; + +namespace dbus { + +class Bus; +class MethodCall; +class Response; + +// ExportedObject is used to export objects and methods to other D-Bus +// clients. +// +// ExportedObject is a ref counted object, to ensure that |this| of the +// object is alive when callbacks referencing |this| are called. +class ExportedObject : public base::RefCountedThreadSafe<ExportedObject> { + public: + // Client code should use Bus::GetExportedObject() instead of this + // constructor. + ExportedObject(Bus* bus, + const std::string& service_name, + const std::string& object_path); + + // Called when an exported method is called. MethodCall* is the request + // message. + typedef base::Callback<Response* (MethodCall*)> MethodCallCallback; + + // Called when method exporting is done. + // Parameters: + // - the interface name. + // - the method name. + // - whether exporting was successful or not. + typedef base::Callback<void (const std::string&, const std::string&, bool)> + OnExportedCallback; + + // Exports the method specified by |interface_name| and |method_name|, + // and blocks until exporting is done. Returns true on success. + // + // |method_call_callback| will be called in the origin thread, when the + // exported method is called. As it's called in the origin thread, + // callback| can safely reference objects in the origin thread (i.e. UI + // thread in most cases). + // + // BLOCKING CALL. + virtual bool ExportMethodAndBlock(const std::string& interface_name, + const std::string& method_name, + MethodCallCallback method_call_callback); + + // Requests to export the method specified by |interface_name| and + // |method_name|. See Also ExportMethodAndBlock(). + // + // |on_exported_callback| is called when the method is exported or + // failed to be exported, in the origin thread. + // + // Must be called in the origin thread. + virtual void ExportMethod(const std::string& interface_name, + const std::string& method_name, + MethodCallCallback method_call_callback, + OnExportedCallback on_exported_callback); + + // Unregisters the object from the bus. The Bus object will take care of + // unregistering so you don't have to do this manually. + // + // BLOCKING CALL. + virtual void Unregister(); + + private: + friend class base::RefCountedThreadSafe<ExportedObject>; + virtual ~ExportedObject(); + + // Helper function for ExportMethod(). + void ExportMethodInternal(const std::string& interface_name, + const std::string& method_name, + MethodCallCallback method_call_callback, + OnExportedCallback exported_callback); + + // Called when the object is exported. + void OnExported(OnExportedCallback on_exported_callback, + const std::string& interface_name, + const std::string& method_name, + bool success); + + // Registers this object to the bus. + // Returns true on success, or the object is already registered. + // + // BLOCKING CALL. + bool Register(); + + // Handles the incoming request messages and dispatches to the exported + // methods. + DBusHandlerResult HandleMessage(DBusConnection* connection, + DBusMessage* raw_message); + + // Runs the method. Helper function for HandleMessage(). + void RunMethod(MethodCallCallback method_call_callback, + MethodCall* method_call); + + // Called when the object is unregistered. + void OnUnregistered(DBusConnection* connection); + + // Redirects the function call to HandleMessage(). + static DBusHandlerResult HandleMessageThunk(DBusConnection* connection, + DBusMessage* raw_message, + void* user_data); + + // Redirects the function call to OnUnregistered(). + static void OnUnregisteredThunk(DBusConnection* connection, + void* user_data); + + Bus* bus_; + std::string service_name_; + std::string object_path_; + bool object_is_registered_; + bool method_is_called_; + dbus::Response* response_from_method_; + base::Lock method_is_called_lock_; + base::ConditionVariable on_method_is_called_; + + // The method table where keys are absolute method names (i.e. interface + // name + method name), and values are the corresponding callbacks. + typedef std::map<std::string, MethodCallCallback> MethodTable; + MethodTable method_table_; +}; + +} // namespace dbus + +#endif // DBUS_EXPORTED_OBJECT_H_ diff --git a/dbus/object_proxy.cc b/dbus/object_proxy.cc new file mode 100644 index 0000000..24dbde2 --- /dev/null +++ b/dbus/object_proxy.cc @@ -0,0 +1,197 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "dbus/bus.h" + +#include "base/bind.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/threading/thread.h" +#include "base/threading/thread_restrictions.h" +#include "dbus/message.h" +#include "dbus/object_proxy.h" +#include "dbus/scoped_dbus_error.h" + +namespace dbus { + +ObjectProxy::ObjectProxy(Bus* bus, + const std::string& service_name, + const std::string& object_path) + : bus_(bus), + service_name_(service_name), + object_path_(object_path) { +} + +ObjectProxy::~ObjectProxy() { +} + +// Originally we tried to make |method_call| a const reference, but we +// gave up as dbus_connection_send_with_reply_and_block() takes a +// non-const pointer of DBusMessage as the second parameter. +bool ObjectProxy::CallMethodAndBlock(MethodCall* method_call, + int timeout_ms, + Response* response) { + bus_->AssertOnDBusThread(); + + if (!bus_->Connect()) + return false; + + method_call->SetDestination(service_name_); + method_call->SetPath(object_path_); + DBusMessage* request_message = method_call->raw_message(); + + ScopedDBusError error; + + // Send the message synchronously. + DBusMessage* response_message = + bus_->SendWithReplyAndBlock(request_message, timeout_ms, error.get()); + + if (!response_message) { + LOG(ERROR) << "Failed to call method: " + << (error.is_set() ? error.message() : ""); + return false; + } + response->reset_raw_message(response_message); + + return true; +} + +void ObjectProxy::CallMethod(MethodCall* method_call, + int timeout_ms, + ResponseCallback callback) { + bus_->AssertOnOriginThread(); + + method_call->SetDestination(service_name_); + method_call->SetPath(object_path_); + // Increment the reference count so we can safely reference the + // underlying request message until the method call is complete. This + // will be unref'ed in StartAsyncMethodCall(). + DBusMessage* request_message = method_call->raw_message(); + dbus_message_ref(request_message); + + // Bind() won't compile if we pass request_message as-is since + // DBusMessage is an opaque struct which Bind() cannot handle. + // Hence we cast it to void* to workaround the issue. + base::Closure task = base::Bind(&ObjectProxy::StartAsyncMethodCall, + this, + timeout_ms, + static_cast<void*>(request_message), + callback); + // Wait for the response in the D-Bus thread. + bus_->PostTaskToDBusThread(FROM_HERE, task); +} + +ObjectProxy::OnPendingCallIsCompleteData::OnPendingCallIsCompleteData( + ObjectProxy* in_object_proxy, + ResponseCallback in_response_callback) + : object_proxy(in_object_proxy), + response_callback(in_response_callback) { +} + +ObjectProxy::OnPendingCallIsCompleteData::~OnPendingCallIsCompleteData() { +} + +void ObjectProxy::StartAsyncMethodCall(int timeout_ms, + void* in_request_message, + ResponseCallback response_callback) { + bus_->AssertOnDBusThread(); + + if (!bus_->Connect() || !bus_->SetUpAsyncOperations()) { + // In case of a failure, run the callback with NULL response, that + // indicates a failure. + Response* response = NULL; + base::Closure task = base::Bind(&ObjectProxy::RunResponseCallback, + this, + response_callback, + response); + bus_->PostTaskToOriginThread(FROM_HERE, task); + return; + } + + DBusMessage* request_message = + static_cast<DBusMessage*>(in_request_message); + DBusPendingCall* pending_call = NULL; + + bus_->SendWithReply(request_message, &pending_call, timeout_ms); + + // Prepare the data we'll be passing to OnPendingCallIsCompleteThunk(). + // The data will be deleted in OnPendingCallIsCompleteThunk(). + OnPendingCallIsCompleteData* data = + new OnPendingCallIsCompleteData(this, response_callback); + + // This returns false only when unable to allocate memory. + const bool success = dbus_pending_call_set_notify( + pending_call, + &ObjectProxy::OnPendingCallIsCompleteThunk, + data, + NULL); + CHECK(success) << "Unable to allocate memory"; + dbus_pending_call_unref(pending_call); + + // It's now safe to unref the request message. + dbus_message_unref(request_message); +} + +void ObjectProxy::OnPendingCallIsComplete(DBusPendingCall* pending_call, + ResponseCallback response_callback) { + bus_->AssertOnDBusThread(); + + DBusMessage* response_message = dbus_pending_call_steal_reply(pending_call); + + if (!response_message) { + // This shouldn't happen but just in case. + LOG(ERROR) << "The response message is not received for some reason"; + Response* response = NULL; + base::Closure task = base::Bind(&ObjectProxy::RunResponseCallback, + this, + response_callback, + response); + bus_->PostTaskToOriginThread(FROM_HERE, task); + return; + } + + // The response message will be deleted in RunResponseCallback(). + Response* response = new Response; + response->reset_raw_message(response_message); + base::Closure task = base::Bind(&ObjectProxy::RunResponseCallback, + this, + response_callback, + response); + bus_->PostTaskToOriginThread(FROM_HERE, task); +} + +void ObjectProxy::RunResponseCallback(ResponseCallback response_callback, + Response* response) { + bus_->AssertOnOriginThread(); + + if (!response) { + // The response is not received. + response_callback.Run(NULL); + } else if (response->GetMessageType() == Message::MESSAGE_ERROR) { + // Error message may contain the error message as string. + dbus::MessageReader reader(response); + std::string error_message; + reader.PopString(&error_message); + LOG(ERROR) << "Failed to call method: " << response->GetErrorName() + << ": " << error_message; + // We don't give the error message to the callback. + response_callback.Run(NULL); + } else { + // The response is successfuly received. + response_callback.Run(response); + } + delete response; // It's ok to delete NULL. +} + +void ObjectProxy::OnPendingCallIsCompleteThunk(DBusPendingCall* pending_call, + void* user_data) { + OnPendingCallIsCompleteData* data = + reinterpret_cast<OnPendingCallIsCompleteData*>(user_data); + ObjectProxy* self = data->object_proxy; + self->OnPendingCallIsComplete(pending_call, + data->response_callback); + delete data; +} + +} // namespace dbus diff --git a/dbus/object_proxy.h b/dbus/object_proxy.h new file mode 100644 index 0000000..024ba2c --- /dev/null +++ b/dbus/object_proxy.h @@ -0,0 +1,116 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DBUS_OBJECT_PROXY_H_ +#define DBUS_OBJECT_PROXY_H_ +#pragma once + +#include <string> +#include <vector> +#include <dbus/dbus.h> + +#include "base/callback.h" +#include "base/memory/ref_counted.h" + +class MessageLoop; + +namespace dbus { + +class Bus; +class MethodCall; +class Response; + +// ObjectProxy is used to communicate with remote objects, mainly for +// calling methods of these objects. +// +// ObjectProxy is a ref counted object, to ensure that |this| of the +// object is is alive when callbacks referencing |this| are called. +class ObjectProxy : public base::RefCountedThreadSafe<ObjectProxy> { + public: + // Client code should use Bus::GetObjectProxy() instead of this + // constructor. + ObjectProxy(Bus* bus, + const std::string& service_name, + const std::string& object_path); + + // Special timeout constants. + // + // The constants correspond to DBUS_TIMEOUT_USE_DEFAULT and + // DBUS_TIMEOUT_INFINITE. Here we use literal numbers instead of these + // macros as these aren't defined with D-Bus earlier than 1.4.12. + enum { + TIMEOUT_USE_DEFAULT = -1, + TIMEOUT_INFINITE = 0x7fffffff, + }; + + // Called when the response is returned. Used for CallMethod(). + typedef base::Callback<void(Response*)> ResponseCallback; + + // Calls the method of the remote object and blocks until the response + // is returned. + // + // BLOCKING CALL. + virtual bool CallMethodAndBlock(MethodCall* method_call, + int timeout_ms, + Response* response); + + // Requests to call the method of the remote object. + // + // |callback| will be called in the origin thread, once the method call + // is complete. As it's called in the origin thread, |callback| can + // safely reference objects in the origin thread (i.e. UI thread in most + // cases). + // + // If the method call is successful, a pointer to Response object will + // be passed to the callback. If unsuccessful, NULL will be passed to + // the callback. + // + // Must be called in the origin thread. + virtual void CallMethod(MethodCall* method_call, + int timeout_ms, + ResponseCallback callback); + + private: + friend class base::RefCountedThreadSafe<ObjectProxy>; + virtual ~ObjectProxy(); + + // Struct of data we'll be passing from StartAsyncMethodCall() to + // OnPendingCallIsCompleteThunk(). + struct OnPendingCallIsCompleteData { + OnPendingCallIsCompleteData(ObjectProxy* in_object_proxy, + ResponseCallback in_response_callback); + ~OnPendingCallIsCompleteData(); + + ObjectProxy* object_proxy; + ResponseCallback response_callback; + }; + + // Starts the async method call. This is a helper function to implement + // CallMethod(). + void StartAsyncMethodCall(int timeout_ms, + void* request_message, + ResponseCallback response_callback); + + // Called when the pending call is complete. + void OnPendingCallIsComplete(DBusPendingCall* pending_call, + ResponseCallback response_callback); + + // Runs the response callback with the given response object. + void RunResponseCallback(ResponseCallback response_callback, + Response* response); + + // Redirects the function call to OnPendingCallIsComplete(). + static void OnPendingCallIsCompleteThunk(DBusPendingCall* pending_call, + void* user_data); + + Bus* bus_; + std::string service_name_; + std::string object_path_; + + DISALLOW_COPY_AND_ASSIGN(ObjectProxy); +}; + +} // namespace dbus + +#endif // DBUS_OBJECT_PROXY_H_ diff --git a/dbus/scoped_dbus_error.h b/dbus/scoped_dbus_error.h new file mode 100644 index 0000000..e76e4f7 --- /dev/null +++ b/dbus/scoped_dbus_error.h @@ -0,0 +1,34 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DBUS_SCOPED_DBUS_ERROR_H_ +#define DBUS_SCOPED_DBUS_ERROR_H_ +#pragma once + +#include <dbus/dbus.h> + +namespace dbus { + +// Utility class to ensure that DBusError is freed. +class ScopedDBusError { + public: + ScopedDBusError() { + dbus_error_init(&error_); + } + + ~ScopedDBusError() { + dbus_error_free(&error_); + } + + DBusError* get() { return &error_; } + bool is_set() { return dbus_error_is_set(&error_); } + const char* message() { return error_.message; } + + private: + DBusError error_; +}; + +} // namespace dbus + +#endif // DBUS_SCOPED_DBUS_ERROR_H_ diff --git a/dbus/test_service.cc b/dbus/test_service.cc new file mode 100644 index 0000000..c25f803 --- /dev/null +++ b/dbus/test_service.cc @@ -0,0 +1,101 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "dbus/test_service.h" + +#include "base/bind.h" +#include "base/threading/platform_thread.h" +#include "dbus/bus.h" +#include "dbus/exported_object.h" +#include "dbus/message.h" + +namespace dbus { + +const int TestService::kSlowEchoSleepMs = 100; // In milliseconds. + +TestService::TestService() + : base::Thread("TestService"), + service_started_(false), + on_service_started_(&service_started_lock_) { +} + +TestService::~TestService() { +} + +void TestService::StartService() { + base::Thread::Options thread_options; + thread_options.message_loop_type = MessageLoop::TYPE_IO; + StartWithOptions(thread_options); +} + +void TestService::WaitUntilServiceIsStarted() { + message_loop()->PostTask( + FROM_HERE, + base::Bind(&TestService::OnServiceStarted, + base::Unretained(this))); + base::AutoLock auto_lock(service_started_lock_); + while (!service_started_) + on_service_started_.Wait(); +} + +void TestService::OnServiceStarted() { + base::AutoLock auto_lock(service_started_lock_); + service_started_ = true; + on_service_started_.Signal(); +} + +void TestService::Run(MessageLoop* message_loop) { + Bus::Options bus_options; + bus_options.bus_type = Bus::SESSION; + bus_options.connection_type = Bus::PRIVATE; + bus_ = new Bus(bus_options); + + exported_object_ = bus_->GetExportedObject( + "org.chromium.TestService", + "/org/chromium/TestObject"); + CHECK(exported_object_->ExportMethodAndBlock( + "org.chromium.TestInterface", + "Echo", + base::Bind(&TestService::Echo, + base::Unretained(this)))); + CHECK(exported_object_->ExportMethodAndBlock( + "org.chromium.TestInterface", + "SlowEcho", + base::Bind(&TestService::SlowEcho, + base::Unretained(this)))); + CHECK(exported_object_->ExportMethodAndBlock( + "org.chromium.TestInterface", + "BrokenMethod", + base::Bind(&TestService::BrokenMethod, + base::Unretained(this)))); + + message_loop->Run(); +} + +void TestService::CleanUp() { + bus_->ShutdownAndBlock(); +} + +Response* TestService::Echo(MethodCall* method_call) { + MessageReader reader(method_call); + std::string text_message; + if (!reader.PopString(&text_message)) + return NULL; + + Response* response = Response::FromMethodCall(method_call); + MessageWriter writer(response); + writer.AppendString(text_message); + return response; +} + +Response* TestService::SlowEcho(MethodCall* method_call) { + base::PlatformThread::Sleep(kSlowEchoSleepMs); + return Echo(method_call); +} + +Response* TestService::BrokenMethod(MethodCall* method_call) { + return NULL; +} + +} // namespace dbus diff --git a/dbus/test_service.h b/dbus/test_service.h new file mode 100644 index 0000000..362ecb7 --- /dev/null +++ b/dbus/test_service.h @@ -0,0 +1,73 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef DBUS_TEST_SERVICE_H_ +#define DBUS_TEST_SERVICE_H_ +#pragma once + +#include "base/memory/ref_counted.h" +#include "base/threading/thread.h" +#include "base/synchronization/condition_variable.h" +#include "base/synchronization/lock.h" + +namespace dbus { + +class Bus; +class ExportedObject; +class MethodCall; +class Response; + +// The test service is used for end-to-end tests. The service runs in a +// separate thread, so it does not interfere the test code that runs in +// the main thread. Methods such as Echo() and SlowEcho() are exported. +class TestService : public base::Thread { + public: + // SlowEcho() sleeps for this period of time before returns. + static const int kSlowEchoSleepMs; + + TestService(); + virtual ~TestService(); + + // Starts the service in a separate thread. + void StartService(); + + // Waits until the service is started (i.e. methods are exported). + void WaitUntilServiceIsStarted(); + + private: + // Called when the service is started (i.e. the task is run from the + // message loop). + void OnServiceStarted(); + + // base::Thread override. + virtual void Run(MessageLoop* message_loop); + + // base::Thread override. + virtual void CleanUp(); + + // + // Exported methods. + // + + // Echos the text message received from the method call. + Response* Echo(MethodCall* method_call); + + // Echos the text message received from the method call, but sleeps for + // kSlowEchoSleepMs before returning the response. + Response* SlowEcho(MethodCall* method_call); + + // Returns NULL, instead of a valid Response. + Response* BrokenMethod(MethodCall* method_call); + + bool service_started_; + base::Lock service_started_lock_; + base::ConditionVariable on_service_started_; + + scoped_refptr<Bus> bus_; + ExportedObject* exported_object_; +}; + +} // namespace dbus + +#endif // DBUS_TEST_SERVICE_H_ |