diff options
author | qsr@chromium.org <qsr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-07-18 18:51:05 +0000 |
---|---|---|
committer | qsr@chromium.org <qsr@chromium.org@0039d316-1c4b-4281-b951-d872f2087c98> | 2014-07-18 18:51:05 +0000 |
commit | acc18a83656e79649ad74ed44cb9d6e9a7df0049 (patch) | |
tree | 9c6a1440826591ba10c2b3e76464eacdbfc5a0e8 /mojo/bindings/java/src/org | |
parent | d07e19ac042b1f423a72803ecea8e7f0c4ee2276 (diff) | |
download | chromium_src-acc18a83656e79649ad74ed44cb9d6e9a7df0049.zip chromium_src-acc18a83656e79649ad74ed44cb9d6e9a7df0049.tar.gz chromium_src-acc18a83656e79649ad74ed44cb9d6e9a7df0049.tar.bz2 |
Adding a router class to handle messages that expect responses.
This also introduce the notion of message header to allow routing.
R=rmcilroy@chromium.org
Committed: https://src.chromium.org/viewvc/chrome?view=rev&revision=283767
Review URL: https://codereview.chromium.org/371603003
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@284170 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'mojo/bindings/java/src/org')
12 files changed, 787 insertions, 54 deletions
diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/AutoCloseableRouter.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/AutoCloseableRouter.java new file mode 100644 index 0000000..475db27 --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/AutoCloseableRouter.java @@ -0,0 +1,116 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +import org.chromium.mojo.system.Core; +import org.chromium.mojo.system.MessagePipeHandle; + +import java.util.concurrent.Executor; + +/** + * Wrapper around {@link Router} that will close the connection when not referenced anymore. + */ +class AutoCloseableRouter implements Router { + + /** + * The underlying router. + */ + private final Router mRouter; + + /** + * The executor to close the underlying router. + */ + private final Executor mExecutor; + + /** + * Flags to keep track if this router has been correctly closed. + */ + private boolean mClosed; + + /** + * Constructor. + */ + public AutoCloseableRouter(Core core, Router router) { + mRouter = router; + mExecutor = ExecutorFactory.getExecutorForCurrentThread(core); + } + + /** + * @see Router#setIncomingMessageReceiver(MessageReceiverWithResponder) + */ + @Override + public void setIncomingMessageReceiver(MessageReceiverWithResponder incomingMessageReceiver) { + mRouter.setIncomingMessageReceiver(incomingMessageReceiver); + } + + /** + * @see HandleOwner#passHandle() + */ + @Override + public MessagePipeHandle passHandle() { + return mRouter.passHandle(); + } + + /** + * @see MessageReceiver#accept(MessageWithHeader) + */ + @Override + public boolean accept(MessageWithHeader message) { + return mRouter.accept(message); + } + + /** + * @see MessageReceiverWithResponder#acceptWithResponder(MessageWithHeader, MessageReceiver) + */ + @Override + public boolean acceptWithResponder(MessageWithHeader message, MessageReceiver responder) { + return mRouter.acceptWithResponder(message, responder); + + } + + /** + * @see Router#start() + */ + @Override + public void start() { + mRouter.start(); + } + + /** + * @see Router#setErrorHandler(ConnectionErrorHandler) + */ + @Override + public void setErrorHandler(ConnectionErrorHandler errorHandler) { + mRouter.setErrorHandler(errorHandler); + } + + /** + * @see java.io.Closeable#close() + */ + @Override + public void close() { + mRouter.close(); + mClosed = true; + } + + /** + * @see Object#finalize() + */ + @Override + protected void finalize() throws Throwable { + if (!mClosed) { + mExecutor.execute(new Runnable() { + + @Override + public void run() { + close(); + } + }); + throw new IllegalStateException("Warning: Router objects should be explicitly closed " + + "when no longer required otherwise you may leak handles."); + } + super.finalize(); + } +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/BindingsHelper.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/BindingsHelper.java index 0288678..d947968 100644 --- a/mojo/bindings/java/src/org/chromium/mojo/bindings/BindingsHelper.java +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/BindingsHelper.java @@ -4,6 +4,9 @@ package org.chromium.mojo.bindings; +import org.chromium.mojo.system.AsyncWaiter; +import org.chromium.mojo.system.Handle; + /** * Helper functions. */ @@ -73,4 +76,17 @@ public class BindingsHelper { private static boolean isSurrogate(char c) { return c >= Character.MIN_SURROGATE && c < (Character.MAX_SURROGATE + 1); } + + /** + * Returns an {@link AsyncWaiter} to use with the given handle, or <code>null</code> if none if + * available. + */ + static AsyncWaiter getDefaultAsyncWaiterForHandle(Handle handle) { + if (handle.getCore() != null) { + return handle.getCore().getDefaultAsyncWaiter(); + } else { + return null; + } + } + } diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/ConnectionErrorHandler.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/ConnectionErrorHandler.java new file mode 100644 index 0000000..abb0bdf --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/ConnectionErrorHandler.java @@ -0,0 +1,15 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +import org.chromium.mojo.system.MojoException; + +/** + * A {@link ConnectionErrorHandler} is notified of an error happening while using the bindings over + * message pipes. + */ +interface ConnectionErrorHandler { + public void onConnectionError(MojoException e); +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/Connector.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/Connector.java index f978ba4..17cc7ed 100644 --- a/mojo/bindings/java/src/org/chromium/mojo/bindings/Connector.java +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/Connector.java @@ -21,13 +21,6 @@ import org.chromium.mojo.system.MojoResult; public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle> { /** - * An {@link ErrorHandler} is notified of error happening while using the message pipe. - */ - interface ErrorHandler { - public void onError(MojoException e); - } - - /** * The callback that is notified when the state of the owned handle changes. */ private final AsyncWaiterCallback mAsyncWaiterCallback = new AsyncWaiterCallback(); @@ -55,14 +48,14 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle /** * The error handler to notify of errors. */ - private ErrorHandler mErrorHandler; + private ConnectionErrorHandler mErrorHandler; /** * Create a new connector over a |messagePipeHandle|. The created connector will use the default * {@link AsyncWaiter} from the {@link Core} implementation of |messagePipeHandle|. */ public Connector(MessagePipeHandle messagePipeHandle) { - this(messagePipeHandle, getDefaultAsyncWaiterForMessagePipe(messagePipeHandle)); + this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle)); } /** @@ -83,9 +76,10 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle } /** - * Set the {@link ErrorHandler} that will be notified of errors on the owned message pipe. + * Set the {@link ConnectionErrorHandler} that will be notified of errors on the owned message + * pipe. */ - public void setErrorHandler(ErrorHandler errorHandler) { + public void setErrorHandler(ConnectionErrorHandler errorHandler) { mErrorHandler = errorHandler; } @@ -98,13 +92,13 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle } /** - * @see MessageReceiver#accept(Message) + * @see MessageReceiver#accept(MessageWithHeader) */ @Override - public boolean accept(Message message) { + public boolean accept(MessageWithHeader message) { try { - mMessagePipeHandle.writeMessage(message.buffer, message.handles, - MessagePipeHandle.WriteFlags.NONE); + mMessagePipeHandle.writeMessage(message.getMessage().buffer, + message.getMessage().handles, MessagePipeHandle.WriteFlags.NONE); return true; } catch (MojoException e) { onError(e); @@ -133,15 +127,6 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle mMessagePipeHandle.close(); } - private static AsyncWaiter getDefaultAsyncWaiterForMessagePipe( - MessagePipeHandle messagePipeHandle) { - if (messagePipeHandle.getCore() != null) { - return messagePipeHandle.getCore().getDefaultAsyncWaiter(); - } else { - return null; - } - } - private class AsyncWaiterCallback implements AsyncWaiter.Callback { /** @@ -178,7 +163,7 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle mCancellable = null; close(); if (mErrorHandler != null) { - mErrorHandler.onError(exception); + mErrorHandler.onConnectionError(exception); } } @@ -202,7 +187,7 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle int result; do { try { - result = Message.readAndDispatchMessage(mMessagePipeHandle, + result = MessageWithHeader.readAndDispatchMessage(mMessagePipeHandle, mIncomingMessageReceiver); } catch (MojoException e) { onError(e); @@ -222,5 +207,4 @@ public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle mCancellable = null; } } - } diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/Message.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/Message.java index ee68707..07bd2bd 100644 --- a/mojo/bindings/java/src/org/chromium/mojo/bindings/Message.java +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/Message.java @@ -6,8 +6,6 @@ package org.chromium.mojo.bindings; import org.chromium.mojo.system.Handle; import org.chromium.mojo.system.MessagePipeHandle; -import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; -import org.chromium.mojo.system.MojoResult; import java.nio.ByteBuffer; import java.util.List; @@ -38,26 +36,4 @@ public final class Message { this.buffer = buffer; this.handles = handles; } - - /** - * Read a message, and pass it to the given |MessageReceiver| if not null. If the - * |MessageReceiver| is null, the message is lost. - * - * @param receiver The {@link MessageReceiver} that will receive the read {@link Message}. Can - * be <code>null</code>, in which case the message is discarded. - */ - public static int readAndDispatchMessage(MessagePipeHandle handle, MessageReceiver receiver) { - // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performance. - ReadMessageResult result = handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE); - if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) { - return result.getMojoResult(); - } - ByteBuffer buffer = ByteBuffer.allocateDirect(result.getMessageSize()); - result = handle.readMessage(buffer, result.getHandlesCount(), - MessagePipeHandle.ReadFlags.NONE); - if (receiver != null && result.getMojoResult() == MojoResult.OK) { - receiver.accept(new Message(buffer, result.getHandles())); - } - return result.getMojoResult(); - } } diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageHeader.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageHeader.java new file mode 100644 index 0000000..2eb7d13 --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageHeader.java @@ -0,0 +1,249 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +import org.chromium.mojo.bindings.Struct.DataHeader; + +import java.nio.ByteBuffer; + +/** + * Header information for a message. + */ +public class MessageHeader { + + private static final int SIMPLE_MESSAGE_SIZE = 16; + private static final int SIMPLE_MESSAGE_NUM_FIELDS = 2; + private static final DataHeader SIMPLE_MESSAGE_STRUCT_INFO = + new DataHeader(SIMPLE_MESSAGE_SIZE, SIMPLE_MESSAGE_NUM_FIELDS); + + private static final int MESSAGE_WITH_REQUEST_ID_SIZE = 24; + private static final int MESSAGE_WITH_REQUEST_ID_NUM_FIELDS = 3; + private static final DataHeader MESSAGE_WITH_REQUEST_ID_STRUCT_INFO = + new DataHeader(MESSAGE_WITH_REQUEST_ID_SIZE, MESSAGE_WITH_REQUEST_ID_NUM_FIELDS); + + private static final int TYPE_OFFSET = 8; + private static final int FLAGS_OFFSET = 12; + private static final int REQUEST_ID_OFFSET = 16; + + /** + * Flag for a header of a message that expected a response. + */ + public static final int MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0; + + /** + * Flag for a header of a message that is a response. + */ + public static final int MESSAGE_IS_RESPONSE_FLAG = 1 << 1; + + private final DataHeader mDataHeader; + private final int mType; + private final int mFlags; + private long mRequestId; + + /** + * Constructor for the header of a message which does not have a response. + */ + public MessageHeader(int type) { + mDataHeader = SIMPLE_MESSAGE_STRUCT_INFO; + mType = type; + mFlags = 0; + mRequestId = 0; + } + + /** + * Constructor for the header of a message which have a response or being itself a response. + */ + public MessageHeader(int type, int flags, long requestId) { + assert mustHaveRequestId(flags); + mDataHeader = MESSAGE_WITH_REQUEST_ID_STRUCT_INFO; + mType = type; + mFlags = flags; + mRequestId = requestId; + } + + /** + * Constructor, parsing the header from a message. Should only be used by {@link Message} + * itself. + */ + MessageHeader(Message message) { + Decoder decoder = new Decoder(message); + mDataHeader = decoder.readDataHeader(); + validateDataHeader(mDataHeader); + mType = decoder.readInt(TYPE_OFFSET); + mFlags = decoder.readInt(FLAGS_OFFSET); + if (mustHaveRequestId(mFlags)) { + if (mDataHeader.size < MESSAGE_WITH_REQUEST_ID_SIZE) { + throw new DeserializationException("Incorrect message size, expecting at least " + + MESSAGE_WITH_REQUEST_ID_SIZE + + " for a message with a request identifier, but got: " + mDataHeader.size); + + } + mRequestId = decoder.readLong(REQUEST_ID_OFFSET); + } else { + mRequestId = 0; + } + } + + /** + * Returns the size in bytes of the serialization of the header. + */ + public int getSize() { + return mDataHeader.size; + } + + /** + * Returns the type of the message. + */ + public int getType() { + return mType; + } + + /** + * Returns the flags associated to the message. + */ + public int getFlags() { + return mFlags; + } + + /** + * Returns if the message has the given flag. + */ + public boolean hasFlag(int flag) { + return (mFlags & flag) == flag; + } + + /** + * Returns if the message has a request id. + */ + public boolean hasRequestId() { + return mustHaveRequestId(mFlags); + } + + /** + * Return the request id for the message. Must only be called if the message has a request id. + */ + public long getRequestId() { + assert hasRequestId(); + return mRequestId; + } + + /** + * Encode the header. + */ + public void encode(Encoder encoder) { + encoder.encode(mDataHeader); + encoder.encode(getType(), TYPE_OFFSET); + encoder.encode(getFlags(), FLAGS_OFFSET); + if (hasRequestId()) { + encoder.encode(getRequestId(), REQUEST_ID_OFFSET); + } + } + + /** + * Returns true if the header has the expected flags. Only considers flags this class knows + * about in order to allow this class to work with future version of the header format. + */ + public boolean validateHeader(int expectedFlags) { + int knownFlags = getFlags() & (MESSAGE_EXPECTS_RESPONSE_FLAG | MESSAGE_IS_RESPONSE_FLAG); + return knownFlags == expectedFlags; + } + + /** + * Returns true if the header has the expected type and flags. Only consider flags this class + * knows about in order to allow this class to work with future version of the header format. + */ + public boolean validateHeader(int expectedType, int expectedFlags) { + return getType() == expectedType && validateHeader(expectedFlags); + } + + /** + * @see Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((mDataHeader == null) ? 0 : mDataHeader.hashCode()); + result = prime * result + mFlags; + result = prime * result + (int) (mRequestId ^ (mRequestId >>> 32)); + result = prime * result + mType; + return result; + } + + /** + * @see Object#equals(Object) + */ + @Override + public boolean equals(Object object) { + if (object == this) + return true; + if (object == null) + return false; + if (getClass() != object.getClass()) + return false; + + MessageHeader other = (MessageHeader) object; + if (mDataHeader == null) { + if (other.mDataHeader != null) { + return false; + } + } else { + if (!mDataHeader.equals(other.mDataHeader)) { + return false; + } + } + + return (mFlags == other.mFlags && + mRequestId == other.mRequestId && + mType == other.mType); + } + + /** + * Set the request id on the message contained in the given buffer. + */ + void setRequestId(ByteBuffer buffer, long requestId) { + assert mustHaveRequestId(buffer.getInt(FLAGS_OFFSET)); + buffer.putLong(REQUEST_ID_OFFSET, requestId); + mRequestId = requestId; + } + + /** + * Returns whether a message with the given flags must have a request Id. + */ + private static boolean mustHaveRequestId(int flags) { + return (flags & (MESSAGE_EXPECTS_RESPONSE_FLAG | MESSAGE_IS_RESPONSE_FLAG)) != 0; + } + + /** + * Validate that the given {@link DataHeader} can be the data header of a message header. + */ + private static void validateDataHeader(DataHeader dataHeader) { + if (dataHeader.numFields < SIMPLE_MESSAGE_NUM_FIELDS) { + throw new DeserializationException( + "Incorrect number of fields, expecting at least " + SIMPLE_MESSAGE_NUM_FIELDS + + ", but got: " + dataHeader.numFields); + } + if (dataHeader.size < SIMPLE_MESSAGE_SIZE) { + throw new DeserializationException( + "Incorrect message size, expecting at least " + SIMPLE_MESSAGE_SIZE + + ", but got: " + dataHeader.size); + } + if (dataHeader.numFields == SIMPLE_MESSAGE_NUM_FIELDS + && dataHeader.size != SIMPLE_MESSAGE_SIZE) { + throw new DeserializationException( + "Incorrect message size for a message with " + SIMPLE_MESSAGE_NUM_FIELDS + + " fields, expecting " + SIMPLE_MESSAGE_SIZE + ", but got: " + + dataHeader.size); + } + if (dataHeader.numFields == MESSAGE_WITH_REQUEST_ID_NUM_FIELDS + && dataHeader.size != MESSAGE_WITH_REQUEST_ID_SIZE) { + throw new DeserializationException( + "Incorrect message size for a message with " + + MESSAGE_WITH_REQUEST_ID_NUM_FIELDS + " fields, expecting " + + MESSAGE_WITH_REQUEST_ID_SIZE + ", but got: " + dataHeader.size); + } + } + +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiver.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiver.java index 1736bc6..d3cd59e 100644 --- a/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiver.java +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiver.java @@ -10,8 +10,8 @@ package org.chromium.mojo.bindings; public interface MessageReceiver { /** - * Receive a {@link Message}. The {@link MessageReceiver} is allowed to mutable the message. - * Returns |true| if the message has been handled, |false| otherwise. + * Receive a {@link MessageWithHeader}. The {@link MessageReceiver} is allowed to mutable the + * message. Returns |true| if the message has been handled, |false| otherwise. */ - boolean accept(Message message); + boolean accept(MessageWithHeader message); } diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiverWithResponder.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiverWithResponder.java new file mode 100644 index 0000000..afefad4 --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageReceiverWithResponder.java @@ -0,0 +1,21 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +/** + * A {@link MessageReceiver} that can also handle the handle the response message generated from the + * given message. + */ +public interface MessageReceiverWithResponder extends MessageReceiver { + + /** + * A variant on {@link #accept(MessageWithHeader)} that registers a {@link MessageReceiver} + * (known as the responder) to handle the response message generated from the given message. The + * responder's {@link #accept(MessageWithHeader)} method may be called as part of the call to + * {@link #acceptWithResponder(MessageWithHeader, MessageReceiver)}, or some time after its + * return. + */ + boolean acceptWithResponder(MessageWithHeader message, MessageReceiver responder); +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageWithHeader.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageWithHeader.java new file mode 100644 index 0000000..5c826b6 --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/MessageWithHeader.java @@ -0,0 +1,98 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +import org.chromium.mojo.system.MessagePipeHandle; +import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult; +import org.chromium.mojo.system.MojoResult; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * Represents a {@link Message} which contains a {@link MessageHeader}. Deals with parsing the + * {@link MessageHeader} for a message. + */ +public class MessageWithHeader { + + private final Message mBaseMessage; + private final MessageHeader mHeader; + private Message mPayload; + + /** + * Reinterpret the given |message| as a message with the given |header|. The |message| must + * contain the |header| as the start of its raw data. + */ + public MessageWithHeader(Message baseMessage, MessageHeader header) { + assert header.equals(new org.chromium.mojo.bindings.MessageHeader(baseMessage)); + this.mBaseMessage = baseMessage; + this.mHeader = header; + } + + /** + * Reinterpret the given |message| as a message with a header. The |message| must contain a + * header as the start of it's raw data, which will be parsed by this constructor. + */ + public MessageWithHeader(Message baseMessage) { + this(baseMessage, new org.chromium.mojo.bindings.MessageHeader(baseMessage)); + } + + /** + * Returns the header of the given message. This will throw a {@link DeserializationException} + * if the start of the message is not a valid header. + */ + public MessageHeader getHeader() { + return mHeader; + } + + /** + * Returns the payload of the message. + */ + public Message getPayload() { + if (mPayload == null) { + ByteBuffer truncatedBuffer = ((ByteBuffer) mBaseMessage.buffer.position( + getHeader().getSize())).slice(); + truncatedBuffer.order(ByteOrder.nativeOrder()); + mPayload = new Message(truncatedBuffer, mBaseMessage.handles); + } + return mPayload; + } + + /** + * Returns the raw message. + */ + public Message getMessage() { + return mBaseMessage; + } + + /** + * Set the request identifier on the message. + */ + void setRequestId(long requestId) { + mHeader.setRequestId(mBaseMessage.buffer, requestId); + } + + /** + * Read a message, and pass it to the given |MessageReceiver| if not null. If the + * |MessageReceiver| is null, the message is lost. + * + * @param receiver The {@link MessageReceiver} that will receive the read {@link Message}. Can + * be <code>null</code>, in which case the message is discarded. + */ + public static int readAndDispatchMessage(MessagePipeHandle handle, MessageReceiver receiver) { + // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performance. + ReadMessageResult result = handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE); + if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) { + return result.getMojoResult(); + } + ByteBuffer buffer = ByteBuffer.allocateDirect(result.getMessageSize()); + result = handle.readMessage(buffer, result.getHandlesCount(), + MessagePipeHandle.ReadFlags.NONE); + if (receiver != null && result.getMojoResult() == MojoResult.OK) { + receiver.accept(new MessageWithHeader(new Message(buffer, result.getHandles()))); + } + return result.getMojoResult(); + } +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/Router.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/Router.java new file mode 100644 index 0000000..3b8c102 --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/Router.java @@ -0,0 +1,37 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +import org.chromium.mojo.system.MessagePipeHandle; + +/** + * A {@link Router} will handle mojo message and forward those to a {@link Connector}. It deals with + * parsing of headers and adding of request ids in order to be able to match a response to a + * request. + */ +public interface Router extends MessageReceiverWithResponder, HandleOwner<MessagePipeHandle> { + + /** + * Start listening for incoming messages. + */ + public void start(); + + /** + * Set the {@link MessageReceiverWithResponder} that will deserialize and use the message + * received from the pipe. + */ + public void setIncomingMessageReceiver(MessageReceiverWithResponder incomingMessageReceiver); + + /** + * Set the handle that will be notified of errors on the message pipe. + */ + public void setErrorHandler(ConnectionErrorHandler errorHandler); + + /** + * @see java.io.Closeable#close() + */ + @Override + public void close(); +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/RouterImpl.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/RouterImpl.java new file mode 100644 index 0000000..4633a92 --- /dev/null +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/RouterImpl.java @@ -0,0 +1,178 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.mojo.bindings; + +import org.chromium.mojo.system.AsyncWaiter; +import org.chromium.mojo.system.MessagePipeHandle; + +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of {@link Router}. + */ +public class RouterImpl implements Router { + + /** + * {@link MessageReceiver} used as the {@link Connector} callback. + */ + private class ResponderThunk implements MessageReceiver { + + /** + * @see MessageReceiver#accept(MessageWithHeader) + */ + @Override + public boolean accept(MessageWithHeader message) { + return handleIncomingMessage(message); + } + + } + + /** + * The {@link Connector} which is connected to the handle. + */ + private final Connector mConnector; + + /** + * The {@link MessageReceiverWithResponder} that will consume the messages received from the + * pipe. + */ + private MessageReceiverWithResponder mIncomingMessageReceiver; + + /** + * The next id to use for a request id which needs a response. It is auto-incremented. + */ + private long mNextRequestId = 1; + + /** + * The map from request ids to {@link MessageReceiver} of request currently in flight. + */ + private Map<Long, MessageReceiver> mResponders = new HashMap<Long, MessageReceiver>(); + + /** + * Constructor that will use the default {@link AsyncWaiter}. + * + * @param messagePipeHandle The {@link MessagePipeHandle} to route message for. + */ + public RouterImpl(MessagePipeHandle messagePipeHandle) { + this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle)); + } + + /** + * Constructor. + * + * @param messagePipeHandle The {@link MessagePipeHandle} to route message for. + * @param asyncWaiter the {@link AsyncWaiter} to use to get notification of new messages on the + * handle. + */ + public RouterImpl(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter) { + mConnector = new Connector(messagePipeHandle, asyncWaiter); + mConnector.setIncomingMessageReceiver(new ResponderThunk()); + } + + /** + * @see org.chromium.mojo.bindings.Router#start() + */ + @Override + public void start() { + mConnector.start(); + } + + /** + * @see Router#setIncomingMessageReceiver(MessageReceiverWithResponder) + */ + @Override + public void setIncomingMessageReceiver(MessageReceiverWithResponder incomingMessageReceiver) { + this.mIncomingMessageReceiver = incomingMessageReceiver; + } + + /** + * @see MessageReceiver#accept(MessageWithHeader) + */ + @Override + public boolean accept(MessageWithHeader message) { + // A message without responder is directly forwarded to the connector. + return mConnector.accept(message); + } + + /** + * @see MessageReceiverWithResponder#acceptWithResponder(MessageWithHeader, MessageReceiver) + */ + @Override + public boolean acceptWithResponder(MessageWithHeader message, MessageReceiver responder) { + // Checking the message expects a response. + assert message.getHeader().hasFlag(MessageHeader.MESSAGE_EXPECTS_RESPONSE_FLAG); + + // Compute a request id for being able to route the response. + long requestId = mNextRequestId++; + // Reserve 0 in case we want it to convey special meaning in the future. + if (requestId == 0) { + requestId = mNextRequestId++; + } + if (mResponders.containsKey(requestId)) { + throw new IllegalStateException("Unable to find a new request identifier."); + } + message.setRequestId(requestId); + if (!mConnector.accept(message)) { + return false; + } + // Only keep the responder is the message has been accepted. + mResponders.put(requestId, responder); + return true; + } + + /** + * @see org.chromium.mojo.bindings.HandleOwner#passHandle() + */ + @Override + public MessagePipeHandle passHandle() { + return mConnector.passHandle(); + } + + /** + * @see java.io.Closeable#close() + */ + @Override + public void close() { + mConnector.close(); + } + + /** + * @see Router#setErrorHandler(ConnectionErrorHandler) + */ + @Override + public void setErrorHandler(ConnectionErrorHandler errorHandler) { + mConnector.setErrorHandler(errorHandler); + } + + /** + * Receive a message from the connector. Returns |true| if the message has been handled. + */ + private boolean handleIncomingMessage(MessageWithHeader message) { + MessageHeader header = message.getHeader(); + if (header.hasFlag(MessageHeader.MESSAGE_EXPECTS_RESPONSE_FLAG)) { + if (mIncomingMessageReceiver != null) { + return mIncomingMessageReceiver.acceptWithResponder(message, this); + } + // If we receive a request expecting a response when the client is not + // listening, then we have no choice but to tear down the pipe. + close(); + return false; + } else if (header.hasFlag(MessageHeader.MESSAGE_IS_RESPONSE_FLAG)) { + long requestId = header.getRequestId(); + MessageReceiver responder = mResponders.get(requestId); + if (responder == null) { + return false; + } + return responder.accept(message); + } else { + if (mIncomingMessageReceiver != null) { + return mIncomingMessageReceiver.accept(message); + } + // OK to drop the message. + } + return false; + } +} diff --git a/mojo/bindings/java/src/org/chromium/mojo/bindings/Struct.java b/mojo/bindings/java/src/org/chromium/mojo/bindings/Struct.java index f7c16a2..dc1742a 100644 --- a/mojo/bindings/java/src/org/chromium/mojo/bindings/Struct.java +++ b/mojo/bindings/java/src/org/chromium/mojo/bindings/Struct.java @@ -42,6 +42,35 @@ public abstract class Struct { this.size = size; this.numFields = numFields; } + + /** + * @see Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + numFields; + result = prime * result + size; + return result; + } + + /** + * @see Object#equals(Object) + */ + @Override + public boolean equals(Object object) { + if (object == this) + return true; + if (object == null) + return false; + if (getClass() != object.getClass()) + return false; + + DataHeader other = (DataHeader) object; + return (numFields == other.numFields && + size == other.size); + } } /** @@ -73,4 +102,18 @@ public abstract class Struct { return encoder.getMessage(); } + /** + * Returns the serialization of the struct prepended with the given header. + * + * @param header the header to prepend to the returned message. + * @param core the |Core| implementation used to generate handles. Only used if the |Struct| + * being encoded contains interfaces, can be |null| otherwise. + */ + public MessageWithHeader serializeWithHeader(Core core, MessageHeader header) { + Encoder encoder = new Encoder(core, mEncodedBaseSize + header.getSize()); + header.encode(encoder); + encode(encoder); + return new MessageWithHeader(encoder.getMessage(), header); + } + } |