diff options
author | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-03-08 12:30:28 +0000 |
---|---|---|
committer | brettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2012-03-08 12:30:28 +0000 |
commit | d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881 (patch) | |
tree | c2f96a924fb5ddbf52276f9538f51d8cf60b42d0 /ipc | |
parent | aaa11b3c9bb3d4f786f41c22aeb55abdc119a5d0 (diff) | |
download | chromium_src-d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881.zip chromium_src-d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881.tar.gz chromium_src-d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881.tar.bz2 |
Factor out the shared parts of IPC channel reading.
This adds a new class+file ChannelReader that is responsible for management
of the common parts of IPC channel reading. The existing platform-specific
ChannelImpl classes derive from this and supply platform-specific reading
features via virtual classes. This is to reduce code duplication between the
Windows and Posix implementations of Channel.
Review URL: http://codereview.chromium.org/9547009
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@125597 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'ipc')
-rw-r--r-- | ipc/ipc.gypi | 4 | ||||
-rw-r--r-- | ipc/ipc_channel.h | 24 | ||||
-rw-r--r-- | ipc/ipc_channel_posix.cc | 95 | ||||
-rw-r--r-- | ipc/ipc_channel_posix.h | 59 | ||||
-rw-r--r-- | ipc/ipc_channel_reader.cc | 93 | ||||
-rw-r--r-- | ipc/ipc_channel_reader.h | 105 | ||||
-rw-r--r-- | ipc/ipc_channel_win.cc | 84 | ||||
-rw-r--r-- | ipc/ipc_channel_win.h | 43 |
8 files changed, 255 insertions, 252 deletions
diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi index 7791934..d43947a 100644 --- a/ipc/ipc.gypi +++ b/ipc/ipc.gypi @@ -1,4 +1,4 @@ -# Copyright (c) 2011 The Chromium Authors. All rights reserved. +# Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. @@ -19,6 +19,8 @@ 'ipc_channel_posix.h', 'ipc_channel_proxy.cc', 'ipc_channel_proxy.h', + 'ipc_channel_reader.cc', + 'ipc_channel_reader.h', 'ipc_channel_win.cc', 'ipc_channel_win.h', 'ipc_descriptors.h', diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h index 983503b..63beeee 100644 --- a/ipc/ipc_channel.h +++ b/ipc/ipc_channel.h @@ -1,4 +1,4 @@ -// Copyright (c) 2011 The Chromium Authors. All rights reserved. +// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. @@ -94,6 +94,17 @@ class IPC_EXPORT Channel : public Message::Sender { #endif }; + // The Hello message is internal to the Channel class. It is sent + // by the peer when the channel is connected. The message contains + // just the process id (pid). The message has a special routing_id + // (MSG_ROUTING_NONE) and type (HELLO_MESSAGE_TYPE). + enum { + HELLO_MESSAGE_TYPE = kuint16max // Maximum value of message type (uint16), + // to avoid conflicting with normal + // message types, which are enumeration + // constants starting from 0. + }; + // The maximum message size in bytes. Attempting to receive a message of this // size or bigger results in a channel error. static const size_t kMaximumMessageSize = 128 * 1024 * 1024; @@ -197,17 +208,6 @@ class IPC_EXPORT Channel : public Message::Sender { // PIMPL to which all channel calls are delegated. class ChannelImpl; ChannelImpl *channel_impl_; - - // The Hello message is internal to the Channel class. It is sent - // by the peer when the channel is connected. The message contains - // just the process id (pid). The message has a special routing_id - // (MSG_ROUTING_NONE) and type (HELLO_MESSAGE_TYPE). - enum { - HELLO_MESSAGE_TYPE = kuint16max // Maximum value of message type (uint16), - // to avoid conflicting with normal - // message types, which are enumeration - // constants starting from 0. - }; }; } // namespace IPC diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc index 4d6b8a02..39a11be 100644 --- a/ipc/ipc_channel_posix.cc +++ b/ipc/ipc_channel_posix.cc @@ -300,7 +300,8 @@ int Channel::ChannelImpl::global_pid_ = 0; Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, Mode mode, Listener* listener) - : mode_(mode), + : ChannelReader(listener), + mode_(mode), is_blocked_on_write_(false), waiting_connect_(true), message_send_bytes_written_(0), @@ -312,9 +313,7 @@ Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle, remote_fd_pipe_(-1), #endif // IPC_USES_READWRITE pipe_name_(channel_handle.name), - listener_(listener), must_unlink_(false) { - memset(input_buf_, 0, sizeof(input_buf_)); memset(input_cmsg_buf_, 0, sizeof(input_cmsg_buf_)); if (!CreatePipe(channel_handle)) { // The pipe may have been closed already. @@ -478,71 +477,6 @@ bool Channel::ChannelImpl::Connect() { return did_connect; } -bool Channel::ChannelImpl::ProcessIncomingMessages() { - while (true) { - int bytes_read = 0; - ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, - &bytes_read); - if (read_state == READ_FAILED) - return false; - if (read_state == READ_PENDING) - return true; - - DCHECK(bytes_read > 0); - if (!DispatchInputData(input_buf_, bytes_read)) - return false; - } -} - -bool Channel::ChannelImpl::DispatchInputData(const char* input_data, - int input_data_len) { - const char* p; - const char* end; - - // Possibly combine with the overflow buffer to make a larger buffer. - if (input_overflow_buf_.empty()) { - p = input_data; - end = input_data + input_data_len; - } else { - if (input_overflow_buf_.size() > - kMaximumMessageSize - input_data_len) { - input_overflow_buf_.clear(); - LOG(ERROR) << "IPC message is too big"; - return false; - } - input_overflow_buf_.append(input_data, input_data_len); - p = input_overflow_buf_.data(); - end = p + input_overflow_buf_.size(); - } - - // Dispatch all complete messages in the data buffer. - while (p < end) { - const char* message_tail = Message::FindNext(p, end); - if (message_tail) { - int len = static_cast<int>(message_tail - p); - Message m(p, len); - if (!WillDispatchInputMessage(&m)) - return false; - - if (IsHelloMessage(&m)) - HandleHelloMessage(m); - else - listener_->OnMessageReceived(m); - p = message_tail; - } else { - // Last message is partial. - break; - } - } - - // Save any partial data in the overflow buffer. - input_overflow_buf_.assign(p, end - p); - - if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) - return false; - return true; -} - bool Channel::ChannelImpl::ProcessOutgoingMessages() { DCHECK(!waiting_connect_); // Why are we trying to send messages if there's // no connection? @@ -604,7 +538,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { msg->header()->num_fds = static_cast<uint16>(num_fds); #if defined(IPC_USES_READWRITE) - if (!IsHelloMessage(msg)) { + if (!IsHelloMessage(*msg)) { // Only the Hello message sends the file descriptor with the message. // Subsequently, we can send file descriptors on the dedicated // fd_pipe_ which makes Seccomp sandbox operation more efficient. @@ -624,7 +558,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() { if (bytes_written == 1) { fd_written = pipe_; #if defined(IPC_USES_READWRITE) - if ((mode_ & MODE_CLIENT_FLAG) && IsHelloMessage(msg)) { + if ((mode_ & MODE_CLIENT_FLAG) && IsHelloMessage(*msg)) { DCHECK_EQ(msg->file_descriptor_set()->size(), 1U); } if (!msgh.msg_controllen) { @@ -816,7 +750,7 @@ void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { int new_pipe = 0; if (!ServerAcceptConnection(server_listen_pipe_, &new_pipe)) { Close(); - listener_->OnChannelListenError(); + listener()->OnChannelListenError(); } if (pipe_ != -1) { @@ -826,7 +760,7 @@ void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) { DPLOG(ERROR) << "shutdown " << pipe_name_; if (HANDLE_EINTR(close(new_pipe)) < 0) DPLOG(ERROR) << "close " << pipe_name_; - listener_->OnChannelDenied(); + listener()->OnChannelDenied(); return; } pipe_ = new_pipe; @@ -910,13 +844,13 @@ bool Channel::ChannelImpl::AcceptConnection() { void Channel::ChannelImpl::ClosePipeOnError() { if (HasAcceptedConnection()) { ResetToAcceptingConnectionState(); - listener_->OnChannelError(); + listener()->OnChannelError(); } else { Close(); if (AcceptsConnections()) { - listener_->OnChannelListenError(); + listener()->OnChannelListenError(); } else { - listener_->OnChannelError(); + listener()->OnChannelError(); } } } @@ -953,10 +887,6 @@ void Channel::ChannelImpl::QueueHelloMessage() { output_queue_.push(msg.release()); } -bool Channel::ChannelImpl::IsHelloMessage(const Message* m) const { - return m->routing_id() == MSG_ROUTING_NONE && m->type() == HELLO_MESSAGE_TYPE; -} - Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( char* buffer, int buffer_len, @@ -1035,6 +965,11 @@ bool Channel::ChannelImpl::ReadFileDescriptorsFromFDPipe() { } #endif +// On Posix, we need to fix up the file descriptors before the input message +// is dispatched. +// +// This will read from the input_fds_ (READWRITE mode only) and read more +// handles from the FD pipe if necessary. bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { uint16 header_fds = msg->header()->num_fds; if (!header_fds) @@ -1145,7 +1080,7 @@ void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { CHECK(descriptor.auto_close); } #endif // IPC_USES_READWRITE - listener_->OnChannelConnected(pid); + listener()->OnChannelConnected(pid); } void Channel::ChannelImpl::Close() { diff --git a/ipc/ipc_channel_posix.h b/ipc/ipc_channel_posix.h index 7d26e9b..028e5d9 100644 --- a/ipc/ipc_channel_posix.h +++ b/ipc/ipc_channel_posix.h @@ -16,6 +16,7 @@ #include "base/message_loop.h" #include "ipc/file_descriptor_set_posix.h" +#include "ipc/ipc_channel_reader.h" #if !defined(OS_MACOSX) // On Linux, the seccomp sandbox makes it very expensive to call @@ -47,7 +48,8 @@ namespace IPC { -class Channel::ChannelImpl : public MessageLoopForIO::Watcher { +class Channel::ChannelImpl : public internal::ChannelReader, + public MessageLoopForIO::Watcher { public: // Mirror methods of Channel, see ipc_channel.h for description. ChannelImpl(const IPC::ChannelHandle& channel_handle, Mode mode, @@ -55,7 +57,6 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { virtual ~ChannelImpl(); bool Connect(); void Close(); - void set_listener(Listener* listener) { listener_ = listener; } bool Send(Message* message); int GetClientFileDescriptor(); int TakeClientFileDescriptor(); @@ -70,38 +71,22 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { #endif // OS_LINUX private: - enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING }; - bool CreatePipe(const IPC::ChannelHandle& channel_handle); - bool ProcessIncomingMessages(); bool ProcessOutgoingMessages(); bool AcceptConnection(); void ClosePipeOnError(); int GetHelloMessageProcId(); void QueueHelloMessage(); - bool IsHelloMessage(const Message* m) const; - // Populates the given buffer with data from the pipe. - // - // Returns the state of the read. On READ_SUCCESS, the number of bytes - // read will be placed into |*bytes_read| (which can be less than the - // buffer size). On READ_FAILED, the channel will be closed. - // - // If the return value is READ_PENDING, it means that there was no data - // ready for reading. The implementation is then responsible for either - // calling AsyncReadComplete with the number of bytes read into the - // buffer, or ProcessIncomingMessages to try the read again (depending - // on whether the platform's async I/O is "try again" or "write - // asynchronously into your buffer"). - ReadState ReadData(char* buffer, int buffer_len, int* bytes_read); - - // Takes the given data received from the IPC channel and dispatches any - // fully completed messages. - // - // Returns true on success. False means channel error. - bool DispatchInputData(const char* input_data, int input_data_len); + // ChannelReader implementation. + virtual ReadState ReadData(char* buffer, + int buffer_len, + int* bytes_read) OVERRIDE; + virtual bool WillDispatchInputMessage(Message* msg) OVERRIDE; + virtual bool DidEmptyInputBuffers() OVERRIDE; + virtual void HandleHelloMessage(const Message& msg) OVERRIDE; #if defined(IPC_USES_READWRITE) // Reads the next message from the fd_pipe_ and appends them to the @@ -111,17 +96,6 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { bool ReadFileDescriptorsFromFDPipe(); #endif - // Loads the required file desciptors into the given message. Returns true - // on success. False means a fatal channel error. - // - // This will read from the input_fds_ and read more handles from the FD - // pipe if necessary. - bool WillDispatchInputMessage(Message* msg); - - // Performs post-dispatch checks. Called when all input buffers are empty, - // though there could be more data ready to be read from the OS. - bool DidEmptyInputBuffers(); - // Finds the set of file descriptors in the given message. On success, // appends the descriptors to the input_fds_ member and returns true // @@ -133,9 +107,6 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { // used to clean up handles in error conditions to avoid leaking the handles. void ClearInputFDs(); - // Handles the first message sent over the pipe which contains setup info. - void HandleHelloMessage(const Message& msg); - // MessageLoopForIO::Watcher implementation. virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE; virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE; @@ -178,19 +149,9 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher { // the pipe. On POSIX it's used as a key in a local map of file descriptors. std::string pipe_name_; - Listener* listener_; - // Messages to be sent are queued here. std::queue<Message*> output_queue_; - // We read from the pipe into this buffer. Managed by DispatchInputData, do - // not access directly outside that function. - char input_buf_[Channel::kReadBufferSize]; - - // Large messages that span multiple pipe buffers, get built-up using - // this buffer. - std::string input_overflow_buf_; - // We assume a worst case: kReadBufferSize bytes of messages, where each // message has no payload and a full complement of descriptors. static const size_t kMaxReadFDs = diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc new file mode 100644 index 0000000..47d1e8d --- /dev/null +++ b/ipc/ipc_channel_reader.cc @@ -0,0 +1,93 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "ipc/ipc_channel_reader.h" + +namespace IPC { +namespace internal { + +ChannelReader::ChannelReader(Channel::Listener* listener) + : listener_(listener) { +} + +ChannelReader::~ChannelReader() { +} + +bool ChannelReader::ProcessIncomingMessages() { + while (true) { + int bytes_read = 0; + ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, + &bytes_read); + if (read_state == READ_FAILED) + return false; + if (read_state == READ_PENDING) + return true; + + DCHECK(bytes_read > 0); + if (!DispatchInputData(input_buf_, bytes_read)) + return false; + } +} + +bool ChannelReader::AsyncReadComplete(int bytes_read) { + return DispatchInputData(input_buf_, bytes_read); +} + +bool ChannelReader::IsHelloMessage(const Message& m) const { + return m.routing_id() == MSG_ROUTING_NONE && + m.type() == Channel::HELLO_MESSAGE_TYPE; +} + +bool ChannelReader::DispatchInputData(const char* input_data, + int input_data_len) { + const char* p; + const char* end; + + // Possibly combine with the overflow buffer to make a larger buffer. + if (input_overflow_buf_.empty()) { + p = input_data; + end = input_data + input_data_len; + } else { + if (input_overflow_buf_.size() > + Channel::kMaximumMessageSize - input_data_len) { + input_overflow_buf_.clear(); + LOG(ERROR) << "IPC message is too big"; + return false; + } + input_overflow_buf_.append(input_data, input_data_len); + p = input_overflow_buf_.data(); + end = p + input_overflow_buf_.size(); + } + + // Dispatch all complete messages in the data buffer. + while (p < end) { + const char* message_tail = Message::FindNext(p, end); + if (message_tail) { + int len = static_cast<int>(message_tail - p); + Message m(p, len); + if (!WillDispatchInputMessage(&m)) + return false; + + if (IsHelloMessage(m)) + HandleHelloMessage(m); + else + listener_->OnMessageReceived(m); + p = message_tail; + } else { + // Last message is partial. + break; + } + } + + // Save any partial data in the overflow buffer. + input_overflow_buf_.assign(p, end - p); + + if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) + return false; + return true; +} + + +} // namespace internal +} // namespace IPC diff --git a/ipc/ipc_channel_reader.h b/ipc/ipc_channel_reader.h new file mode 100644 index 0000000..192ca2d --- /dev/null +++ b/ipc/ipc_channel_reader.h @@ -0,0 +1,105 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef IPC_IPC_CHANNEL_READER_H_ +#define IPC_IPC_CHANNEL_READER_H_ + +#include "base/basictypes.h" +#include "ipc/ipc_channel.h" + +namespace IPC { +namespace internal { + +// This class provides common pipe reading functionality for the +// platform-specific IPC channel implementations. +// +// It does the common input buffer management and message dispatch, while the +// platform-specific parts provide the pipe management through a virtual +// interface implemented on a per-platform basis. +// +// Note that there is no "writer" corresponding to this because the code for +// writing to the channel is much simpler and has very little common +// functionality that would benefit from being factored out. If we add +// something like that in the future, it would be more appropriate to add it +// here (and rename appropriately) rather than writing a different class. +class ChannelReader { + public: + explicit ChannelReader(Channel::Listener* listener); + virtual ~ChannelReader(); + + void set_listener(Channel::Listener* listener) { listener_ = listener; } + + // Call to process messages received from the IPC connection and dispatch + // them. Returns false on channel error. True indicates that everything + // succeeded, although there may not have been any messages processed. + bool ProcessIncomingMessages(); + + // Handles asynchronously read data. + // + // Optionally call this after returning READ_PENDING from ReadData to + // indicate that buffer was filled with the given number of bytes of + // data. See ReadData for more. + bool AsyncReadComplete(int bytes_read); + + // Returns true if the given message is the "hello" message sent on channel + // set-up. + bool IsHelloMessage(const Message& m) const; + + protected: + enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING }; + + Channel::Listener* listener() const { return listener_; } + + // Populates the given buffer with data from the pipe. + // + // Returns the state of the read. On READ_SUCCESS, the number of bytes + // read will be placed into |*bytes_read| (which can be less than the + // buffer size). On READ_FAILED, the channel will be closed. + // + // If the return value is READ_PENDING, it means that there was no data + // ready for reading. The implementation is then responsible for either + // calling AsyncReadComplete with the number of bytes read into the + // buffer, or ProcessIncomingMessages to try the read again (depending + // on whether the platform's async I/O is "try again" or "write + // asynchronously into your buffer"). + virtual ReadState ReadData(char* buffer, int buffer_len, int* bytes_read) = 0; + + // Loads the required file desciptors into the given message. Returns true + // on success. False means a fatal channel error. + // + // This will read from the input_fds_ and read more handles from the FD + // pipe if necessary. + virtual bool WillDispatchInputMessage(Message* msg) = 0; + + // Performs post-dispatch checks. Called when all input buffers are empty, + // though there could be more data ready to be read from the OS. + virtual bool DidEmptyInputBuffers() = 0; + + // Handles the first message sent over the pipe which contains setup info. + virtual void HandleHelloMessage(const Message& msg) = 0; + + private: + // Takes the given data received from the IPC channel and dispatches any + // fully completed messages. + // + // Returns true on success. False means channel error. + bool DispatchInputData(const char* input_data, int input_data_len); + + Channel::Listener* listener_; + + // We read from the pipe into this buffer. Managed by DispatchInputData, do + // not access directly outside that function. + char input_buf_[Channel::kReadBufferSize]; + + // Large messages that span multiple pipe buffers, get built-up using + // this buffer. + std::string input_overflow_buf_; + + DISALLOW_COPY_AND_ASSIGN(ChannelReader); +}; + +} // namespace internal +} // namespace IPC + +#endif // IPC_IPC_CHANNEL_READER_H_ diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc index b2a3421..ef974ee 100644 --- a/ipc/ipc_channel_win.cc +++ b/ipc/ipc_channel_win.cc @@ -30,10 +30,10 @@ Channel::ChannelImpl::State::~State() { Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle &channel_handle, Mode mode, Listener* listener) - : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), + : ChannelReader(listener), + ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)), ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)), pipe_(INVALID_HANDLE_VALUE), - listener_(listener), waiting_connect_(mode & MODE_SERVER_FLAG), processing_incoming_(false), ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) { @@ -113,7 +113,7 @@ Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( return READ_FAILED; DWORD bytes_read = 0; - BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize, + BOOL ok = ReadFile(pipe_, buffer, buffer_len, &bytes_read, &input_state_.context.overlapped); if (!ok) { DWORD err = GetLastError(); @@ -138,75 +138,20 @@ Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData( } bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) { + // We don't need to do anything here. return true; } void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) { // The hello message contains one parameter containing the PID. - listener_->OnChannelConnected(MessageIterator(msg).NextInt()); + listener()->OnChannelConnected(MessageIterator(msg).NextInt()); } bool Channel::ChannelImpl::DidEmptyInputBuffers() { + // We don't need to do anything here. return true; } -bool Channel::ChannelImpl::DispatchInputData(const char* input_data, - int input_data_len) { - const char* p; - const char* end; - - // Possibly combine with the overflow buffer to make a larger buffer. - if (input_overflow_buf_.empty()) { - p = input_data; - end = input_data + input_data_len; - } else { - if (input_overflow_buf_.size() > - kMaximumMessageSize - input_data_len) { - input_overflow_buf_.clear(); - LOG(ERROR) << "IPC message is too big"; - return false; - } - input_overflow_buf_.append(input_data, input_data_len); - p = input_overflow_buf_.data(); - end = p + input_overflow_buf_.size(); - } - - // Dispatch all complete messages in the data buffer. - while (p < end) { - const char* message_tail = Message::FindNext(p, end); - if (message_tail) { - int len = static_cast<int>(message_tail - p); - Message m(p, len); - if (!WillDispatchInputMessage(&m)) - return false; - - if (IsHelloMessage(m)) - HandleHelloMessage(m); - else - listener_->OnMessageReceived(m); - p = message_tail; - } else { - // Last message is partial. - break; - } - } - - // Save any partial data in the overflow buffer. - input_overflow_buf_.assign(p, end - p); - - if (input_overflow_buf_.empty() && !DidEmptyInputBuffers()) - return false; - return true; -} - -bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const { - return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE; -} - -bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) { - return DispatchInputData(input_buf_, bytes_read); -} - // static const std::wstring Channel::ChannelImpl::PipeName( const std::string& channel_id) { @@ -360,21 +305,6 @@ bool Channel::ChannelImpl::ProcessConnection() { return true; } -bool Channel::ChannelImpl::ProcessIncomingMessages() { - while (true) { - int bytes_read = 0; - ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize, - &bytes_read); - if (read_state == READ_FAILED) - return false; - if (read_state == READ_PENDING) - return true; - DCHECK(bytes_read > 0); - if (!DispatchInputData(input_buf_, bytes_read)) - return false; - } -} - bool Channel::ChannelImpl::ProcessOutgoingMessages( MessageLoopForIO::IOContext* context, DWORD bytes_written) { @@ -475,7 +405,7 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context, if (!ok && INVALID_HANDLE_VALUE != pipe_) { // We don't want to re-enter Close(). Close(); - listener_->OnChannelError(); + listener()->OnChannelError(); } } diff --git a/ipc/ipc_channel_win.h b/ipc/ipc_channel_win.h index b857eb3..7a48695 100644 --- a/ipc/ipc_channel_win.h +++ b/ipc/ipc_channel_win.h @@ -14,6 +14,7 @@ #include "base/memory/scoped_ptr.h" #include "base/memory/weak_ptr.h" #include "base/message_loop.h" +#include "ipc/ipc_channel_reader.h" namespace base { class NonThreadSafe; @@ -21,7 +22,8 @@ class NonThreadSafe; namespace IPC { -class Channel::ChannelImpl : public MessageLoopForIO::IOHandler { +class Channel::ChannelImpl : public internal::ChannelReader, + public MessageLoopForIO::IOHandler { public: // Mirror methods of Channel, see ipc_channel.h for description. ChannelImpl(const IPC::ChannelHandle &channel_handle, Mode mode, @@ -29,38 +31,22 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler { ~ChannelImpl(); bool Connect(); void Close(); - void set_listener(Listener* listener) { listener_ = listener; } bool Send(Message* message); static bool IsNamedServerInitialized(const std::string& channel_id); private: - enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING }; - - // This will become the virtual interface implemented by this class to - // handle platform-specific reading. - // TODO(brettw) finish refactoring. - ReadState ReadData(char* buffer, int buffer_len, int* bytes_read); - bool WillDispatchInputMessage(Message* msg); - void HandleHelloMessage(const Message& msg); - bool DidEmptyInputBuffers(); - - bool DispatchInputData(const char* input_data, int input_data_len); - - // Returns true if the given message is the hello message. - bool IsHelloMessage(const Message& m) const; - - // Handles asynchronously read data. - // - // Optionally call this after returning READ_PENDING from ReadData to - // indicate that buffer was filled with the given number of bytes of - // data. See ReadData for more. - bool AsyncReadComplete(int bytes_read); + // ChannelReader implementation. + virtual ReadState ReadData(char* buffer, + int buffer_len, + int* bytes_read) OVERRIDE; + virtual bool WillDispatchInputMessage(Message* msg) OVERRIDE; + bool DidEmptyInputBuffers() OVERRIDE; + virtual void HandleHelloMessage(const Message& msg) OVERRIDE; static const std::wstring PipeName(const std::string& channel_id); bool CreatePipe(const IPC::ChannelHandle &channel_handle, Mode mode); bool ProcessConnection(); - bool ProcessIncomingMessages(); bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context, DWORD bytes_written); @@ -80,18 +66,9 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler { HANDLE pipe_; - Listener* listener_; - // Messages to be sent are queued here. std::queue<Message*> output_queue_; - // We read from the pipe into this buffer - char input_buf_[Channel::kReadBufferSize]; - - // Large messages that span multiple pipe buffers, get built-up using - // this buffer. - std::string input_overflow_buf_; - // In server-mode, we have to wait for the client to connect before we // can begin reading. We make use of the input_state_ when performing // the connect operation in overlapped mode. |