// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/spy/spy.h" #include "base/bind.h" #include "base/compiler_specific.h" #include "base/location.h" #include "base/memory/ref_counted.h" #include "base/strings/string_number_conversions.h" #include "base/strings/string_split.h" #include "base/threading/thread.h" #include "base/threading/worker_pool.h" #include "mojo/public/cpp/system/core.h" #include "mojo/service_manager/service_manager.h" #include "mojo/spy/websocket_server.h" namespace { const size_t kMessageBufSize = 2 * 1024; const size_t kHandleBufSize = 64; const int kDefaultWebSocketPort = 42424; void CloseHandles(MojoHandle* handles, size_t count) { for (size_t ix = 0; ix != count; ++count) MojoClose(handles[ix]); } // In charge of processing messages that flow over a // single message pipe. class MessageProcessor : public base::RefCountedThreadSafe { public: MessageProcessor() : last_result_(MOJO_RESULT_OK), bytes_transfered_(0) { message_count_[0] = 0; message_count_[1] = 0; handle_count_[0] = 0; handle_count_[1] = 0; } void Start(mojo::ScopedMessagePipeHandle client, mojo::ScopedMessagePipeHandle interceptor) { std::vector pipes; pipes.push_back(client.get()); pipes.push_back(interceptor.get()); std::vector wait_flags; wait_flags.push_back(MOJO_WAIT_FLAG_READABLE); wait_flags.push_back(MOJO_WAIT_FLAG_READABLE); scoped_ptr mbuf(new char[kMessageBufSize]); scoped_ptr hbuf(new MojoHandle[kHandleBufSize]); // Main processing loop: // 1- Wait for an endpoint to have a message. // 2- Read the message // 3- Log data // 4- Wait until the opposite port is ready for writting // 4- Write the message to opposite port. for (;;) { int r = WaitMany(pipes, wait_flags, MOJO_DEADLINE_INDEFINITE); if ((r < 0) || (r > 1)) { last_result_ = r; break; } uint32_t bytes_read = kMessageBufSize; uint32_t handles_read = kHandleBufSize; if (!CheckResult(ReadMessageRaw(pipes[r], mbuf.get(), &bytes_read, hbuf.get(), &handles_read, MOJO_READ_MESSAGE_FLAG_NONE))) break; if (!bytes_read && !handles_read) continue; if (handles_read) handle_count_[r] += handles_read; ++message_count_[r]; bytes_transfered_ += bytes_read; mojo::MessagePipeHandle write_handle = (r == 0) ? pipes[1] : pipes[0]; if (!CheckResult(Wait(write_handle, MOJO_WAIT_FLAG_WRITABLE, MOJO_DEADLINE_INDEFINITE))) break; if (!CheckResult(WriteMessageRaw(write_handle, mbuf.get(), bytes_read, hbuf.get(), handles_read, MOJO_WRITE_MESSAGE_FLAG_NONE))) { // On failure we own the handles. For now just close them. if (handles_read) CloseHandles(hbuf.get(), handles_read); break; } } } private: friend class base::RefCountedThreadSafe; virtual ~MessageProcessor() {} bool CheckResult(MojoResult mr) { if (mr == MOJO_RESULT_OK) return true; last_result_ = mr; return false; } MojoResult last_result_; uint32_t bytes_transfered_; uint32_t message_count_[2]; uint32_t handle_count_[2]; }; // In charge of intercepting access to the service manager. class SpyInterceptor : public mojo::ServiceManager::Interceptor { private: virtual mojo::ScopedMessagePipeHandle OnConnectToClient( const GURL& url, mojo::ScopedMessagePipeHandle real_client) OVERRIDE { if (!MustIntercept(url)) return real_client.Pass(); // You can get an invalid handle if the app (or service) is // created by unconventional means, for example the command line. if (!real_client.is_valid()) return real_client.Pass(); mojo::ScopedMessagePipeHandle faux_client; mojo::ScopedMessagePipeHandle interceptor; CreateMessagePipe(&faux_client, &interceptor); scoped_refptr processor = new MessageProcessor(); base::WorkerPool::PostTask( FROM_HERE, base::Bind(&MessageProcessor::Start, processor, base::Passed(&real_client), base::Passed(&interceptor)), true); return faux_client.Pass(); } bool MustIntercept(const GURL& url) { // TODO(cpu): manage who and when to intercept. return true; } }; spy::WebSocketServer* ws_server = NULL; void StartServer(int port) { // TODO(cpu) figure out lifetime of the server. See Spy() dtor. ws_server = new spy::WebSocketServer(port); ws_server->Start(); } struct SpyOptions { int websocket_port; SpyOptions() : websocket_port(kDefaultWebSocketPort) { } }; SpyOptions ProcessOptions(const std::string& options) { SpyOptions spy_options; if (options.empty()) return spy_options; base::StringPairs kv_pairs; base::SplitStringIntoKeyValuePairs(options, ':', ',', &kv_pairs); base::StringPairs::iterator it = kv_pairs.begin(); for (; it != kv_pairs.end(); ++it) { if (it->first == "port") { int port; if (base::StringToInt(it->second, &port)) spy_options.websocket_port = port; } } return spy_options; } } // namespace namespace mojo { Spy::Spy(mojo::ServiceManager* service_manager, const std::string& options) { SpyOptions spy_options = ProcessOptions(options); // Start the tread what will accept commands from the frontend. control_thread_.reset(new base::Thread("mojo_spy_control_thread")); base::Thread::Options thread_options(base::MessageLoop::TYPE_IO, 0); control_thread_->StartWithOptions(thread_options); control_thread_->message_loop_proxy()->PostTask( FROM_HERE, base::Bind(&StartServer, spy_options.websocket_port)); // Start intercepting mojo services. service_manager->SetInterceptor(new SpyInterceptor()); } Spy::~Spy(){ // TODO(cpu): Do not leak the interceptor. Lifetime between the // service_manager and the spy is still unclear hence the leak. } } // namespace mojo