diff options
author | satorux@chromium.org <satorux@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-17 20:58:12 +0000 |
---|---|---|
committer | satorux@chromium.org <satorux@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-17 20:58:12 +0000 |
commit | a510761104333ec7da3943399f059ff693839856 (patch) | |
tree | 2a9f81a60bccf4db3367ad1b8f71e9ea130efb7d /dbus/bus.cc | |
parent | 07f93af15169ee054552a376ac4953abd1346cb2 (diff) | |
download | chromium_src-a510761104333ec7da3943399f059ff693839856.zip chromium_src-a510761104333ec7da3943399f059ff693839856.tar.gz chromium_src-a510761104333ec7da3943399f059ff693839856.tar.bz2 |
Implement Bus and ObjectProxy classes for our D-Bus library.
ObjectProxy is used to access remote objects.
ExportedObject is used to export objects to other D-Bus
BUG=90036
TEST=run unit tests. The code is not yet used in Chrome.
Review URL: http://codereview.chromium.org/7491029
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@97204 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'dbus/bus.cc')
-rw-r--r-- | dbus/bus.cc | 613 |
1 files changed, 613 insertions, 0 deletions
diff --git a/dbus/bus.cc b/dbus/bus.cc new file mode 100644 index 0000000..2e8fb47 --- /dev/null +++ b/dbus/bus.cc @@ -0,0 +1,613 @@ +// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// TODO(satorux): +// - Handle "disconnected" signal. +// - Add support for signal sending +// - Add support for signal monitoring +// - Collect metrics (ex. # of method calls, method call time, etc.) + +#include "dbus/bus.h" + +#include "base/bind.h" +#include "base/logging.h" +#include "base/message_loop.h" +#include "base/stl_util.h" +#include "base/threading/thread.h" +#include "base/threading/thread_restrictions.h" +#include "dbus/exported_object.h" +#include "dbus/object_proxy.h" +#include "dbus/scoped_dbus_error.h" + +namespace dbus { + +namespace { + +// The class is used for watching the file descriptor used for D-Bus +// communication. +class Watch : public base::MessagePumpLibevent::Watcher { + public: + Watch(DBusWatch* watch) + : raw_watch_(watch) { + dbus_watch_set_data(raw_watch_, this, NULL); + } + + ~Watch() { + dbus_watch_set_data(raw_watch_, NULL, NULL); + } + + // Returns true if the underlying file descriptor is ready to be watched. + bool IsReadyToBeWatched() { + return dbus_watch_get_enabled(raw_watch_); + } + + // Starts watching the underlying file descriptor. + void StartWatching() { + const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_); + const int flags = dbus_watch_get_flags(raw_watch_); + + MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ; + if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE)) + mode = MessageLoopForIO::WATCH_READ_WRITE; + else if (flags & DBUS_WATCH_READABLE) + mode = MessageLoopForIO::WATCH_READ; + else if (flags & DBUS_WATCH_WRITABLE) + mode = MessageLoopForIO::WATCH_WRITE; + else + NOTREACHED(); + + const bool persistent = true; // Watch persistently. + const bool success = MessageLoopForIO::current()->WatchFileDescriptor( + file_descriptor, + persistent, + mode, + &file_descriptor_watcher_, + this); + CHECK(success) << "Unable to allocate memory"; + } + + // Stops watching the underlying file descriptor. + void StopWatching() { + file_descriptor_watcher_.StopWatchingFileDescriptor(); + } + + private: + // Implement MessagePumpLibevent::Watcher. + virtual void OnFileCanReadWithoutBlocking(int file_descriptor) { + const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE); + CHECK(success) << "Unable to allocate memory"; + } + + // Implement MessagePumpLibevent::Watcher. + virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) { + const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE); + CHECK(success) << "Unable to allocate memory"; + } + + DBusWatch* raw_watch_; + base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_; +}; + +// The class is used for monitoring the timeout used for D-Bus method +// calls. +// +// Unlike Watch, Timeout is a ref counted object, to ensure that |this| of +// the object is is alive when HandleTimeout() is called. It's unlikely +// but it may be possible that HandleTimeout() is called after +// Bus::OnRemoveTimeout(). That's why we don't simply delete the object in +// Bus::OnRemoveTimeout(). +class Timeout : public base::RefCountedThreadSafe<Timeout> { + public: + Timeout(DBusTimeout* timeout) + : raw_timeout_(timeout), + monitoring_is_active_(false), + is_completed(false) { + dbus_timeout_set_data(raw_timeout_, this, NULL); + AddRef(); // Balanced on Complete(). + } + + // Returns true if the timeout is ready to be monitored. + bool IsReadyToBeMonitored() { + return dbus_timeout_get_enabled(raw_timeout_); + } + + // Starts monitoring the timeout. + void StartMonitoring(dbus::Bus* bus) { + bus->PostDelayedTaskToDBusThread(FROM_HERE, + base::Bind(&Timeout::HandleTimeout, + this), + GetIntervalInMs()); + monitoring_is_active_ = true; + } + + // Stops monitoring the timeout. + void StopMonitoring() { + // We cannot take back the delayed task we posted in + // StartMonitoring(), so we just mark the monitoring is inactive now. + monitoring_is_active_ = false; + } + + // Returns the interval in milliseconds. + int GetIntervalInMs() { + return dbus_timeout_get_interval(raw_timeout_); + } + + // Cleans up the raw_timeout and marks that timeout is completed. + // See the class comment above for why we are doing this. + void Complete() { + dbus_timeout_set_data(raw_timeout_, NULL, NULL); + is_completed = true; + Release(); + } + + private: + friend class base::RefCountedThreadSafe<Timeout>; + ~Timeout() { + } + + // Handles the timeout. + void HandleTimeout() { + // If the timeout is marked completed, we should do nothing. This can + // occur if this function is called after Bus::OnRemoveTimeout(). + if (is_completed) + return; + // Skip if monitoring is cancled. + if (!monitoring_is_active_) + return; + + const bool success = dbus_timeout_handle(raw_timeout_); + CHECK(success) << "Unable to allocate memory"; + } + + DBusTimeout* raw_timeout_; + bool monitoring_is_active_; + bool is_completed; +}; + +} // namespace + +Bus::Options::Options() + : bus_type(SESSION), + connection_type(PRIVATE), + dbus_thread(NULL) { +} + +Bus::Options::~Options() { +} + +Bus::Bus(const Options& options) + : bus_type_(options.bus_type), + connection_type_(options.connection_type), + dbus_thread_(options.dbus_thread), + connection_(NULL), + origin_loop_(MessageLoop::current()), + origin_thread_id_(base::PlatformThread::CurrentId()), + dbus_thread_id_(base::kInvalidThreadId), + async_operations_are_set_up_(false), + num_pending_watches_(0), + num_pending_timeouts_(0) { + if (dbus_thread_) { + dbus_thread_id_ = dbus_thread_->thread_id(); + DCHECK(dbus_thread_->IsRunning()) + << "The D-Bus thread should be running"; + DCHECK_EQ(MessageLoop::TYPE_IO, + dbus_thread_->message_loop()->type()) + << "The D-Bus thread should have an MessageLoopForIO attached"; + } + + // This is safe to call multiple times. + dbus_threads_init_default(); +} + +Bus::~Bus() { + DCHECK(!connection_); + DCHECK(owned_service_names_.empty()); + DCHECK_EQ(0, num_pending_watches_); + DCHECK_EQ(0, num_pending_timeouts_); +} + +ObjectProxy* Bus::GetObjectProxy(const std::string& service_name, + const std::string& object_path) { + AssertOnOriginThread(); + + scoped_refptr<ObjectProxy> object_proxy = + new ObjectProxy(this, service_name, object_path); + object_proxies_.push_back(object_proxy); + + return object_proxy; +} + +ExportedObject* Bus::GetExportedObject(const std::string& service_name, + const std::string& object_path) { + AssertOnOriginThread(); + + scoped_refptr<ExportedObject> exported_object = + new ExportedObject(this, service_name, object_path); + exported_objects_.push_back(exported_object); + + return exported_object; +} + +bool Bus::Connect() { + // dbus_bus_get_private() and dbus_bus_get() are blocking calls. + AssertOnDBusThread(); + + // Check if it's already initialized. + if (connection_) + return true; + + ScopedDBusError error; + const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_); + if (connection_type_ == PRIVATE) { + connection_ = dbus_bus_get_private(dbus_bus_type, error.get()); + } else { + connection_ = dbus_bus_get(dbus_bus_type, error.get()); + } + if (!connection_) { + LOG(ERROR) << "Failed to connect to the bus: " + << (dbus_error_is_set(error.get()) ? error.message() : ""); + return false; + } + // We shouldn't exit on the disconnected signal. + dbus_connection_set_exit_on_disconnect(connection_, false); + + return true; +} + +void Bus::ShutdownAndBlock() { + AssertOnDBusThread(); + + // Unregister the exported objects. + for (size_t i = 0; i < exported_objects_.size(); ++i) { + exported_objects_[i]->Unregister(); + } + + // Release all service names. + for (std::set<std::string>::iterator iter = owned_service_names_.begin(); + iter != owned_service_names_.end();) { + // This is a bit tricky but we should increment the iter here as + // ReleaseOwnership() may remove |service_name| from the set. + const std::string& service_name = *iter++; + ReleaseOwnership(service_name); + } + if (!owned_service_names_.empty()) { + LOG(ERROR) << "Failed to release all service names. # of services left: " + << owned_service_names_.size(); + } + + // Private connection should be closed. + if (connection_ && connection_type_ == PRIVATE) { + dbus_connection_close(connection_); + } + // dbus_connection_close() won't unref. + dbus_connection_unref(connection_); + + connection_ = NULL; +} + +void Bus::Shutdown(OnShutdownCallback callback) { + AssertOnOriginThread(); + + PostTaskToDBusThread(FROM_HERE, base::Bind(&Bus::ShutdownInternal, + this, + callback)); +} + +bool Bus::RequestOwnership(const std::string& service_name) { + DCHECK(connection_); + // dbus_bus_request_name() is a blocking call. + AssertOnDBusThread(); + + // Check if we already own the service name. + if (owned_service_names_.find(service_name) != owned_service_names_.end()) { + return true; + } + + ScopedDBusError error; + const int result = dbus_bus_request_name(connection_, + service_name.c_str(), + DBUS_NAME_FLAG_DO_NOT_QUEUE, + error.get()); + if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) { + LOG(ERROR) << "Failed to get the onwership of " << service_name << ": " + << (dbus_error_is_set(error.get()) ? error.message() : ""); + return false; + } + owned_service_names_.insert(service_name); + return true; +} + +bool Bus::ReleaseOwnership(const std::string& service_name) { + DCHECK(connection_); + // dbus_bus_request_name() is a blocking call. + AssertOnDBusThread(); + + // Check if we already own the service name. + std::set<std::string>::iterator found = + owned_service_names_.find(service_name); + if (found == owned_service_names_.end()) { + LOG(ERROR) << service_name << " is not owned by the bus"; + return false; + } + + ScopedDBusError error; + const int result = dbus_bus_release_name(connection_, service_name.c_str(), + error.get()); + if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) { + owned_service_names_.erase(found); + return true; + } else { + LOG(ERROR) << "Failed to release the onwership of " << service_name << ": " + << (error.is_set() ? error.message() : ""); + return false; + } +} + +bool Bus::SetUpAsyncOperations() { + DCHECK(connection_); + AssertOnDBusThread(); + + if (async_operations_are_set_up_) + return true; + + // Process all the incoming data if any, so that OnDispatchStatus() will + // be called when the incoming data is ready. + ProcessAllIncomingDataIfAny(); + + bool success = dbus_connection_set_watch_functions(connection_, + &Bus::OnAddWatchThunk, + &Bus::OnRemoveWatchThunk, + &Bus::OnToggleWatchThunk, + this, + NULL); + CHECK(success) << "Unable to allocate memory"; + + // TODO(satorux): Timeout is not yet implemented. + success = dbus_connection_set_timeout_functions(connection_, + &Bus::OnAddTimeoutThunk, + &Bus::OnRemoveTimeoutThunk, + &Bus::OnToggleTimeoutThunk, + this, + NULL); + CHECK(success) << "Unable to allocate memory"; + + dbus_connection_set_dispatch_status_function( + connection_, + &Bus::OnDispatchStatusChangedThunk, + this, + NULL); + + async_operations_are_set_up_ = true; + + return true; +} + +DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request, + int timeout_ms, + DBusError* error) { + DCHECK(connection_); + AssertOnDBusThread(); + + return dbus_connection_send_with_reply_and_block( + connection_, request, timeout_ms, error); +} + +void Bus::SendWithReply(DBusMessage* request, + DBusPendingCall** pending_call, + int timeout_ms) { + DCHECK(connection_); + AssertOnDBusThread(); + + const bool success = dbus_connection_send_with_reply( + connection_, request, pending_call, timeout_ms); + CHECK(success) << "Unable to allocate memory"; +} + +bool Bus::TryRegisterObjectPath(const std::string& object_path, + const DBusObjectPathVTable* vtable, + void* user_data, + DBusError* error) { + DCHECK(connection_); + AssertOnDBusThread(); + + return dbus_connection_try_register_object_path( + connection_, + object_path.c_str(), + vtable, + user_data, + error); +} + +void Bus::UnregisterObjectPath(const std::string& object_path) { + DCHECK(connection_); + AssertOnDBusThread(); + + const bool success = dbus_connection_unregister_object_path( + connection_, + object_path.c_str()); + CHECK(success) << "Unable to allocate memory"; +} + +void Bus::ShutdownInternal(OnShutdownCallback callback) { + AssertOnDBusThread(); + + ShutdownAndBlock(); + PostTaskToOriginThread(FROM_HERE, callback); +} + +void Bus::ProcessAllIncomingDataIfAny() { + AssertOnDBusThread(); + + // As mentioned at the class comment in .h file, connection_ can be NULL. + if (!connection_ || !dbus_connection_get_is_connected(connection_)) + return; + + if (dbus_connection_get_dispatch_status(connection_) == + DBUS_DISPATCH_DATA_REMAINS) { + while (dbus_connection_dispatch(connection_) == + DBUS_DISPATCH_DATA_REMAINS); + } +} + +void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here, + const base::Closure& task) { + origin_loop_->PostTask(from_here, task); +} + +void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here, + const base::Closure& task) { + if (dbus_thread_) + dbus_thread_->message_loop()->PostTask(from_here, task); + else + origin_loop_->PostTask(from_here, task); +} + +void Bus::PostDelayedTaskToDBusThread( + const tracked_objects::Location& from_here, + const base::Closure& task, + int delay_ms) { + if (dbus_thread_) + dbus_thread_->message_loop()->PostDelayedTask(from_here, task, delay_ms); + else + origin_loop_->PostDelayedTask(from_here, task, delay_ms); +} + +bool Bus::HasDBusThread() { + return dbus_thread_ != NULL; +} + +void Bus::AssertOnOriginThread() { + DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId()); +} + +void Bus::AssertOnDBusThread() { + base::ThreadRestrictions::AssertIOAllowed(); + + if (dbus_thread_) { + DCHECK_EQ(dbus_thread_id_, base::PlatformThread::CurrentId()); + } else { + AssertOnOriginThread(); + } +} + +dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) { + AssertOnDBusThread(); + + // watch will be deleted when raw_watch is removed in OnRemoveWatch(). + Watch* watch = new Watch(raw_watch); + if (watch->IsReadyToBeWatched()) { + watch->StartWatching(); + } + ++num_pending_watches_; + return true; +} + +void Bus::OnRemoveWatch(DBusWatch* raw_watch) { + AssertOnDBusThread(); + + Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); + delete watch; + --num_pending_watches_; +} + +void Bus::OnToggleWatch(DBusWatch* raw_watch) { + AssertOnDBusThread(); + + Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch)); + if (watch->IsReadyToBeWatched()) { + watch->StartWatching(); + } else { + // It's safe to call this if StartWatching() wasn't called, per + // message_pump_libevent.h. + watch->StopWatching(); + } +} + +dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) { + AssertOnDBusThread(); + + // timeout will be deleted when raw_timeout is removed in + // OnRemoveTimeoutThunk(). + Timeout* timeout = new Timeout(raw_timeout); + if (timeout->IsReadyToBeMonitored()) { + timeout->StartMonitoring(this); + } + ++num_pending_timeouts_; + return true; +} + +void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) { + AssertOnDBusThread(); + + Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); + timeout->Complete(); + --num_pending_timeouts_; +} + +void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) { + AssertOnDBusThread(); + + Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout)); + if (timeout->IsReadyToBeMonitored()) { + timeout->StartMonitoring(this); + } else { + timeout->StopMonitoring(); + } +} + +void Bus::OnDispatchStatusChanged(DBusConnection* connection, + DBusDispatchStatus status) { + DCHECK_EQ(connection, connection_); + AssertOnDBusThread(); + + if (!dbus_connection_get_is_connected(connection)) + return; + + // We cannot call ProcessAllIncomingDataIfAny() here, as calling + // dbus_connection_dispatch() inside DBusDispatchStatusFunction is + // prohibited by the D-Bus library. Hence, we post a task here instead. + // See comments for dbus_connection_set_dispatch_status_function(). + PostTaskToDBusThread(FROM_HERE, + base::Bind(&Bus::ProcessAllIncomingDataIfAny, + this)); +} + +dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnAddWatch(raw_watch); +} + +void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnRemoveWatch(raw_watch); +} + +void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnToggleWatch(raw_watch); +} + +dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnAddTimeout(raw_timeout); +} + +void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnRemoveTimeout(raw_timeout); +} + +void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnToggleTimeout(raw_timeout); +} + +void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection, + DBusDispatchStatus status, + void* data) { + Bus* self = static_cast<Bus*>(data); + return self->OnDispatchStatusChanged(connection, status); +} + +} // namespace dbus |