diff options
author | satorux@chromium.org <satorux@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-23 07:29:21 +0000 |
---|---|---|
committer | satorux@chromium.org <satorux@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2011-08-23 07:29:21 +0000 |
commit | 3beaaa4e9b8c41be94e1688a075357b5ae58a280 (patch) | |
tree | 1c7b6d7abc4a6d54346334c132ceb5f71869661c /dbus | |
parent | 852754acf9190bd253e8c2e9b45d2b761cf7a12a (diff) | |
download | chromium_src-3beaaa4e9b8c41be94e1688a075357b5ae58a280.zip chromium_src-3beaaa4e9b8c41be94e1688a075357b5ae58a280.tar.gz chromium_src-3beaaa4e9b8c41be94e1688a075357b5ae58a280.tar.bz2 |
Add support for sending and receiving D-Bus signals.
ObjectProxy is used to receive signals from the remote object.
ExportedObject is used to send signals from the exported object.
Note that signals are asynchronos so we don't have a test in
end_to_end_sync_unittest.cc
BUG=90036
TEST=run unit tests
Review URL: http://codereview.chromium.org/7655033
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@97831 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'dbus')
-rw-r--r-- | dbus/bus.cc | 91 | ||||
-rw-r--r-- | dbus/bus.h | 75 | ||||
-rw-r--r-- | dbus/end_to_end_async_unittest.cc | 45 | ||||
-rw-r--r-- | dbus/exported_object.cc | 42 | ||||
-rw-r--r-- | dbus/exported_object.h | 12 | ||||
-rw-r--r-- | dbus/message.cc | 25 | ||||
-rw-r--r-- | dbus/message.h | 23 | ||||
-rw-r--r-- | dbus/message_unittest.cc | 29 | ||||
-rw-r--r-- | dbus/object_proxy.cc | 177 | ||||
-rw-r--r-- | dbus/object_proxy.h | 70 | ||||
-rw-r--r-- | dbus/test_service.cc | 15 | ||||
-rw-r--r-- | dbus/test_service.h | 13 |
12 files changed, 599 insertions, 18 deletions
diff --git a/dbus/bus.cc b/dbus/bus.cc index 2e8fb47..c0bae42 100644 --- a/dbus/bus.cc +++ b/dbus/bus.cc @@ -4,8 +4,6 @@ // // TODO(satorux): // - Handle "disconnected" signal. -// - Add support for signal sending -// - Add support for signal monitoring // - Collect metrics (ex. # of method calls, method call time, etc.) #include "dbus/bus.h" @@ -203,6 +201,9 @@ Bus::Bus(const Options& options) Bus::~Bus() { DCHECK(!connection_); DCHECK(owned_service_names_.empty()); + DCHECK(match_rules_added_.empty()); + DCHECK(filter_functions_added_.empty()); + DCHECK(registered_object_paths_.empty()); DCHECK_EQ(0, num_pending_watches_); DCHECK_EQ(0, num_pending_timeouts_); } @@ -276,6 +277,11 @@ void Bus::ShutdownAndBlock() { << owned_service_names_.size(); } + // Detach from the remote objects. + for (size_t i = 0; i < object_proxies_.size(); ++i) { + object_proxies_[i]->Detach(); + } + // Private connection should be closed. if (connection_ && connection_type_ == PRIVATE) { dbus_connection_close(connection_); @@ -404,6 +410,73 @@ void Bus::SendWithReply(DBusMessage* request, CHECK(success) << "Unable to allocate memory"; } +void Bus::Send(DBusMessage* request, uint32* serial) { + DCHECK(connection_); + AssertOnDBusThread(); + + const bool success = dbus_connection_send(connection_, request, serial); + CHECK(success) << "Unable to allocate memory"; +} + +void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function, + void* user_data) { + DCHECK(connection_); + AssertOnDBusThread(); + + if (filter_functions_added_.find(filter_function) != + filter_functions_added_.end()) { + LOG(ERROR) << "Filter function already exists: " << filter_function; + return; + } + + const bool success = dbus_connection_add_filter( + connection_, filter_function, user_data, NULL); + CHECK(success) << "Unable to allocate memory"; + filter_functions_added_.insert(filter_function); +} + +void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function, + void* user_data) { + DCHECK(connection_); + AssertOnDBusThread(); + + if (filter_functions_added_.find(filter_function) == + filter_functions_added_.end()) { + LOG(ERROR) << "Requested to remove an unknown filter function: " + << filter_function; + return; + } + + dbus_connection_remove_filter(connection_, filter_function, user_data); + filter_functions_added_.erase(filter_function); +} + +void Bus::AddMatch(const std::string& match_rule, DBusError* error) { + DCHECK(connection_); + AssertOnDBusThread(); + + if (match_rules_added_.find(match_rule) != match_rules_added_.end()) { + LOG(ERROR) << "Match rule already exists: " << match_rule; + return; + } + + dbus_bus_add_match(connection_, match_rule.c_str(), error); + match_rules_added_.insert(match_rule); +} + +void Bus::RemoveMatch(const std::string& match_rule, DBusError* error) { + DCHECK(connection_); + AssertOnDBusThread(); + + if (match_rules_added_.find(match_rule) == match_rules_added_.end()) { + LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule; + return; + } + + dbus_bus_remove_match(connection_, match_rule.c_str(), error); + match_rules_added_.erase(match_rule); +} + bool Bus::TryRegisterObjectPath(const std::string& object_path, const DBusObjectPathVTable* vtable, void* user_data, @@ -411,22 +484,34 @@ bool Bus::TryRegisterObjectPath(const std::string& object_path, DCHECK(connection_); AssertOnDBusThread(); - return dbus_connection_try_register_object_path( + DCHECK(registered_object_paths_.find(object_path) == + registered_object_paths_.end()) + << "Object path already registered: " << object_path; + + const bool success = dbus_connection_try_register_object_path( connection_, object_path.c_str(), vtable, user_data, error); + if (success) + registered_object_paths_.insert(object_path); + return success; } void Bus::UnregisterObjectPath(const std::string& object_path) { DCHECK(connection_); AssertOnDBusThread(); + DCHECK(registered_object_paths_.find(object_path) != + registered_object_paths_.end()) + << "Requested to unregister an unknown object path: " << object_path; + const bool success = dbus_connection_unregister_object_path( connection_, object_path.c_str()); CHECK(success) << "Unable to allocate memory"; + registered_object_paths_.erase(object_path); } void Bus::ShutdownInternal(OnShutdownCallback callback) { @@ -167,7 +167,8 @@ class Bus : public base::RefCountedThreadSafe<Bus> { // The caller must not delete the returned object. The bus will own the // object. Never returns NULL. // - // The object proxy is used to call remote methods. + // The object proxy is used to call methods of remote objects, and + // receive signals from them. // // |service_name| looks like "org.freedesktop.NetworkManager", and // |object_path| looks like "/org/freedesktop/NetworkManager/Devices/0". @@ -180,7 +181,8 @@ class Bus : public base::RefCountedThreadSafe<Bus> { // path. The caller must not delete the returned object. The bus will // own the object. Never returns NULL. // - // The exported object is used to export objects to other D-Bus clients. + // The exported object is used to export methods of local objects, and + // send signal from them. // // Must be called in the origin thread. virtual ExportedObject* GetExportedObject(const std::string& service_name, @@ -240,14 +242,73 @@ class Bus : public base::RefCountedThreadSafe<Bus> { int timeout_ms, DBusError* error); - // Requests to send a message to the bus. + // Requests to send a message to the bus. The reply is handled with + // |pending_call| at a later time. // // BLOCKING CALL. virtual void SendWithReply(DBusMessage* request, DBusPendingCall** pending_call, int timeout_ms); - // Tries to register the object path. + // Requests to send a message to the bus. The message serial number will + // be stored in |serial|. + // + // BLOCKING CALL. + virtual void Send(DBusMessage* request, uint32* serial); + + // Adds the message filter function. |filter_function| will be called + // when incoming messages are received. + // + // When a new incoming message arrives, filter functions are called in + // the order that they were added until the the incoming message is + // handled by a filter function. + // + // The same filter function must not be added more than once. + // + // BLOCKING CALL. + virtual void AddFilterFunction(DBusHandleMessageFunction filter_function, + void* user_data); + + // Removes the message filter previously added by AddFilterFunction(). + // + // BLOCKING CALL. + virtual void RemoveFilterFunction(DBusHandleMessageFunction filter_function, + void* user_data); + + // Adds the match rule. Messages that match the rule will be processed + // by the filter functions added by AddFilterFunction(). + // + // You cannot specify which filter function to use for a match rule. + // Instead, you should check if an incoming message is what you are + // interested in, in the filter functions. + // + // The same match rule must not be added more than once. + // + // The match rule looks like: + // "type='signal', interface='org.chromium.SomeInterface'". + // + // See "Message Bus Message Routing" section in the D-Bus specification + // for details about match rules: + // http://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing + // + // BLOCKING CALL. + virtual void AddMatch(const std::string& match_rule, DBusError* error); + + // Removes the match rule previously added by AddMatch(). + // + // BLOCKING CALL. + virtual void RemoveMatch(const std::string& match_rule, DBusError* error); + + // Tries to register the object path. Returns true on success. + // Returns false if the object path is already registered. + // + // |message_function| in |vtable| will be called every time when a new + // |message sent to the object path arrives. + // + // The same object path must not be added more than once. + // + // See also documentation of |dbus_connection_try_register_object_path| at + // http://dbus.freedesktop.org/doc/api/html/group__DBusConnection.html // // BLOCKING CALL. virtual bool TryRegisterObjectPath(const std::string& object_path, @@ -349,6 +410,12 @@ class Bus : public base::RefCountedThreadSafe<Bus> { base::PlatformThreadId dbus_thread_id_; std::set<std::string> owned_service_names_; + // The following sets are used to check if rules/object_paths/filters + // are properly cleaned up before destruction of the bus object. + std::set<std::string> match_rules_added_; + std::set<std::string> registered_object_paths_; + std::set<DBusHandleMessageFunction> filter_functions_added_; + std::vector<scoped_refptr<dbus::ObjectProxy> > object_proxies_; std::vector<scoped_refptr<dbus::ExportedObject> > exported_objects_; diff --git a/dbus/end_to_end_async_unittest.cc b/dbus/end_to_end_async_unittest.cc index 3ce6b6d..cf22a41 100644 --- a/dbus/end_to_end_async_unittest.cc +++ b/dbus/end_to_end_async_unittest.cc @@ -52,6 +52,17 @@ class EndToEndAsyncTest : public testing::Test { object_proxy_ = bus_->GetObjectProxy("org.chromium.TestService", "/org/chromium/TestObject"); ASSERT_TRUE(bus_->HasDBusThread()); + + // Connect to the "Test" signal from the remote object. + object_proxy_->ConnectToSignal( + "org.chromium.TestInterface", + "Test", + base::Bind(&EndToEndAsyncTest::OnTestSignal, + base::Unretained(this)), + base::Bind(&EndToEndAsyncTest::OnConnected, + base::Unretained(this))); + // Wait until the object proxy is connected to the signal. + message_loop_.Run(); } void TearDown() { @@ -111,12 +122,36 @@ class EndToEndAsyncTest : public testing::Test { message_loop_.Quit(); } + // Called when the "Test" signal is received, in the main thread. + // Copy the string payload to |test_signal_string_|. + void OnTestSignal(dbus::Signal* signal) { + dbus::MessageReader reader(signal); + ASSERT_TRUE(reader.PopString(&test_signal_string_)); + message_loop_.Quit(); + } + + // Called when connected to the signal. + void OnConnected(const std::string& interface_name, + const std::string& signal_name, + bool success) { + ASSERT_TRUE(success); + message_loop_.Quit(); + } + + // Wait for the hey signal to be received. + void WaitForTestSignal() { + // OnTestSignal() will quit the message loop. + message_loop_.Run(); + } + MessageLoop message_loop_; std::vector<std::string> response_strings_; scoped_ptr<base::Thread> dbus_thread_; scoped_refptr<dbus::Bus> bus_; dbus::ObjectProxy* object_proxy_; scoped_ptr<dbus::TestService> test_service_; + // Text message from "Test" signal. + std::string test_signal_string_; }; TEST_F(EndToEndAsyncTest, Echo) { @@ -198,3 +233,13 @@ TEST_F(EndToEndAsyncTest, BrokenMethod) { // Should fail because the method is broken. ASSERT_EQ("", response_strings_[0]); } + +TEST_F(EndToEndAsyncTest, TestSignal) { + const char kMessage[] = "hello, world"; + // Send the test signal from the exported object. + test_service_->SendTestSignal(kMessage); + // Receive the signal with the object proxy. The signal is handeled in + // EndToEndAsyncTest::OnTestSignal() in the main thread. + WaitForTestSignal(); + ASSERT_EQ(kMessage, test_signal_string_); +} diff --git a/dbus/exported_object.cc b/dbus/exported_object.cc index 2ac1ff0..df536f5 100644 --- a/dbus/exported_object.cc +++ b/dbus/exported_object.cc @@ -51,6 +51,14 @@ bool ExportedObject::ExportMethodAndBlock( MethodCallCallback method_call_callback) { bus_->AssertOnDBusThread(); + // Check if the method is already exported. + const std::string absolute_method_name = + GetAbsoluteMethodName(interface_name, method_name); + if (method_table_.find(absolute_method_name) != method_table_.end()) { + LOG(ERROR) << absolute_method_name << " is already exported"; + return false; + } + if (!bus_->Connect()) return false; if (!bus_->SetUpAsyncOperations()) @@ -60,12 +68,7 @@ bool ExportedObject::ExportMethodAndBlock( if (!Register()) return false; - const std::string absolute_method_name = - GetAbsoluteMethodName(interface_name, method_name); - if (method_table_.find(absolute_method_name) != method_table_.end()) { - LOG(ERROR) << absolute_method_name << " is already exported"; - return false; - } + // Add the method callback to the method table. method_table_[absolute_method_name] = method_call_callback; return true; @@ -86,6 +89,25 @@ void ExportedObject::ExportMethod(const std::string& interface_name, bus_->PostTaskToDBusThread(FROM_HERE, task); } +void ExportedObject::SendSignal(Signal* signal) { + // For signals, the object path should be set to the path to the sender + // object, which is this exported object here. + signal->SetPath(object_path_); + + // Increment the reference count so we can safely reference the + // underlying signal message until the signal sending is complete. This + // will be unref'ed in SendSignalInternal(). + DBusMessage* signal_message = signal->raw_message(); + dbus_message_ref(signal_message); + + // Bind() won't compile if we pass signal_message. See the comment at + // ObjectProxy::CallMethod() for details. + bus_->PostTaskToDBusThread(FROM_HERE, + base::Bind(&ExportedObject::SendSignalInternal, + this, + static_cast<void*>(signal_message))); +} + void ExportedObject::Unregister() { bus_->AssertOnDBusThread(); @@ -124,6 +146,14 @@ void ExportedObject::OnExported(OnExportedCallback on_exported_callback, on_exported_callback.Run(interface_name, method_name, success); } +void ExportedObject::SendSignalInternal(void* in_signal_message) { + DBusMessage* signal_message = + static_cast<DBusMessage*>(in_signal_message); + uint32 serial = 0; + bus_->Send(signal_message, &serial); + dbus_message_unref(signal_message); +} + bool ExportedObject::Register() { bus_->AssertOnDBusThread(); diff --git a/dbus/exported_object.h b/dbus/exported_object.h index 1d35c11..1351824 100644 --- a/dbus/exported_object.h +++ b/dbus/exported_object.h @@ -24,6 +24,7 @@ namespace dbus { class Bus; class MethodCall; class Response; +class Signal; // ExportedObject is used to export objects and methods to other D-Bus // clients. @@ -55,8 +56,8 @@ class ExportedObject : public base::RefCountedThreadSafe<ExportedObject> { // // |method_call_callback| will be called in the origin thread, when the // exported method is called. As it's called in the origin thread, - // callback| can safely reference objects in the origin thread (i.e. UI - // thread in most cases). + // |method_callback| can safely reference objects in the origin thread + // (i.e. UI thread in most cases). // // BLOCKING CALL. virtual bool ExportMethodAndBlock(const std::string& interface_name, @@ -75,6 +76,10 @@ class ExportedObject : public base::RefCountedThreadSafe<ExportedObject> { MethodCallCallback method_call_callback, OnExportedCallback on_exported_callback); + // Requests to send the signal from this object. The signal will be sent + // asynchronously from the message loop in the D-Bus thread. + virtual void SendSignal(Signal* signal); + // Unregisters the object from the bus. The Bus object will take care of // unregistering so you don't have to do this manually. // @@ -97,6 +102,9 @@ class ExportedObject : public base::RefCountedThreadSafe<ExportedObject> { const std::string& method_name, bool success); + // Helper function for SendSignal(). + void SendSignalInternal(void* signal_message); + // Registers this object to the bus. // Returns true on success, or the object is already registered. // diff --git a/dbus/message.cc b/dbus/message.cc index 9216515..8de6583 100644 --- a/dbus/message.cc +++ b/dbus/message.cc @@ -332,6 +332,31 @@ MethodCall* MethodCall::FromRawMessage(DBusMessage* raw_message) { } // +// Signal implementation. +// +Signal::Signal(const std::string& interface_name, + const std::string& method_name) + : Message() { + reset_raw_message(dbus_message_new(DBUS_MESSAGE_TYPE_SIGNAL)); + + SetInterface(interface_name); + SetMember(method_name); +} + +Signal* Signal::FromRawMessage(DBusMessage* raw_message) { + DCHECK_EQ(DBUS_MESSAGE_TYPE_SIGNAL, dbus_message_get_type(raw_message)); + + const char* interface = dbus_message_get_interface(raw_message); + const char* member = dbus_message_get_member(raw_message); + std::string interface_string = interface ? interface : ""; + std::string member_string = member ? member : ""; + + Signal* signal = new Signal(interface_string, member_string); + signal->reset_raw_message(raw_message); + return signal; +} + +// // Response implementation. // diff --git a/dbus/message.h b/dbus/message.h index 4f8dc07..58162a3 100644 --- a/dbus/message.h +++ b/dbus/message.h @@ -136,6 +136,29 @@ class MethodCall : public Message { DISALLOW_COPY_AND_ASSIGN(MethodCall); }; +// Signal is a type of message used to send a signal. +class Signal : public Message { + public: + // Creates a signal message for the specified interface name and the + // method name. + // + // For instance, to send "PropertiesChanged" signal of + // DBUS_INTERFACE_INTROSPECTABLE interface + // ("org.freedesktop.DBus.Introspectable"), create a signal like this: + // + // Signal signal(DBUS_INTERFACE_INTROSPECTABLE, "PropertiesChanged"); + // + // The constructor creates the internal raw_message_, so the client + // doesn't need to set this with reset_raw_message(). + Signal(const std::string& interface_name, + const std::string& method_name); + + // Returns a newly created SIGNAL from the given raw message of the type + // DBUS_MESSAGE_TYPE_SIGNAL. The caller must delete the returned + // object. Takes the ownership of |raw_message|. + static Signal* FromRawMessage(DBusMessage* raw_message); +}; + // Response is a type of message used for receiving a response from a // method via D-Bus. class Response : public Message { diff --git a/dbus/message_unittest.cc b/dbus/message_unittest.cc index 518a7e0..6e45e55 100644 --- a/dbus/message_unittest.cc +++ b/dbus/message_unittest.cc @@ -392,6 +392,35 @@ TEST(MessageTest, MethodCall_FromRawMessage) { EXPECT_EQ("SomeMethod", method_call->GetMember()); } +TEST(MessageTest, Signal) { + dbus::Signal signal("com.example.Interface", "SomeSignal"); + EXPECT_TRUE(signal.raw_message() != NULL); + EXPECT_EQ(dbus::Message::MESSAGE_SIGNAL, signal.GetMessageType()); + signal.SetPath("/com/example/Object"); + + dbus::MessageWriter writer(&signal); + writer.AppendString("payload"); + + EXPECT_EQ("path: /com/example/Object\n" + "interface: com.example.Interface\n" + "member: SomeSignal\n" + "signature: s\n" + "\n" + "string \"payload\"\n", + signal.ToString()); +} + +TEST(MessageTest, Signal_FromRawMessage) { + DBusMessage* raw_message = dbus_message_new(DBUS_MESSAGE_TYPE_SIGNAL); + dbus_message_set_interface(raw_message, "com.example.Interface"); + dbus_message_set_member(raw_message, "SomeSignal"); + + scoped_ptr<dbus::Signal> signal( + dbus::Signal::FromRawMessage(raw_message)); + EXPECT_EQ("com.example.Interface", signal->GetInterface()); + EXPECT_EQ("SomeSignal", signal->GetMember()); +} + TEST(MessageTest, Response) { dbus::Response response; EXPECT_TRUE(response.raw_message() == NULL); diff --git a/dbus/object_proxy.cc b/dbus/object_proxy.cc index 24dbde2..0ab3ce5 100644 --- a/dbus/object_proxy.cc +++ b/dbus/object_proxy.cc @@ -7,12 +7,26 @@ #include "base/bind.h" #include "base/logging.h" #include "base/message_loop.h" +#include "base/stringprintf.h" #include "base/threading/thread.h" #include "base/threading/thread_restrictions.h" #include "dbus/message.h" #include "dbus/object_proxy.h" #include "dbus/scoped_dbus_error.h" +namespace { + +// Gets the absolute signal name by concatenating the interface name and +// the signal name. Used for building keys for method_table_ in +// ObjectProxy. +std::string GetAbsoluteSignalName( + const std::string& interface_name, + const std::string& signal_name) { + return interface_name + "." + signal_name; +} + +} // namespace + namespace dbus { ObjectProxy::ObjectProxy(Bus* bus, @@ -20,7 +34,8 @@ ObjectProxy::ObjectProxy(Bus* bus, const std::string& object_path) : bus_(bus), service_name_(service_name), - object_path_(object_path) { + object_path_(object_path), + filter_added_(false) { } ObjectProxy::~ObjectProxy() { @@ -82,6 +97,37 @@ void ObjectProxy::CallMethod(MethodCall* method_call, bus_->PostTaskToDBusThread(FROM_HERE, task); } +void ObjectProxy::ConnectToSignal(const std::string& interface_name, + const std::string& signal_name, + SignalCallback signal_callback, + OnConnectedCallback on_connected_callback) { + bus_->AssertOnOriginThread(); + + bus_->PostTaskToDBusThread(FROM_HERE, + base::Bind(&ObjectProxy::ConnectToSignalInternal, + this, + interface_name, + signal_name, + signal_callback, + on_connected_callback)); +} + +void ObjectProxy::Detach() { + bus_->AssertOnDBusThread(); + + if (filter_added_) + bus_->RemoveFilterFunction(&ObjectProxy::HandleMessageThunk, this); + + for (size_t i = 0; i < match_rules_.size(); ++i) { + ScopedDBusError error; + bus_->RemoveMatch(match_rules_[i], error.get()); + if (error.is_set()) { + // There is nothing we can do to recover, so just print the error. + LOG(ERROR) << "Failed to remove match rule: " << match_rules_[i]; + } + } +} + ObjectProxy::OnPendingCallIsCompleteData::OnPendingCallIsCompleteData( ObjectProxy* in_object_proxy, ResponseCallback in_response_callback) @@ -194,4 +240,133 @@ void ObjectProxy::OnPendingCallIsCompleteThunk(DBusPendingCall* pending_call, delete data; } +void ObjectProxy::ConnectToSignalInternal( + const std::string& interface_name, + const std::string& signal_name, + SignalCallback signal_callback, + OnConnectedCallback on_connected_callback) { + bus_->AssertOnDBusThread(); + + // Check if the object is already connected to the signal. + const std::string absolute_signal_name = + GetAbsoluteSignalName(interface_name, signal_name); + if (method_table_.find(absolute_signal_name) != method_table_.end()) { + LOG(ERROR) << "The object proxy is already connected to " + << absolute_signal_name; + return; + } + + // Will become true, if everything is successful. + bool success = false; + + if (bus_->Connect() && bus_->SetUpAsyncOperations()) { + // We should add the filter only once. Otherwise, HandleMessage() will + // be called more than once. + if (!filter_added_) { + bus_->AddFilterFunction(&ObjectProxy::HandleMessageThunk, this); + filter_added_ = true; + } + // Add a match rule so the signal goes through HandleMessage(). + const std::string match_rule = + base::StringPrintf("type='signal', interface='%s', path='%s'", + interface_name.c_str(), + object_path_.c_str()); + ScopedDBusError error; + bus_->AddMatch(match_rule, error.get());; + if (error.is_set()) { + LOG(ERROR) << "Failed to add match rule: " << match_rule; + } else { + // Store the match rule, so that we can remove this in Detach(). + match_rules_.push_back(match_rule); + // Add the signal callback to the method table. + method_table_[absolute_signal_name] = signal_callback; + success = true; + } + } + + // Run on_connected_callback in the origin thread. + bus_->PostTaskToOriginThread( + FROM_HERE, + base::Bind(&ObjectProxy::OnConnected, + this, + on_connected_callback, + interface_name, + signal_name, + success)); +} + +void ObjectProxy::OnConnected(OnConnectedCallback on_connected_callback, + const std::string& interface_name, + const std::string& signal_name, + bool success) { + bus_->AssertOnOriginThread(); + + on_connected_callback.Run(interface_name, signal_name, success); +} + +DBusHandlerResult ObjectProxy::HandleMessage( + DBusConnection* connection, + DBusMessage* raw_message) { + bus_->AssertOnDBusThread(); + if (dbus_message_get_type(raw_message) != DBUS_MESSAGE_TYPE_SIGNAL) + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + + // raw_message will be unrefed on exit of the function. Increment the + // reference so we can use it in Signal. + dbus_message_ref(raw_message); + scoped_ptr<Signal> signal( + Signal::FromRawMessage(raw_message)); + + // The signal is not coming from the remote object we are attaching to. + if (signal->GetPath() != object_path_) + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + + const std::string interface = signal->GetInterface(); + const std::string member = signal->GetMember(); + + // Check if we know about the method. + const std::string absolute_signal_name = GetAbsoluteSignalName( + interface, member); + MethodTable::const_iterator iter = method_table_.find(absolute_signal_name); + if (iter == method_table_.end()) { + // Don't know about the method. + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; + } + + if (bus_->HasDBusThread()) { + // Post a task to run the method in the origin thread. + // Transfer the ownership of |signal| to RunMethod(). + // |released_signal| will be deleted in RunMethod(). + Signal* released_signal = signal.release(); + bus_->PostTaskToOriginThread(FROM_HERE, + base::Bind(&ObjectProxy::RunMethod, + this, + iter->second, + released_signal)); + } else { + // If the D-Bus thread is not used, just call the method directly. We + // don't need the complicated logic to wait for the method call to be + // complete. + iter->second.Run(signal.get()); + } + + return DBUS_HANDLER_RESULT_HANDLED; +} + +void ObjectProxy::RunMethod(SignalCallback signal_callback, + Signal* signal) { + bus_->AssertOnOriginThread(); + + signal_callback.Run(signal); + delete signal; +} + +DBusHandlerResult ObjectProxy::HandleMessageThunk( + DBusConnection* connection, + DBusMessage* raw_message, + void* user_data) { + ObjectProxy* self = reinterpret_cast<ObjectProxy*>(user_data); + return self->HandleMessage(connection, raw_message); +} + } // namespace dbus diff --git a/dbus/object_proxy.h b/dbus/object_proxy.h index 024ba2c..70e412df 100644 --- a/dbus/object_proxy.h +++ b/dbus/object_proxy.h @@ -20,6 +20,7 @@ namespace dbus { class Bus; class MethodCall; class Response; +class Signal; // ObjectProxy is used to communicate with remote objects, mainly for // calling methods of these objects. @@ -47,6 +48,17 @@ class ObjectProxy : public base::RefCountedThreadSafe<ObjectProxy> { // Called when the response is returned. Used for CallMethod(). typedef base::Callback<void(Response*)> ResponseCallback; + // Called when a signal is received. Signal* is the incoming signal. + typedef base::Callback<void (Signal*)> SignalCallback; + + // Called when the object proxy is connected to the signal. + // Parameters: + // - the interface name. + // - the signal name. + // - whether it was successful or not. + typedef base::Callback<void (const std::string&, const std::string&, bool)> + OnConnectedCallback; + // Calls the method of the remote object and blocks until the response // is returned. // @@ -71,6 +83,28 @@ class ObjectProxy : public base::RefCountedThreadSafe<ObjectProxy> { int timeout_ms, ResponseCallback callback); + // Requests to connect to the signal from the remote object. + // + // |signal_callback| will be called in the origin thread, when the + // signal is received from the remote object. As it's called in the + // origin thread, |signal_callback| can safely reference objects in the + // origin thread (i.e. UI thread in most cases). + // + // |on_connected_callback| is called when the object proxy is connected + // to the signal, or failed to be connected, in the origin thread. + // + // Must be called in the origin thread. + virtual void ConnectToSignal(const std::string& interface_name, + const std::string& signal_name, + SignalCallback signal_callback, + OnConnectedCallback on_connected_callback); + + // Detaches from the remote object. The Bus object will take care of + // detaching so you don't have to do this manually. + // + // BLOCKING CALL. + virtual void Detach(); + private: friend class base::RefCountedThreadSafe<ObjectProxy>; virtual ~ObjectProxy(); @@ -104,10 +138,46 @@ class ObjectProxy : public base::RefCountedThreadSafe<ObjectProxy> { static void OnPendingCallIsCompleteThunk(DBusPendingCall* pending_call, void* user_data); + // Helper function for ConnectToSignal(). + void ConnectToSignalInternal( + const std::string& interface_name, + const std::string& signal_name, + SignalCallback signal_callback, + OnConnectedCallback on_connected_callback); + + // Called when the object proxy is connected to the signal, or failed. + void OnConnected(OnConnectedCallback on_connected_callback, + const std::string& interface_name, + const std::string& signal_name, + bool success); + + // Handles the incoming request messages and dispatches to the signal + // callbacks. + DBusHandlerResult HandleMessage(DBusConnection* connection, + DBusMessage* raw_message); + + // Runs the method. Helper function for HandleMessage(). + void RunMethod(SignalCallback signal_callback, Signal* signal); + + // Redirects the function call to HandleMessage(). + static DBusHandlerResult HandleMessageThunk(DBusConnection* connection, + DBusMessage* raw_message, + void* user_data); + Bus* bus_; std::string service_name_; std::string object_path_; + // True if the message filter was added. + bool filter_added_; + + // The method table where keys are absolute signal names (i.e. interface + // name + signal name), and values are the corresponding callbacks. + typedef std::map<std::string, SignalCallback> MethodTable; + MethodTable method_table_; + + std::vector<std::string> match_rules_; + DISALLOW_COPY_AND_ASSIGN(ObjectProxy); }; diff --git a/dbus/test_service.cc b/dbus/test_service.cc index a3bd001..acab607 100644 --- a/dbus/test_service.cc +++ b/dbus/test_service.cc @@ -66,6 +66,21 @@ bool TestService::HasDBusThread() { return bus_->HasDBusThread(); } +void TestService::SendTestSignal(const std::string& message) { + message_loop()->PostTask( + FROM_HERE, + base::Bind(&TestService::SendTestSignalInternal, + base::Unretained(this), + message)); +} + +void TestService::SendTestSignalInternal(const std::string& message) { + dbus::Signal signal("org.chromium.TestInterface", "Test"); + dbus::MessageWriter writer(&signal); + writer.AppendString(message); + exported_object_->SendSignal(&signal); +} + void TestService::ShutdownInternal() { bus_->Shutdown(base::Bind(&TestService::OnShutdown, base::Unretained(this))); diff --git a/dbus/test_service.h b/dbus/test_service.h index 7d1abf7..0ed8462 100644 --- a/dbus/test_service.h +++ b/dbus/test_service.h @@ -20,7 +20,10 @@ class Response; // The test service is used for end-to-end tests. The service runs in a // separate thread, so it does not interfere the test code that runs in -// the main thread. Methods such as Echo() and SlowEcho() are exported. +// the main thread. +// +// The test service exports an object with methods such as Echo() and +// SlowEcho(). The object has ability to send "Test" signal. class TestService : public base::Thread { public: // Options for the test service. @@ -56,8 +59,14 @@ class TestService : public base::Thread { // Returns true if the bus has the D-Bus thread. bool HasDBusThread(); + // Sends "Test" signal with the given message from the exported object. + void SendTestSignal(const std::string& message); + private: - // Helper function used in Shutdown(). + // Helper function for SendTestSignal(). + void SendTestSignalInternal(const std::string& message); + + // Helper function for Shutdown(). void ShutdownInternal(); // Called when a method is exported. |