summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbrettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-03-08 12:30:28 +0000
committerbrettw@chromium.org <brettw@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98>2012-03-08 12:30:28 +0000
commitd805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881 (patch)
treec2f96a924fb5ddbf52276f9538f51d8cf60b42d0
parentaaa11b3c9bb3d4f786f41c22aeb55abdc119a5d0 (diff)
downloadchromium_src-d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881.zip
chromium_src-d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881.tar.gz
chromium_src-d805c6a8ddfceebecf1fe9fd463fb0c8e5f3a881.tar.bz2
Factor out the shared parts of IPC channel reading.
This adds a new class+file ChannelReader that is responsible for management of the common parts of IPC channel reading. The existing platform-specific ChannelImpl classes derive from this and supply platform-specific reading features via virtual classes. This is to reduce code duplication between the Windows and Posix implementations of Channel. Review URL: http://codereview.chromium.org/9547009 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@125597 0039d316-1c4b-4281-b951-d872f2087c98
-rw-r--r--ipc/ipc.gypi4
-rw-r--r--ipc/ipc_channel.h24
-rw-r--r--ipc/ipc_channel_posix.cc95
-rw-r--r--ipc/ipc_channel_posix.h59
-rw-r--r--ipc/ipc_channel_reader.cc93
-rw-r--r--ipc/ipc_channel_reader.h105
-rw-r--r--ipc/ipc_channel_win.cc84
-rw-r--r--ipc/ipc_channel_win.h43
8 files changed, 255 insertions, 252 deletions
diff --git a/ipc/ipc.gypi b/ipc/ipc.gypi
index 7791934..d43947a 100644
--- a/ipc/ipc.gypi
+++ b/ipc/ipc.gypi
@@ -1,4 +1,4 @@
-# Copyright (c) 2011 The Chromium Authors. All rights reserved.
+# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
@@ -19,6 +19,8 @@
'ipc_channel_posix.h',
'ipc_channel_proxy.cc',
'ipc_channel_proxy.h',
+ 'ipc_channel_reader.cc',
+ 'ipc_channel_reader.h',
'ipc_channel_win.cc',
'ipc_channel_win.h',
'ipc_descriptors.h',
diff --git a/ipc/ipc_channel.h b/ipc/ipc_channel.h
index 983503b..63beeee 100644
--- a/ipc/ipc_channel.h
+++ b/ipc/ipc_channel.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
@@ -94,6 +94,17 @@ class IPC_EXPORT Channel : public Message::Sender {
#endif
};
+ // The Hello message is internal to the Channel class. It is sent
+ // by the peer when the channel is connected. The message contains
+ // just the process id (pid). The message has a special routing_id
+ // (MSG_ROUTING_NONE) and type (HELLO_MESSAGE_TYPE).
+ enum {
+ HELLO_MESSAGE_TYPE = kuint16max // Maximum value of message type (uint16),
+ // to avoid conflicting with normal
+ // message types, which are enumeration
+ // constants starting from 0.
+ };
+
// The maximum message size in bytes. Attempting to receive a message of this
// size or bigger results in a channel error.
static const size_t kMaximumMessageSize = 128 * 1024 * 1024;
@@ -197,17 +208,6 @@ class IPC_EXPORT Channel : public Message::Sender {
// PIMPL to which all channel calls are delegated.
class ChannelImpl;
ChannelImpl *channel_impl_;
-
- // The Hello message is internal to the Channel class. It is sent
- // by the peer when the channel is connected. The message contains
- // just the process id (pid). The message has a special routing_id
- // (MSG_ROUTING_NONE) and type (HELLO_MESSAGE_TYPE).
- enum {
- HELLO_MESSAGE_TYPE = kuint16max // Maximum value of message type (uint16),
- // to avoid conflicting with normal
- // message types, which are enumeration
- // constants starting from 0.
- };
};
} // namespace IPC
diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc
index 4d6b8a02..39a11be 100644
--- a/ipc/ipc_channel_posix.cc
+++ b/ipc/ipc_channel_posix.cc
@@ -300,7 +300,8 @@ int Channel::ChannelImpl::global_pid_ = 0;
Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle,
Mode mode, Listener* listener)
- : mode_(mode),
+ : ChannelReader(listener),
+ mode_(mode),
is_blocked_on_write_(false),
waiting_connect_(true),
message_send_bytes_written_(0),
@@ -312,9 +313,7 @@ Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle& channel_handle,
remote_fd_pipe_(-1),
#endif // IPC_USES_READWRITE
pipe_name_(channel_handle.name),
- listener_(listener),
must_unlink_(false) {
- memset(input_buf_, 0, sizeof(input_buf_));
memset(input_cmsg_buf_, 0, sizeof(input_cmsg_buf_));
if (!CreatePipe(channel_handle)) {
// The pipe may have been closed already.
@@ -478,71 +477,6 @@ bool Channel::ChannelImpl::Connect() {
return did_connect;
}
-bool Channel::ChannelImpl::ProcessIncomingMessages() {
- while (true) {
- int bytes_read = 0;
- ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
- &bytes_read);
- if (read_state == READ_FAILED)
- return false;
- if (read_state == READ_PENDING)
- return true;
-
- DCHECK(bytes_read > 0);
- if (!DispatchInputData(input_buf_, bytes_read))
- return false;
- }
-}
-
-bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
- int input_data_len) {
- const char* p;
- const char* end;
-
- // Possibly combine with the overflow buffer to make a larger buffer.
- if (input_overflow_buf_.empty()) {
- p = input_data;
- end = input_data + input_data_len;
- } else {
- if (input_overflow_buf_.size() >
- kMaximumMessageSize - input_data_len) {
- input_overflow_buf_.clear();
- LOG(ERROR) << "IPC message is too big";
- return false;
- }
- input_overflow_buf_.append(input_data, input_data_len);
- p = input_overflow_buf_.data();
- end = p + input_overflow_buf_.size();
- }
-
- // Dispatch all complete messages in the data buffer.
- while (p < end) {
- const char* message_tail = Message::FindNext(p, end);
- if (message_tail) {
- int len = static_cast<int>(message_tail - p);
- Message m(p, len);
- if (!WillDispatchInputMessage(&m))
- return false;
-
- if (IsHelloMessage(&m))
- HandleHelloMessage(m);
- else
- listener_->OnMessageReceived(m);
- p = message_tail;
- } else {
- // Last message is partial.
- break;
- }
- }
-
- // Save any partial data in the overflow buffer.
- input_overflow_buf_.assign(p, end - p);
-
- if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
- return false;
- return true;
-}
-
bool Channel::ChannelImpl::ProcessOutgoingMessages() {
DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
// no connection?
@@ -604,7 +538,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
msg->header()->num_fds = static_cast<uint16>(num_fds);
#if defined(IPC_USES_READWRITE)
- if (!IsHelloMessage(msg)) {
+ if (!IsHelloMessage(*msg)) {
// Only the Hello message sends the file descriptor with the message.
// Subsequently, we can send file descriptors on the dedicated
// fd_pipe_ which makes Seccomp sandbox operation more efficient.
@@ -624,7 +558,7 @@ bool Channel::ChannelImpl::ProcessOutgoingMessages() {
if (bytes_written == 1) {
fd_written = pipe_;
#if defined(IPC_USES_READWRITE)
- if ((mode_ & MODE_CLIENT_FLAG) && IsHelloMessage(msg)) {
+ if ((mode_ & MODE_CLIENT_FLAG) && IsHelloMessage(*msg)) {
DCHECK_EQ(msg->file_descriptor_set()->size(), 1U);
}
if (!msgh.msg_controllen) {
@@ -816,7 +750,7 @@ void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) {
int new_pipe = 0;
if (!ServerAcceptConnection(server_listen_pipe_, &new_pipe)) {
Close();
- listener_->OnChannelListenError();
+ listener()->OnChannelListenError();
}
if (pipe_ != -1) {
@@ -826,7 +760,7 @@ void Channel::ChannelImpl::OnFileCanReadWithoutBlocking(int fd) {
DPLOG(ERROR) << "shutdown " << pipe_name_;
if (HANDLE_EINTR(close(new_pipe)) < 0)
DPLOG(ERROR) << "close " << pipe_name_;
- listener_->OnChannelDenied();
+ listener()->OnChannelDenied();
return;
}
pipe_ = new_pipe;
@@ -910,13 +844,13 @@ bool Channel::ChannelImpl::AcceptConnection() {
void Channel::ChannelImpl::ClosePipeOnError() {
if (HasAcceptedConnection()) {
ResetToAcceptingConnectionState();
- listener_->OnChannelError();
+ listener()->OnChannelError();
} else {
Close();
if (AcceptsConnections()) {
- listener_->OnChannelListenError();
+ listener()->OnChannelListenError();
} else {
- listener_->OnChannelError();
+ listener()->OnChannelError();
}
}
}
@@ -953,10 +887,6 @@ void Channel::ChannelImpl::QueueHelloMessage() {
output_queue_.push(msg.release());
}
-bool Channel::ChannelImpl::IsHelloMessage(const Message* m) const {
- return m->routing_id() == MSG_ROUTING_NONE && m->type() == HELLO_MESSAGE_TYPE;
-}
-
Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
char* buffer,
int buffer_len,
@@ -1035,6 +965,11 @@ bool Channel::ChannelImpl::ReadFileDescriptorsFromFDPipe() {
}
#endif
+// On Posix, we need to fix up the file descriptors before the input message
+// is dispatched.
+//
+// This will read from the input_fds_ (READWRITE mode only) and read more
+// handles from the FD pipe if necessary.
bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
uint16 header_fds = msg->header()->num_fds;
if (!header_fds)
@@ -1145,7 +1080,7 @@ void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
CHECK(descriptor.auto_close);
}
#endif // IPC_USES_READWRITE
- listener_->OnChannelConnected(pid);
+ listener()->OnChannelConnected(pid);
}
void Channel::ChannelImpl::Close() {
diff --git a/ipc/ipc_channel_posix.h b/ipc/ipc_channel_posix.h
index 7d26e9b..028e5d9 100644
--- a/ipc/ipc_channel_posix.h
+++ b/ipc/ipc_channel_posix.h
@@ -16,6 +16,7 @@
#include "base/message_loop.h"
#include "ipc/file_descriptor_set_posix.h"
+#include "ipc/ipc_channel_reader.h"
#if !defined(OS_MACOSX)
// On Linux, the seccomp sandbox makes it very expensive to call
@@ -47,7 +48,8 @@
namespace IPC {
-class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
+class Channel::ChannelImpl : public internal::ChannelReader,
+ public MessageLoopForIO::Watcher {
public:
// Mirror methods of Channel, see ipc_channel.h for description.
ChannelImpl(const IPC::ChannelHandle& channel_handle, Mode mode,
@@ -55,7 +57,6 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
virtual ~ChannelImpl();
bool Connect();
void Close();
- void set_listener(Listener* listener) { listener_ = listener; }
bool Send(Message* message);
int GetClientFileDescriptor();
int TakeClientFileDescriptor();
@@ -70,38 +71,22 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
#endif // OS_LINUX
private:
- enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
-
bool CreatePipe(const IPC::ChannelHandle& channel_handle);
- bool ProcessIncomingMessages();
bool ProcessOutgoingMessages();
bool AcceptConnection();
void ClosePipeOnError();
int GetHelloMessageProcId();
void QueueHelloMessage();
- bool IsHelloMessage(const Message* m) const;
- // Populates the given buffer with data from the pipe.
- //
- // Returns the state of the read. On READ_SUCCESS, the number of bytes
- // read will be placed into |*bytes_read| (which can be less than the
- // buffer size). On READ_FAILED, the channel will be closed.
- //
- // If the return value is READ_PENDING, it means that there was no data
- // ready for reading. The implementation is then responsible for either
- // calling AsyncReadComplete with the number of bytes read into the
- // buffer, or ProcessIncomingMessages to try the read again (depending
- // on whether the platform's async I/O is "try again" or "write
- // asynchronously into your buffer").
- ReadState ReadData(char* buffer, int buffer_len, int* bytes_read);
-
- // Takes the given data received from the IPC channel and dispatches any
- // fully completed messages.
- //
- // Returns true on success. False means channel error.
- bool DispatchInputData(const char* input_data, int input_data_len);
+ // ChannelReader implementation.
+ virtual ReadState ReadData(char* buffer,
+ int buffer_len,
+ int* bytes_read) OVERRIDE;
+ virtual bool WillDispatchInputMessage(Message* msg) OVERRIDE;
+ virtual bool DidEmptyInputBuffers() OVERRIDE;
+ virtual void HandleHelloMessage(const Message& msg) OVERRIDE;
#if defined(IPC_USES_READWRITE)
// Reads the next message from the fd_pipe_ and appends them to the
@@ -111,17 +96,6 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
bool ReadFileDescriptorsFromFDPipe();
#endif
- // Loads the required file desciptors into the given message. Returns true
- // on success. False means a fatal channel error.
- //
- // This will read from the input_fds_ and read more handles from the FD
- // pipe if necessary.
- bool WillDispatchInputMessage(Message* msg);
-
- // Performs post-dispatch checks. Called when all input buffers are empty,
- // though there could be more data ready to be read from the OS.
- bool DidEmptyInputBuffers();
-
// Finds the set of file descriptors in the given message. On success,
// appends the descriptors to the input_fds_ member and returns true
//
@@ -133,9 +107,6 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
// used to clean up handles in error conditions to avoid leaking the handles.
void ClearInputFDs();
- // Handles the first message sent over the pipe which contains setup info.
- void HandleHelloMessage(const Message& msg);
-
// MessageLoopForIO::Watcher implementation.
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
@@ -178,19 +149,9 @@ class Channel::ChannelImpl : public MessageLoopForIO::Watcher {
// the pipe. On POSIX it's used as a key in a local map of file descriptors.
std::string pipe_name_;
- Listener* listener_;
-
// Messages to be sent are queued here.
std::queue<Message*> output_queue_;
- // We read from the pipe into this buffer. Managed by DispatchInputData, do
- // not access directly outside that function.
- char input_buf_[Channel::kReadBufferSize];
-
- // Large messages that span multiple pipe buffers, get built-up using
- // this buffer.
- std::string input_overflow_buf_;
-
// We assume a worst case: kReadBufferSize bytes of messages, where each
// message has no payload and a full complement of descriptors.
static const size_t kMaxReadFDs =
diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc
new file mode 100644
index 0000000..47d1e8d
--- /dev/null
+++ b/ipc/ipc_channel_reader.cc
@@ -0,0 +1,93 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "ipc/ipc_channel_reader.h"
+
+namespace IPC {
+namespace internal {
+
+ChannelReader::ChannelReader(Channel::Listener* listener)
+ : listener_(listener) {
+}
+
+ChannelReader::~ChannelReader() {
+}
+
+bool ChannelReader::ProcessIncomingMessages() {
+ while (true) {
+ int bytes_read = 0;
+ ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
+ &bytes_read);
+ if (read_state == READ_FAILED)
+ return false;
+ if (read_state == READ_PENDING)
+ return true;
+
+ DCHECK(bytes_read > 0);
+ if (!DispatchInputData(input_buf_, bytes_read))
+ return false;
+ }
+}
+
+bool ChannelReader::AsyncReadComplete(int bytes_read) {
+ return DispatchInputData(input_buf_, bytes_read);
+}
+
+bool ChannelReader::IsHelloMessage(const Message& m) const {
+ return m.routing_id() == MSG_ROUTING_NONE &&
+ m.type() == Channel::HELLO_MESSAGE_TYPE;
+}
+
+bool ChannelReader::DispatchInputData(const char* input_data,
+ int input_data_len) {
+ const char* p;
+ const char* end;
+
+ // Possibly combine with the overflow buffer to make a larger buffer.
+ if (input_overflow_buf_.empty()) {
+ p = input_data;
+ end = input_data + input_data_len;
+ } else {
+ if (input_overflow_buf_.size() >
+ Channel::kMaximumMessageSize - input_data_len) {
+ input_overflow_buf_.clear();
+ LOG(ERROR) << "IPC message is too big";
+ return false;
+ }
+ input_overflow_buf_.append(input_data, input_data_len);
+ p = input_overflow_buf_.data();
+ end = p + input_overflow_buf_.size();
+ }
+
+ // Dispatch all complete messages in the data buffer.
+ while (p < end) {
+ const char* message_tail = Message::FindNext(p, end);
+ if (message_tail) {
+ int len = static_cast<int>(message_tail - p);
+ Message m(p, len);
+ if (!WillDispatchInputMessage(&m))
+ return false;
+
+ if (IsHelloMessage(m))
+ HandleHelloMessage(m);
+ else
+ listener_->OnMessageReceived(m);
+ p = message_tail;
+ } else {
+ // Last message is partial.
+ break;
+ }
+ }
+
+ // Save any partial data in the overflow buffer.
+ input_overflow_buf_.assign(p, end - p);
+
+ if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
+ return false;
+ return true;
+}
+
+
+} // namespace internal
+} // namespace IPC
diff --git a/ipc/ipc_channel_reader.h b/ipc/ipc_channel_reader.h
new file mode 100644
index 0000000..192ca2d
--- /dev/null
+++ b/ipc/ipc_channel_reader.h
@@ -0,0 +1,105 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef IPC_IPC_CHANNEL_READER_H_
+#define IPC_IPC_CHANNEL_READER_H_
+
+#include "base/basictypes.h"
+#include "ipc/ipc_channel.h"
+
+namespace IPC {
+namespace internal {
+
+// This class provides common pipe reading functionality for the
+// platform-specific IPC channel implementations.
+//
+// It does the common input buffer management and message dispatch, while the
+// platform-specific parts provide the pipe management through a virtual
+// interface implemented on a per-platform basis.
+//
+// Note that there is no "writer" corresponding to this because the code for
+// writing to the channel is much simpler and has very little common
+// functionality that would benefit from being factored out. If we add
+// something like that in the future, it would be more appropriate to add it
+// here (and rename appropriately) rather than writing a different class.
+class ChannelReader {
+ public:
+ explicit ChannelReader(Channel::Listener* listener);
+ virtual ~ChannelReader();
+
+ void set_listener(Channel::Listener* listener) { listener_ = listener; }
+
+ // Call to process messages received from the IPC connection and dispatch
+ // them. Returns false on channel error. True indicates that everything
+ // succeeded, although there may not have been any messages processed.
+ bool ProcessIncomingMessages();
+
+ // Handles asynchronously read data.
+ //
+ // Optionally call this after returning READ_PENDING from ReadData to
+ // indicate that buffer was filled with the given number of bytes of
+ // data. See ReadData for more.
+ bool AsyncReadComplete(int bytes_read);
+
+ // Returns true if the given message is the "hello" message sent on channel
+ // set-up.
+ bool IsHelloMessage(const Message& m) const;
+
+ protected:
+ enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
+
+ Channel::Listener* listener() const { return listener_; }
+
+ // Populates the given buffer with data from the pipe.
+ //
+ // Returns the state of the read. On READ_SUCCESS, the number of bytes
+ // read will be placed into |*bytes_read| (which can be less than the
+ // buffer size). On READ_FAILED, the channel will be closed.
+ //
+ // If the return value is READ_PENDING, it means that there was no data
+ // ready for reading. The implementation is then responsible for either
+ // calling AsyncReadComplete with the number of bytes read into the
+ // buffer, or ProcessIncomingMessages to try the read again (depending
+ // on whether the platform's async I/O is "try again" or "write
+ // asynchronously into your buffer").
+ virtual ReadState ReadData(char* buffer, int buffer_len, int* bytes_read) = 0;
+
+ // Loads the required file desciptors into the given message. Returns true
+ // on success. False means a fatal channel error.
+ //
+ // This will read from the input_fds_ and read more handles from the FD
+ // pipe if necessary.
+ virtual bool WillDispatchInputMessage(Message* msg) = 0;
+
+ // Performs post-dispatch checks. Called when all input buffers are empty,
+ // though there could be more data ready to be read from the OS.
+ virtual bool DidEmptyInputBuffers() = 0;
+
+ // Handles the first message sent over the pipe which contains setup info.
+ virtual void HandleHelloMessage(const Message& msg) = 0;
+
+ private:
+ // Takes the given data received from the IPC channel and dispatches any
+ // fully completed messages.
+ //
+ // Returns true on success. False means channel error.
+ bool DispatchInputData(const char* input_data, int input_data_len);
+
+ Channel::Listener* listener_;
+
+ // We read from the pipe into this buffer. Managed by DispatchInputData, do
+ // not access directly outside that function.
+ char input_buf_[Channel::kReadBufferSize];
+
+ // Large messages that span multiple pipe buffers, get built-up using
+ // this buffer.
+ std::string input_overflow_buf_;
+
+ DISALLOW_COPY_AND_ASSIGN(ChannelReader);
+};
+
+} // namespace internal
+} // namespace IPC
+
+#endif // IPC_IPC_CHANNEL_READER_H_
diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc
index b2a3421..ef974ee 100644
--- a/ipc/ipc_channel_win.cc
+++ b/ipc/ipc_channel_win.cc
@@ -30,10 +30,10 @@ Channel::ChannelImpl::State::~State() {
Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle &channel_handle,
Mode mode, Listener* listener)
- : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
+ : ChannelReader(listener),
+ ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
pipe_(INVALID_HANDLE_VALUE),
- listener_(listener),
waiting_connect_(mode & MODE_SERVER_FLAG),
processing_incoming_(false),
ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) {
@@ -113,7 +113,7 @@ Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
return READ_FAILED;
DWORD bytes_read = 0;
- BOOL ok = ReadFile(pipe_, input_buf_, Channel::kReadBufferSize,
+ BOOL ok = ReadFile(pipe_, buffer, buffer_len,
&bytes_read, &input_state_.context.overlapped);
if (!ok) {
DWORD err = GetLastError();
@@ -138,75 +138,20 @@ Channel::ChannelImpl::ReadState Channel::ChannelImpl::ReadData(
}
bool Channel::ChannelImpl::WillDispatchInputMessage(Message* msg) {
+ // We don't need to do anything here.
return true;
}
void Channel::ChannelImpl::HandleHelloMessage(const Message& msg) {
// The hello message contains one parameter containing the PID.
- listener_->OnChannelConnected(MessageIterator(msg).NextInt());
+ listener()->OnChannelConnected(MessageIterator(msg).NextInt());
}
bool Channel::ChannelImpl::DidEmptyInputBuffers() {
+ // We don't need to do anything here.
return true;
}
-bool Channel::ChannelImpl::DispatchInputData(const char* input_data,
- int input_data_len) {
- const char* p;
- const char* end;
-
- // Possibly combine with the overflow buffer to make a larger buffer.
- if (input_overflow_buf_.empty()) {
- p = input_data;
- end = input_data + input_data_len;
- } else {
- if (input_overflow_buf_.size() >
- kMaximumMessageSize - input_data_len) {
- input_overflow_buf_.clear();
- LOG(ERROR) << "IPC message is too big";
- return false;
- }
- input_overflow_buf_.append(input_data, input_data_len);
- p = input_overflow_buf_.data();
- end = p + input_overflow_buf_.size();
- }
-
- // Dispatch all complete messages in the data buffer.
- while (p < end) {
- const char* message_tail = Message::FindNext(p, end);
- if (message_tail) {
- int len = static_cast<int>(message_tail - p);
- Message m(p, len);
- if (!WillDispatchInputMessage(&m))
- return false;
-
- if (IsHelloMessage(m))
- HandleHelloMessage(m);
- else
- listener_->OnMessageReceived(m);
- p = message_tail;
- } else {
- // Last message is partial.
- break;
- }
- }
-
- // Save any partial data in the overflow buffer.
- input_overflow_buf_.assign(p, end - p);
-
- if (input_overflow_buf_.empty() && !DidEmptyInputBuffers())
- return false;
- return true;
-}
-
-bool Channel::ChannelImpl::IsHelloMessage(const Message& m) const {
- return m.routing_id() == MSG_ROUTING_NONE && m.type() == HELLO_MESSAGE_TYPE;
-}
-
-bool Channel::ChannelImpl::AsyncReadComplete(int bytes_read) {
- return DispatchInputData(input_buf_, bytes_read);
-}
-
// static
const std::wstring Channel::ChannelImpl::PipeName(
const std::string& channel_id) {
@@ -360,21 +305,6 @@ bool Channel::ChannelImpl::ProcessConnection() {
return true;
}
-bool Channel::ChannelImpl::ProcessIncomingMessages() {
- while (true) {
- int bytes_read = 0;
- ReadState read_state = ReadData(input_buf_, Channel::kReadBufferSize,
- &bytes_read);
- if (read_state == READ_FAILED)
- return false;
- if (read_state == READ_PENDING)
- return true;
- DCHECK(bytes_read > 0);
- if (!DispatchInputData(input_buf_, bytes_read))
- return false;
- }
-}
-
bool Channel::ChannelImpl::ProcessOutgoingMessages(
MessageLoopForIO::IOContext* context,
DWORD bytes_written) {
@@ -475,7 +405,7 @@ void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
if (!ok && INVALID_HANDLE_VALUE != pipe_) {
// We don't want to re-enter Close().
Close();
- listener_->OnChannelError();
+ listener()->OnChannelError();
}
}
diff --git a/ipc/ipc_channel_win.h b/ipc/ipc_channel_win.h
index b857eb3..7a48695 100644
--- a/ipc/ipc_channel_win.h
+++ b/ipc/ipc_channel_win.h
@@ -14,6 +14,7 @@
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
+#include "ipc/ipc_channel_reader.h"
namespace base {
class NonThreadSafe;
@@ -21,7 +22,8 @@ class NonThreadSafe;
namespace IPC {
-class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
+class Channel::ChannelImpl : public internal::ChannelReader,
+ public MessageLoopForIO::IOHandler {
public:
// Mirror methods of Channel, see ipc_channel.h for description.
ChannelImpl(const IPC::ChannelHandle &channel_handle, Mode mode,
@@ -29,38 +31,22 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
~ChannelImpl();
bool Connect();
void Close();
- void set_listener(Listener* listener) { listener_ = listener; }
bool Send(Message* message);
static bool IsNamedServerInitialized(const std::string& channel_id);
private:
- enum ReadState { READ_SUCCEEDED, READ_FAILED, READ_PENDING };
-
- // This will become the virtual interface implemented by this class to
- // handle platform-specific reading.
- // TODO(brettw) finish refactoring.
- ReadState ReadData(char* buffer, int buffer_len, int* bytes_read);
- bool WillDispatchInputMessage(Message* msg);
- void HandleHelloMessage(const Message& msg);
- bool DidEmptyInputBuffers();
-
- bool DispatchInputData(const char* input_data, int input_data_len);
-
- // Returns true if the given message is the hello message.
- bool IsHelloMessage(const Message& m) const;
-
- // Handles asynchronously read data.
- //
- // Optionally call this after returning READ_PENDING from ReadData to
- // indicate that buffer was filled with the given number of bytes of
- // data. See ReadData for more.
- bool AsyncReadComplete(int bytes_read);
+ // ChannelReader implementation.
+ virtual ReadState ReadData(char* buffer,
+ int buffer_len,
+ int* bytes_read) OVERRIDE;
+ virtual bool WillDispatchInputMessage(Message* msg) OVERRIDE;
+ bool DidEmptyInputBuffers() OVERRIDE;
+ virtual void HandleHelloMessage(const Message& msg) OVERRIDE;
static const std::wstring PipeName(const std::string& channel_id);
bool CreatePipe(const IPC::ChannelHandle &channel_handle, Mode mode);
bool ProcessConnection();
- bool ProcessIncomingMessages();
bool ProcessOutgoingMessages(MessageLoopForIO::IOContext* context,
DWORD bytes_written);
@@ -80,18 +66,9 @@ class Channel::ChannelImpl : public MessageLoopForIO::IOHandler {
HANDLE pipe_;
- Listener* listener_;
-
// Messages to be sent are queued here.
std::queue<Message*> output_queue_;
- // We read from the pipe into this buffer
- char input_buf_[Channel::kReadBufferSize];
-
- // Large messages that span multiple pipe buffers, get built-up using
- // this buffer.
- std::string input_overflow_buf_;
-
// In server-mode, we have to wait for the client to connect before we
// can begin reading. We make use of the input_state_ when performing
// the connect operation in overlapped mode.