diff options
author | dmichael@chromium.org <dmichael@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-05-31 19:37:54 +0000 |
---|---|---|
committer | dmichael@chromium.org <dmichael@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-05-31 19:37:54 +0000 |
commit | 4e8f0a76a825acd08f373b0492c90bdd6cc10f0b (patch) | |
tree | 61fbad848b0c240ce749a48a3244da536397af89 /ipc | |
parent | 6eed4094d08609af962197b99d2348d442d00d72 (diff) | |
download | chromium_src-4e8f0a76a825acd08f373b0492c90bdd6cc10f0b.zip chromium_src-4e8f0a76a825acd08f373b0492c90bdd6cc10f0b.tar.gz chromium_src-4e8f0a76a825acd08f373b0492c90bdd6cc10f0b.tar.bz2 |
PPAPI/NaCl: Re-land speculative implementation for ipc_channel_nacl.cc
Original CL: http://codereview.chromium.org/10174048/
Was committed at 139635, reverted at 139646 due to a flake.
BUG=116317
TEST=
TBR=dmichael@chromium.org
Review URL: https://chromiumcodereview.appspot.com/10442112
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@139840 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/ipc_channel_nacl.cc | 336 | ||||
-rw-r--r-- | ipc/ipc_channel_nacl.h | 80 | ||||
-rw-r--r-- | ipc/ipc_channel_proxy.cc | 2 | ||||
-rw-r--r-- | ipc/ipc_channel_proxy.h | 2 |
4 files changed, 334 insertions, 86 deletions
diff --git a/ipc/ipc_channel_nacl.cc b/ipc/ipc_channel_nacl.cc index 683353e..192b4a8b 100644 --- a/ipc/ipc_channel_nacl.cc +++ b/ipc/ipc_channel_nacl.cc @@ -4,18 +4,137 @@ #include "ipc/ipc_channel_nacl.h" +#include <errno.h> +#include <stddef.h> +#include <sys/nacl_imc_api.h> +#include <sys/nacl_syscalls.h> +#include <sys/types.h> + +#include <algorithm> + +#include "base/bind.h" #include "base/file_util.h" #include "base/logging.h" - -// This file is currently a stub to get us linking. -// TODO(brettw) implement this. +#include "base/message_loop_proxy.h" +#include "base/process_util.h" +#include "base/synchronization/lock.h" +#include "base/task_runner_util.h" +#include "base/threading/simple_thread.h" +#include "ipc/file_descriptor_set_posix.h" +#include "ipc/ipc_logging.h" namespace IPC { +namespace { + +scoped_ptr<std::vector<char> > ReadDataOnReaderThread(int pipe) { + DCHECK(pipe >= 0); + + if (pipe < 0) + return scoped_ptr<std::vector<char> >(); + + scoped_ptr<std::vector<char> > buffer( + new std::vector<char>(Channel::kReadBufferSize)); + struct NaClImcMsgHdr msg = {0}; + struct NaClImcMsgIoVec iov = {&buffer->at(0), buffer->size()}; + msg.iov = &iov; + msg.iov_length = 1; + + int bytes_read = imc_recvmsg(pipe, &msg, 0); + + if (bytes_read <= 0) { + // NaClIPCAdapter::BlockingReceive returns -1 when the pipe closes (either + // due to error or for regular shutdown). + return scoped_ptr<std::vector<char> >(); + } + DCHECK(bytes_read); + buffer->resize(bytes_read); + return buffer.Pass(); +} + +} // namespace + +class Channel::ChannelImpl::ReaderThreadRunner + : public base::DelegateSimpleThread::Delegate { + public: + // |pipe|: A file descriptor from which we will read using imc_recvmsg. + // |data_read_callback|: A callback we invoke (on the main thread) when we + // have read data. The callback is passed a buffer of + // data that was read. + // |failure_callback|: A callback we invoke when we have a failure reading + // from |pipe|. + // |main_message_loop|: A proxy for the main thread, where we will invoke the + // above callbacks. + ReaderThreadRunner( + int pipe, + base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback, + base::Callback<void ()> failure_callback, + base::MessageLoopProxy* main_message_loop); + + // DelegateSimpleThread implementation. Reads data from the pipe in a loop + // until either we are told to quit or a read fails. + virtual void Run() OVERRIDE; + + private: + int pipe_; + base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback_; + base::Callback<void ()> failure_callback_; + base::MessageLoopProxy* main_message_loop_; + + DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner); +}; + +Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner( + int pipe, + base::Callback<void (scoped_ptr<std::vector<char> >)> data_read_callback, + base::Callback<void ()> failure_callback, + base::MessageLoopProxy* main_message_loop) + : pipe_(pipe), + data_read_callback_(data_read_callback), + failure_callback_(failure_callback), + main_message_loop_(main_message_loop) { +} + +void Channel::ChannelImpl::ReaderThreadRunner::Run() { + while (true) { + scoped_ptr<std::vector<char> > buffer(ReadDataOnReaderThread(pipe_)); + if (buffer.get()) { + main_message_loop_->PostTask(FROM_HERE, + base::Bind(data_read_callback_, base::Passed(buffer.Pass()))); + } else { + main_message_loop_->PostTask(FROM_HERE, failure_callback_); + // Because the read failed, we know we're going to quit. Don't bother + // trying to read again. + return; + } + } +} Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, - Mode mode, - Listener* listener) - : ChannelReader(listener) { + Mode mode, + Listener* listener) + : ChannelReader(listener), + mode_(mode), + waiting_connect_(true), + pipe_(-1), + pipe_name_(channel_handle.name), + weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) { + if (!CreatePipe(channel_handle)) { + // The pipe may have been closed already. + const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client"; + LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name + << "\" in " << modestr << " mode"; + } + reader_thread_runner_.reset( + new ReaderThreadRunner( + pipe_, + base::Bind(&Channel::ChannelImpl::DidRecvMsg, + weak_ptr_factory_.GetWeakPtr()), + base::Bind(&Channel::ChannelImpl::ReadDidFail, + weak_ptr_factory_.GetWeakPtr()), + base::MessageLoopProxy::current())); + reader_thread_.reset( + new base::DelegateSimpleThread(reader_thread_runner_.get(), + "ipc_channel_nacl reader thread")); } Channel::ChannelImpl::~ChannelImpl() { @@ -23,69 +142,165 @@ Channel::ChannelImpl::~ChannelImpl() { } bool Channel::ChannelImpl::Connect() { - NOTIMPLEMENTED(); - return false; + if (pipe_ == -1) { + DLOG(INFO) << "Channel creation failed: " << pipe_name_; + return false; + } + + reader_thread_->Start(); + waiting_connect_ = false; + // If there were any messages queued before connection, send them. + ProcessOutgoingMessages(); } void Channel::ChannelImpl::Close() { - NOTIMPLEMENTED(); + // For now, we assume that at shutdown, the reader thread will be woken with + // a failure (see NaClIPCAdapter::BlockingRead and CloseChannel). Or... we + // might simply be killed with no chance to clean up anyway :-). + // If untrusted code tries to close the channel prior to shutdown, it's likely + // to hang. + // TODO(dmichael): Can we do anything smarter here to make sure the reader + // thread wakes up and quits? + reader_thread_->Join(); + close(pipe_); + pipe_ = -1; + reader_thread_runner_.reset(); + reader_thread_.reset(); + read_queue_.clear(); + output_queue_.clear(); } bool Channel::ChannelImpl::Send(Message* message) { - NOTIMPLEMENTED(); -} + DVLOG(2) << "sending message @" << message << " on channel @" << this + << " with type " << message->type(); + scoped_ptr<Message> message_ptr(message); -int Channel::ChannelImpl::GetClientFileDescriptor() const { - NOTIMPLEMENTED(); - return -1; -} +#ifdef IPC_MESSAGE_LOG_ENABLED + Logging::GetInstance()->OnSendMessage(message, ""); +#endif // IPC_MESSAGE_LOG_ENABLED -int Channel::ChannelImpl::TakeClientFileDescriptor() { - NOTIMPLEMENTED(); - return -1; + output_queue_.push_back(linked_ptr<Message>(message)); + if (!waiting_connect_) + return ProcessOutgoingMessages(); + + return true; } -bool Channel::ChannelImpl::AcceptsConnections() const { - NOTIMPLEMENTED(); - return false; +void Channel::ChannelImpl::DidRecvMsg(scoped_ptr<std::vector<char> > buffer) { + // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from + // the reader thread after Close is called. If so, we ignore it. + if (pipe_ == -1) + return; + + read_queue_.push_back(linked_ptr<std::vector<char> >(buffer.release())); } -bool Channel::ChannelImpl::HasAcceptedConnection() const { - NOTIMPLEMENTED(); - return false; +void Channel::ChannelImpl::ReadDidFail() { + Close(); } -bool Channel::ChannelImpl::GetClientEuid(uid_t* client_euid) const { - NOTIMPLEMENTED(); - return false; +bool Channel::ChannelImpl::CreatePipe( + const IPC::ChannelHandle& channel_handle) { + DCHECK(pipe_ == -1); + + // There's one possible case in NaCl: + // 1) It's a channel wrapping a pipe that is given to us. + // We don't support these: + // 2) It's for a named channel. + // 3) It's for a client that we implement ourself. + // 4) It's the initial IPC channel. + + if (channel_handle.socket.fd == -1) { + NOTIMPLEMENTED(); + return false; + } + pipe_ = channel_handle.socket.fd; + return true; } -void Channel::ChannelImpl::ResetToAcceptingConnectionState() { - NOTIMPLEMENTED(); +bool Channel::ChannelImpl::ProcessOutgoingMessages() { + DCHECK(!waiting_connect_); // Why are we trying to send messages if there's + // no connection? + if (output_queue_.empty()) + return true; + + if (pipe_ == -1) + return false; + + // Write out all the messages. The trusted implementation is guaranteed to not + // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg. + while (!output_queue_.empty()) { + linked_ptr<Message> msg = output_queue_.front(); + output_queue_.pop_front(); + + struct NaClImcMsgHdr msgh = {0}; + struct NaClImcMsgIoVec iov = {const_cast<void*>(msg->data()), msg->size()}; + msgh.iov = &iov; + msgh.iov_length = 1; + ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0); + + if (bytes_written < 0) { + // The trusted side should only ever give us an error of EPIPE. We + // should never be interrupted, nor should we get EAGAIN. + DCHECK(errno == EPIPE); + Close(); + PLOG(ERROR) << "pipe_ error on " + << pipe_ + << " Currently writing message of size: " + << msg->size(); + return false; + } + + // Message sent OK! + DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type() + << " on fd " << pipe_; + } + return true; } -Channel::ChannelImpl::ReadState - Channel::ChannelImpl::ReadData(char* buffer, - int buffer_len, - int* bytes_read) { - return Channel::ChannelImpl::ReadState(); +Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( + char* buffer, + int buffer_len, + int* bytes_read) { + *bytes_read = 0; + if (pipe_ == -1) + return READ_FAILED; + if (read_queue_.empty()) + return READ_PENDING; + while (!read_queue_.empty() && *bytes_read < buffer_len) { + linked_ptr<std::vector<char> > vec(read_queue_.front()); + int bytes_to_read = buffer_len - *bytes_read; + if (vec->size() <= bytes_to_read) { + // We can read and discard the entire vector. + std::copy(vec->begin(), vec->end(), buffer + *bytes_read); + *bytes_read += vec->size(); + read_queue_.pop_front(); + } else { + // Read all the bytes we can and discard them from the front of the + // vector. (This can be slowish, since erase has to move the back of the + // vector to the front, but it's hopefully a temporary hack and it keeps + // the code simple). + std::copy(vec->begin(), vec->begin() + bytes_to_read, + buffer + *bytes_read); + vec->erase(vec->begin(), vec->begin() + bytes_to_read); + *bytes_read += bytes_to_read; + } + } + return READ_SUCCEEDED; } bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { - return false; + return true; } bool Channel::ChannelImpl::DidEmptyInputBuffers() { - return false; + return true; } void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { -} - -// static -bool Channel::ChannelImpl::IsNamedServerInitialized( - const std::string& channel_id) { - return false; //file_util::PathExists(FilePath(channel_id)); + // The trusted side IPC::Channel should handle the "hello" handshake; we + // should not receive the "Hello" message. + NOTREACHED(); } //------------------------------------------------------------------------------ @@ -113,39 +328,14 @@ void Channel::set_listener(Listener* listener) { channel_impl_->set_listener(listener); } -bool Channel::Send(Message* message) { - return channel_impl_->Send(message); -} - -int Channel::GetClientFileDescriptor() const { - return channel_impl_->GetClientFileDescriptor(); -} - -int Channel::TakeClientFileDescriptor() { - return channel_impl_->TakeClientFileDescriptor(); -} - -bool Channel::AcceptsConnections() const { - return channel_impl_->AcceptsConnections(); -} - -bool Channel::HasAcceptedConnection() const { - return channel_impl_->HasAcceptedConnection(); -} - -bool Channel::GetClientEuid(uid_t* client_euid) const { - return channel_impl_->GetClientEuid(client_euid); -} - -void Channel::ResetToAcceptingConnectionState() { - channel_impl_->ResetToAcceptingConnectionState(); +base::ProcessId Channel::peer_pid() const { + // This shouldn't actually get used in the untrusted side of the proxy, and we + // don't have the real pid anyway. + return -1; } -base::ProcessId Channel::peer_pid() const { return 0; } - -// static -bool Channel::IsNamedServerInitialized(const std::string& channel_id) { - return ChannelImpl::IsNamedServerInitialized(channel_id); +bool Channel::Send(Message* message) { + return channel_impl_->Send(message); } // static diff --git a/ipc/ipc_channel_nacl.h b/ipc/ipc_channel_nacl.h index 2747263..84f4ef7 100644 --- a/ipc/ipc_channel_nacl.h +++ b/ipc/ipc_channel_nacl.h @@ -6,17 +6,31 @@ #define IPC_IPC_CHANNEL_NACL_H_ #pragma once +#include <deque> +#include <string> + +#include "base/memory/linked_ptr.h" +#include "base/memory/scoped_ptr.h" +#include "base/memory/weak_ptr.h" +#include "base/process.h" +#include "base/threading/simple_thread.h" #include "ipc/ipc_channel.h" #include "ipc/ipc_channel_reader.h" namespace IPC { // Similar to the Posix version of ChannelImpl but for Native Client code. -// This is somewhat different because NaCl's send/recvmsg is slightly different -// and we don't need to worry about complicated set up and READWRITE mode for -// sharing handles. +// This is somewhat different because sendmsg/recvmsg here do not follow POSIX +// semantics. Instead, they are implemented by a custom embedding of +// NaClDescCustom. See NaClIPCAdapter for the trusted-side implementation. +// +// We don't need to worry about complicated set up and READWRITE mode for +// sharing handles. We also currently do not support passing file descriptors or +// named pipes, and we use background threads to emulate signaling when we can +// read or write without blocking. class Channel::ChannelImpl : public internal::ChannelReader { public: + // Mirror methods of Channel, see ipc_channel.h for description. ChannelImpl(const IPC::ChannelHandle& channel_handle, Mode mode, Listener* listener); @@ -26,14 +40,18 @@ class Channel::ChannelImpl : public internal::ChannelReader { bool Connect(); void Close(); bool Send(Message* message); - int GetClientFileDescriptor() const; - int TakeClientFileDescriptor(); - bool AcceptsConnections() const; - bool HasAcceptedConnection() const; - bool GetClientEuid(uid_t* client_euid) const; - void ResetToAcceptingConnectionState(); - static bool IsNamedServerInitialized(const std::string& channel_id); + // Posted to the main thread by ReaderThreadRunner. + void DidRecvMsg(scoped_ptr<std::vector<char> > buffer); + void ReadDidFail(); + + private: + class ReaderThreadRunner; + + bool CreatePipe(const IPC::ChannelHandle& channel_handle); + bool ProcessOutgoingMessages(); + + // ChannelReader implementation. virtual ReadState ReadData(char* buffer, int buffer_len, int* bytes_read) OVERRIDE; @@ -41,7 +59,47 @@ class Channel::ChannelImpl : public internal::ChannelReader { virtual bool DidEmptyInputBuffers() OVERRIDE; virtual void HandleHelloMessage(const Message& msg) OVERRIDE; - private: + Mode mode_; + bool waiting_connect_; + + // The pipe used for communication. + int pipe_; + + // The "name" of our pipe. On Windows this is the global identifier for + // the pipe. On POSIX it's used as a key in a local map of file descriptors. + // For NaCl, we don't actually support looking up file descriptors by name, + // and it's only used for debug information. + std::string pipe_name_; + + // We use a thread for reading, so that we can simply block on reading and + // post the received data back to the main thread to be properly interleaved + // with other tasks in the MessagePump. + // + // imc_recvmsg supports non-blocking reads, but there's no easy way to be + // informed when a write or read can be done without blocking (this is handled + // by libevent in Posix). + scoped_ptr<ReaderThreadRunner> reader_thread_runner_; + scoped_ptr<base::DelegateSimpleThread> reader_thread_; + + // IPC::ChannelReader expects to be able to call ReadData on us to + // synchronously read data waiting in the pipe's buffer without blocking. + // Since we can't do that (see 1 and 2 above), the reader thread does blocking + // reads and posts the data over to the main thread in byte vectors. Each byte + // vector is the result of one call to "recvmsg". When ReadData is called, it + // pulls the bytes out of these vectors in order. + // TODO(dmichael): There's probably a more efficient way to emulate this with + // a circular buffer or something, so we don't have to do so + // many heap allocations. But it maybe isn't worth + // the trouble given that we probably want to implement 1 and + // 2 above in NaCl eventually. + std::deque<linked_ptr<std::vector<char> > > read_queue_; + + // This queue is used when a message is sent prior to Connect having been + // called. Normally after we're connected, the queue is empty. + std::deque<linked_ptr<Message> > output_queue_; + + base::WeakPtrFactory<ChannelImpl> weak_ptr_factory_; + DISALLOW_IMPLICIT_CONSTRUCTORS(ChannelImpl); }; diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc index 3f0afa1..b9cc79c 100644 --- a/ipc/ipc_channel_proxy.cc +++ b/ipc/ipc_channel_proxy.cc @@ -367,7 +367,7 @@ void ChannelProxy::ClearIPCMessageLoop() { context()->ClearIPCMessageLoop(); } -#if defined(OS_POSIX) +#if defined(OS_POSIX) && !defined(OS_NACL) // See the TODO regarding lazy initialization of the channel in // ChannelProxy::Init(). int ChannelProxy::GetClientFileDescriptor() { diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h index 8c0031a..6f5b96f 100644 --- a/ipc/ipc_channel_proxy.h +++ b/ipc/ipc_channel_proxy.h @@ -179,7 +179,7 @@ class IPC_EXPORT ChannelProxy : public Message::Sender { // Returns base::kNullProcessId if the peer is not connected yet. base::ProcessId peer_pid() const { return context_->peer_pid_; } -#if defined(OS_POSIX) +#if defined(OS_POSIX) && !defined(OS_NACL) // Calls through to the underlying channel's methods. int GetClientFileDescriptor(); int TakeClientFileDescriptor(); |