// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/edk/system/channel.h" #include #include #include #include #include "base/bind.h" #include "base/location.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" #include "base/message_loop/message_loop.h" #include "base/synchronization/lock.h" #include "base/task_runner.h" #include "mojo/edk/embedder/platform_channel_utils_posix.h" #include "mojo/edk/embedder/platform_handle_vector.h" namespace mojo { namespace edk { namespace { const size_t kMaxBatchReadCapacity = 256 * 1024; // A view over a Channel::Message object. The write queue uses these since // large messages may need to be sent in chunks. class MessageView { public: // Owns |message|. |offset| indexes the first unsent byte in the message. MessageView(Channel::MessagePtr message, size_t offset) : message_(std::move(message)), offset_(offset), handles_(message_->TakeHandlesForTransport()) { DCHECK_GT(message_->data_num_bytes(), offset_); } MessageView(MessageView&& other) { *this = std::move(other); } MessageView& operator=(MessageView&& other) { message_ = std::move(other.message_); offset_ = other.offset_; handles_ = std::move(other.handles_); return *this; } ~MessageView() {} const void* data() const { return static_cast(message_->data()) + offset_; } size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } size_t data_offset() const { return offset_; } void advance_data_offset(size_t num_bytes) { DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); offset_ += num_bytes; } ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); } Channel::MessagePtr TakeMessage() { return std::move(message_); } void SetHandles(ScopedPlatformHandleVectorPtr handles) { handles_ = std::move(handles); } private: Channel::MessagePtr message_; size_t offset_; ScopedPlatformHandleVectorPtr handles_; DISALLOW_COPY_AND_ASSIGN(MessageView); }; class ChannelPosix : public Channel, public base::MessageLoop::DestructionObserver, public base::MessageLoopForIO::Watcher { public: ChannelPosix(Delegate* delegate, ScopedPlatformHandle handle, scoped_refptr io_task_runner) : Channel(delegate), self_(this), handle_(std::move(handle)), io_task_runner_(io_task_runner) #if defined(OS_MACOSX) , handles_to_close_(new PlatformHandleVector) #endif { } void Start() override { if (io_task_runner_->RunsTasksOnCurrentThread()) { StartOnIOThread(); } else { io_task_runner_->PostTask( FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this)); } } void ShutDownImpl() override { // Always shut down asynchronously when called through the public interface. io_task_runner_->PostTask( FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this)); } void Write(MessagePtr message) override { bool write_error = false; { base::AutoLock lock(write_lock_); if (reject_writes_) return; if (outgoing_messages_.empty()) { if (!WriteNoLock(MessageView(std::move(message), 0))) reject_writes_ = write_error = true; } else { outgoing_messages_.emplace_back(std::move(message), 0); } } if (write_error) { // Do not synchronously invoke OnError(). Write() may have been called by // the delegate and we don't want to re-enter it. io_task_runner_->PostTask(FROM_HERE, base::Bind(&ChannelPosix::OnError, this)); } } ScopedPlatformHandleVectorPtr GetReadPlatformHandles( size_t num_handles, const void* extra_header, size_t extra_header_size) override { #if defined(OS_MACOSX) && !defined(OS_IOS) // On OSX, we can have mach ports which are located in the extra header // section. using MachPortsEntry = Channel::Message::MachPortsEntry; CHECK(extra_header_size >= num_handles * sizeof(MachPortsEntry)); size_t num_mach_ports = 0; const MachPortsEntry* mach_ports = reinterpret_cast(extra_header); for (size_t i = 0; i < num_handles; i++) { if (mach_ports[i].mach_port != MACH_PORT_NULL) num_mach_ports++; } CHECK(num_mach_ports <= num_handles); if (incoming_platform_handles_.size() + num_mach_ports < num_handles) return nullptr; ScopedPlatformHandleVectorPtr handles( new PlatformHandleVector(num_handles)); for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) { if (mach_port_index < num_mach_ports && mach_ports[mach_port_index].index == i) { (*handles)[i] = PlatformHandle( static_cast(mach_ports[mach_port_index].mach_port)); CHECK((*handles)[i].type == PlatformHandle::Type::MACH); // These are actually just Mach port names until they're resolved from // the remote process. (*handles)[i].type = PlatformHandle::Type::MACH_NAME; mach_port_index++; } else { CHECK(!incoming_platform_handles_.empty()); (*handles)[i] = incoming_platform_handles_.front(); incoming_platform_handles_.pop_front(); } } #else if (incoming_platform_handles_.size() < num_handles) return nullptr; ScopedPlatformHandleVectorPtr handles( new PlatformHandleVector(num_handles)); for (size_t i = 0; i < num_handles; ++i) { (*handles)[i] = incoming_platform_handles_.front(); incoming_platform_handles_.pop_front(); } #endif return handles; } private: ~ChannelPosix() override { DCHECK(!read_watcher_); DCHECK(!write_watcher_); for (auto handle : incoming_platform_handles_) handle.CloseIfNecessary(); } void StartOnIOThread() { DCHECK(!read_watcher_); DCHECK(!write_watcher_); read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher); base::MessageLoopForIO::current()->WatchFileDescriptor( handle_.get().handle, true /* persistent */, base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); base::MessageLoop::current()->AddDestructionObserver(this); } void WaitForWriteOnIOThread() { base::AutoLock lock(write_lock_); WaitForWriteOnIOThreadNoLock(); } void WaitForWriteOnIOThreadNoLock() { if (pending_write_) return; if (!write_watcher_) return; if (io_task_runner_->RunsTasksOnCurrentThread()) { pending_write_ = true; base::MessageLoopForIO::current()->WatchFileDescriptor( handle_.get().handle, false /* persistent */, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this); } else { io_task_runner_->PostTask( FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this)); } } void ShutDownOnIOThread() { base::MessageLoop::current()->RemoveDestructionObserver(this); read_watcher_.reset(); write_watcher_.reset(); handle_.reset(); #if defined(OS_MACOSX) handles_to_close_.reset(); #endif // May destroy the |this| if it was the last reference. self_ = nullptr; } // base::MessageLoop::DestructionObserver: void WillDestroyCurrentMessageLoop() override { DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); if (self_) ShutDownOnIOThread(); } // base::MessageLoopForIO::Watcher: void OnFileCanReadWithoutBlocking(int fd) override { CHECK_EQ(fd, handle_.get().handle); bool read_error = false; size_t next_read_size = 0; size_t buffer_capacity = 0; size_t total_bytes_read = 0; size_t bytes_read = 0; do { buffer_capacity = next_read_size; char* buffer = GetReadBuffer(&buffer_capacity); DCHECK_GT(buffer_capacity, 0u); ssize_t read_result = PlatformChannelRecvmsg( handle_.get(), buffer, buffer_capacity, &incoming_platform_handles_); if (read_result > 0) { bytes_read = static_cast(read_result); total_bytes_read += bytes_read; if (!OnReadComplete(bytes_read, &next_read_size)) { read_error = true; break; } } else if (read_result == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { read_error = true; break; } } while (bytes_read == buffer_capacity && total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0); if (read_error) { // Stop receiving read notifications. read_watcher_.reset(); OnError(); } } void OnFileCanWriteWithoutBlocking(int fd) override { bool write_error = false; { base::AutoLock lock(write_lock_); pending_write_ = false; if (!FlushOutgoingMessagesNoLock()) reject_writes_ = write_error = true; } if (write_error) OnError(); } // Attempts to write a message directly to the channel. If the full message // cannot be written, it's queued and a wait is initiated to write the message // ASAP on the I/O thread. bool WriteNoLock(MessageView message_view) { size_t bytes_written = 0; do { message_view.advance_data_offset(bytes_written); ssize_t result; ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); if (handles && handles->size()) { iovec iov = { const_cast(message_view.data()), message_view.data_num_bytes() }; // TODO: Handle lots of handles. result = PlatformChannelSendmsgWithHandles( handle_.get(), &iov, 1, handles->data(), handles->size()); if (result >= 0) { #if defined(OS_MACOSX) // There is a bug on OSX which makes it dangerous to close // a file descriptor while it is in transit. So instead we // store the file descriptor in a set and send a message to // the recipient, which is queued AFTER the message that // sent the FD. The recipient will reply to the message, // letting us know that it is now safe to close the file // descriptor. For more information, see: // http://crbug.com/298276 std::vector fds; for (auto& handle : *handles) fds.push_back(handle.handle); { base::AutoLock l(handles_to_close_lock_); for (auto& handle : *handles) handles_to_close_->push_back(handle); } MessagePtr fds_message( new Channel::Message(sizeof(fds[0]) * fds.size(), 0, Message::Header::MessageType::HANDLES_SENT)); memcpy(fds_message->mutable_payload(), fds.data(), sizeof(fds[0]) * fds.size()); outgoing_messages_.emplace_back(std::move(fds_message), 0); handles->clear(); #else handles.reset(); #endif // defined(OS_MACOSX) } } else { result = PlatformChannelWrite(handle_.get(), message_view.data(), message_view.data_num_bytes()); } if (result < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) return false; message_view.SetHandles(std::move(handles)); outgoing_messages_.emplace_front(std::move(message_view)); WaitForWriteOnIOThreadNoLock(); return true; } bytes_written = static_cast(result); } while (bytes_written < message_view.data_num_bytes()); return FlushOutgoingMessagesNoLock(); } bool FlushOutgoingMessagesNoLock() { std::deque messages; std::swap(outgoing_messages_, messages); while (!messages.empty()) { if (!WriteNoLock(std::move(messages.front()))) return false; messages.pop_front(); if (!outgoing_messages_.empty()) { // The message was requeued by WriteNoLock(), so we have to wait for // pipe to become writable again. Repopulate the message queue and exit. // If sending the message triggered any control messages, they may be // in |outgoing_messages_| in addition to or instead of the message // being sent. std::swap(messages, outgoing_messages_); while (!messages.empty()) { outgoing_messages_.push_front(std::move(messages.back())); messages.pop_back(); } return true; } } return true; } #if defined(OS_MACOSX) void OnControlMessage(Message::Header::MessageType message_type, const void* payload, size_t payload_size, ScopedPlatformHandleVectorPtr handles) override { switch (message_type) { case Message::Header::MessageType::HANDLES_SENT: { MessagePtr message(new Channel::Message( payload_size, 0, Message::Header::MessageType::HANDLES_SENT_ACK)); memcpy(message->mutable_payload(), payload, payload_size); Write(std::move(message)); break; } case Message::Header::MessageType::HANDLES_SENT_ACK: { const int* fds = reinterpret_cast(payload); size_t num_fds = payload_size / sizeof(*fds); if (payload_size % sizeof(*fds) != 0 || !CloseHandles(fds, num_fds)) { io_task_runner_->PostTask(FROM_HERE, base::Bind(&ChannelPosix::OnError, this)); } break; } default: NOTREACHED(); } } // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if // |fds| does not match a sequence of handles in |handles_to_close_|. bool CloseHandles(const int* fds, size_t num_fds) { base::AutoLock l(handles_to_close_lock_); if (!num_fds) return false; auto start = std::find_if(handles_to_close_->begin(), handles_to_close_->end(), [&fds](const PlatformHandle& handle) { return handle.handle == fds[0]; }); if (start == handles_to_close_->end()) return false; auto it = start; size_t i = 0; // The FDs in the message should match a sequence of handles in // |handles_to_close_|. for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) { if (it->handle != fds[i]) return false; it->CloseIfNecessary(); } if (i != num_fds) return false; handles_to_close_->erase(start, it); return true; } #endif // defined(OS_MACOSX) // Keeps the Channel alive at least until explicit shutdown on the IO thread. scoped_refptr self_; ScopedPlatformHandle handle_; scoped_refptr io_task_runner_; // These watchers must only be accessed on the IO thread. scoped_ptr read_watcher_; scoped_ptr write_watcher_; std::deque incoming_platform_handles_; // Protects |pending_write_| and |outgoing_messages_|. base::Lock write_lock_; bool pending_write_ = false; bool reject_writes_ = false; std::deque outgoing_messages_; #if defined(OS_MACOSX) base::Lock handles_to_close_lock_; ScopedPlatformHandleVectorPtr handles_to_close_; #endif DISALLOW_COPY_AND_ASSIGN(ChannelPosix); }; } // namespace // static scoped_refptr Channel::Create( Delegate* delegate, ScopedPlatformHandle platform_handle, scoped_refptr io_task_runner) { return new ChannelPosix(delegate, std::move(platform_handle), io_task_runner); } } // namespace edk } // namespace mojo