summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-transport/src/main')
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java131
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java107
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java98
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java65
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java73
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java128
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java197
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java120
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java652
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java69
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java337
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java150
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java165
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java45
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java428
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java86
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java89
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java308
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java289
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java103
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java142
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java61
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java262
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java144
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java91
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java195
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java260
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java114
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java91
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java55
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java63
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java229
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java163
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java150
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java73
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java42
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java58
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java315
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java84
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java141
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java125
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java127
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java75
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java71
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java741
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java193
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java269
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java114
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java121
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java132
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java71
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java69
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java62
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java136
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java79
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java120
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java107
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java57
-rw-r--r--simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java59
59 files changed, 9101 insertions, 0 deletions
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java
new file mode 100644
index 0000000..a5335ed
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteCursor.java
@@ -0,0 +1,131 @@
+/*
+ * ByteCursor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+/**
+ * The <code>ByteCursor</code> object is used to acquire bytes from a
+ * given source. This provides a cursor style reading of bytes from
+ * a stream in that it will allow the reader to move the cursor back
+ * if the amount of bytes read is too much. Allowing the cursor to
+ * move ensures that excess bytes back be placed back in the stream.
+ * <p>
+ * This is used when parsing input from a stream as it ensures that
+ * on arrival at a terminal token any excess bytes can be placed
+ * back in to the stream. This allows data to be read efficiently
+ * in large chunks from blocking streams such as sockets.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.TransportCursor
+ */
+public interface ByteCursor {
+
+ /**
+ * Determines whether the cursor is still open. The cursor is
+ * considered open if there are still bytes to read. If there is
+ * still bytes buffered and the underlying transport is closed
+ * then the cursor is still considered open.
+ *
+ * @return true if the read method does not return a -1 value
+ */
+ boolean isOpen() throws IOException;
+
+ /**
+ * Determines whether the cursor is ready for reading. When the
+ * cursor is ready then it guarantees that some amount of bytes
+ * can be read from the underlying stream without blocking.
+ *
+ * @return true if some data can be read without blocking
+ */
+ boolean isReady() throws IOException;
+
+ /**
+ * Provides the number of bytes that can be read from the stream
+ * without blocking. This is typically the number of buffered or
+ * available bytes within the stream. When this reaches zero then
+ * the cursor may perform a blocking read.
+ *
+ * @return the number of bytes that can be read without blocking
+ */
+ int ready() throws IOException;
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ int read(byte[] data) throws IOException;
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ * @param off this is the offset to begin writing the bytes to
+ * @param len this is the number of bytes that are requested
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ int read(byte[] data, int off, int len) throws IOException;
+
+ /**
+ * Pushes the provided data on to the cursor. Data pushed on to
+ * the cursor will be the next data read from the cursor. This
+ * complements the <code>reset</code> method which will reset
+ * the cursors position on a stream. Allowing data to be pushed
+ * on to the cursor allows more flexibility.
+ *
+ * @param data this is the data to be pushed on to the cursor
+ */
+ void push(byte[] data) throws IOException;
+
+ /**
+ * Pushes the provided data on to the cursor. Data pushed on to
+ * the cursor will be the next data read from the cursor. This
+ * complements the <code>reset</code> method which will reset
+ * the cursors position on a stream. Allowing data to be pushed
+ * on to the cursor allows more flexibility.
+ *
+ * @param data this is the data to be pushed on to the cursor
+ * @param off this is the offset to begin reading the bytes
+ * @param len this is the number of bytes that are to be used
+ */
+ void push(byte[] data, int off, int len) throws IOException;
+
+ /**
+ * Moves the cursor backward within the stream. This ensures
+ * that any bytes read from the last read can be pushed back
+ * in to the stream so that they can be read again. This will
+ * throw an exception if the reset can not be performed.
+ *
+ * @param len this is the number of bytes to reset back
+ *
+ * @return this is the number of bytes that have been reset
+ */
+ int reset(int len) throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java
new file mode 100644
index 0000000..3e26e76
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteReader.java
@@ -0,0 +1,107 @@
+/*
+ * ByteReader.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+/**
+ * The <code>ByteReader</code> object is used to acquire bytes from
+ * a given source. This provides a cursor style reading of bytes from
+ * a stream in that it will allow the reader to move the cursor back
+ * if the amount of bytes read is too much. Allowing the cursor to
+ * move ensures that excess bytes can be placed back in the stream.
+ * <p>
+ * This is used when parsing input from a stream as it ensures that
+ * on arrival at a terminal token any excess bytes can be placed
+ * back in to the stream. This allows data to be read efficiently
+ * in large chunks from blocking streams such as sockets.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.ByteCursor
+ */
+interface ByteReader {
+
+ /**
+ * Determines whether the source is still open. The source is
+ * considered open if there are still bytes to read. If there is
+ * still bytes buffered and the underlying transport is closed
+ * then the source is still considered open.
+ *
+ * @return true if the read method does not return a -1 value
+ */
+ boolean isOpen() throws IOException;
+
+ /**
+ * Determines whether the source is ready for reading. When the
+ * source is ready then it guarantees that some amount of bytes
+ * can be read from the underlying stream without blocking.
+ *
+ * @return true if some data can be read without blocking
+ */
+ boolean isReady() throws IOException;
+
+ /**
+ * Provides the number of bytes that can be read from the stream
+ * without blocking. This is typically the number of buffered or
+ * available bytes within the stream. When this reaches zero then
+ * the source may perform a blocking read.
+ *
+ * @return the number of bytes that can be read without blocking
+ */
+ int ready() throws IOException;
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ *
+ * @return this returns the number of bytes read from the source
+ */
+ int read(byte[] data) throws IOException;
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ * @param off this is the offset to begin writing the bytes to
+ * @param len this is the number of bytes that are requested
+ *
+ * @return this returns the number of bytes read from the source
+ */
+ int read(byte[] data, int off, int len) throws IOException;
+
+ /**
+ * Moves the source backward within the stream. This ensures
+ * that any bytes read from the last read can be pushed back
+ * in to the stream so that they can be read again. This will
+ * throw an exception if the reset can not be performed.
+ *
+ * @param len this is the number of bytes to reset back
+ *
+ * @return this is the number of bytes that have been reset
+ */
+ int reset(int len) throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java
new file mode 100644
index 0000000..dc647cf
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ByteWriter.java
@@ -0,0 +1,98 @@
+/*
+ * ByteWriter.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The <code>ByteWriter</code> object is used to send data over the TCP
+ * transport. This provides direct contact with the connected socket.
+ * Delivery over a sender implementation can be either SSL based or
+ * direct. It is the responsibility of the implementation to provide
+ * such behavior as required.
+ *
+ * @author Niall Gallagher
+ */
+public interface ByteWriter {
+
+ /**
+ * This method is used to deliver the provided array of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param array this is the array of bytes to send to the client
+ */
+ void write(byte[] array) throws IOException;
+
+ /**
+ * This method is used to deliver the provided array of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param array this is the array of bytes to send to the client
+ * @param off this is the offset within the array to send from
+ * @param len this is the number of bytes that are to be sent
+ */
+ void write(byte[] array, int off, int len) throws IOException;
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the buffer of bytes to send to the client
+ */
+ void write(ByteBuffer buffer) throws IOException;
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the buffer of bytes to send to the client
+ * @param off this is the offset within the buffer to send from
+ * @param len this is the number of bytes that are to be sent
+ */
+ void write(ByteBuffer buffer, int off, int len) throws IOException;
+
+ /**
+ * This method is used to flush the contents of the buffer to
+ * the client. This method will block until such time as all of
+ * the data has been sent to the client. If at any point there
+ * is an error sending the content an exception is thrown.
+ */
+ void flush() throws IOException;
+
+ /**
+ * This is used to close the sender and the underlying transport.
+ * If a close is performed on the sender then no more bytes can
+ * be read from or written to the transport and the client will
+ * received a connection close on their side.
+ */
+ void close() throws IOException;
+}
+
+
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java
new file mode 100644
index 0000000..05f9c91
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Certificate.java
@@ -0,0 +1,65 @@
+/*
+ * Certificate.java June 2013
+ *
+ * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import javax.security.cert.X509Certificate;
+
+/**
+ * The <code>Certificate</code> interface represents the certificate
+ * that is sent by a client during a secure HTTPS conversation. This
+ * may or may not contain an X509 certificate chain from the client.
+ * If it does not a <code>CertificateChallenge</code> may be used to
+ * issue a renegotiation of the connection. One completion of the
+ * renegotiation the challenge executes a completion operation.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.CertificateChallenge
+ */
+public interface Certificate {
+
+ /**
+ * This will return the X509 certificate chain, if any, that
+ * has been sent by the client. A certificate chain is typically
+ * only send when the server explicitly requests the certificate
+ * on the initial connection or when it is challenged for.
+ *
+ * @return this returns the clients X509 certificate chain
+ */
+ X509Certificate[] getChain() throws Exception;
+
+ /**
+ * This returns a challenge for the certificate. A challenge is
+ * issued by providing a <code>Runnable</code> task which is to
+ * be executed when the challenge has completed. Typically this
+ * task should be used to drive completion of an HTTPS request.
+ *
+ * @return this returns a challenge for the client certificate
+ */
+ CertificateChallenge getChallenge() throws Exception;
+
+ /**
+ * This is used to determine if the X509 certificate chain is
+ * present for the request. If it is not present then a challenge
+ * can be used to request the certificate.
+ *
+ * @return true if the certificate chain is present
+ */
+ boolean isChainPresent() throws Exception;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java
new file mode 100644
index 0000000..5ed2743
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/CertificateChallenge.java
@@ -0,0 +1,73 @@
+/*
+ * CertificateChallenge.java June 2013
+ *
+ * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.util.concurrent.Future;
+
+/**
+ * The <code>CertificateChallenge</code> object is used to challenge
+ * a client for their x509 certificate. Notification of a successful
+ * challenge for the certificate is done using a completion task.
+ * The task is executed when the SSL renegotiation completes with
+ * a client certificate.
+ * <p>
+ * For HTTPS the SSL renegotiation workflow used to challenge the
+ * client for their X509 certificate is rather bizzare. It starts
+ * with an initial challenge, where an SSL handshake is performed.
+ * This initial handshake typically completes but results in the
+ * TCP connection being closed by the client. Then a second
+ * handshake is performed by the client on a new TCP connection,
+ * this second handshake does not contain the certificate either.
+ * When the handshake is finished on this new connection the client
+ * will resubmit the original HTTP request. Again the server will
+ * have to challenge for the certificate, which should succeed and
+ * result in execution of the task provided.
+ * <p>
+ * An important point to note here, is that if the client closes
+ * the TCP connection on the first challenge, the completion task
+ * will not be executed, it will be ignored. Only a successful
+ * completion of a HTTPS renegotiation will result in execution
+ * of the provided task.
+ *
+ * @author Niall Gallagher
+ */
+public interface CertificateChallenge {
+
+ /**
+ * This method will challenge the client for their certificate.
+ * It does so by performing an SSL renegotiation. Successful
+ * completion of the SSL renegotiation results in the client
+ * providing their certificate, and execution of the task.
+ *
+ * @return this future containing the original certificate
+ */
+ Future<Certificate> challenge() throws Exception;
+
+ /**
+ * This method will challenge the client for their certificate.
+ * It does so by performing an SSL renegotiation. Successful
+ * completion of the SSL renegotiation results in the client
+ * providing their certificate, and execution of the task.
+ *
+ * @param completion task to be run on successful challenge
+ *
+ * @return this future containing the original certificate
+ */
+ Future<Certificate> challenge(Runnable completion) throws Exception;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java
new file mode 100644
index 0000000..02e6cbd
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Channel.java
@@ -0,0 +1,128 @@
+/*
+ * Channel.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>Channel</code> interface represents a connected channel
+ * through which data can be sent and received. Typically a channel
+ * will have a connected TCP socket, which can be used to determine
+ * when the channel is read ready, and write ready. A channel can
+ * also contain a bag of attributes used to describe the connection.
+ * <p>
+ * Reading and writing to a channel is performed using two special
+ * interfaces. The first is the <code>ByteCursor</code> object which
+ * is used to read data from the channel in a non-blocking manner.
+ * This can also be used to reset data if it has read too much. To
+ * write the <code>ByteWriter</code> can be used, this provides a
+ * blocking interface much like a conventional output stream.
+ *
+ * @author Niall Gallagher
+ */
+public interface Channel {
+
+ /**
+ * This is used to determine if the channel is secure and that
+ * data read from and data written to the request is encrypted.
+ * Channels transferred over SSL are considered secure and will
+ * have this return true, otherwise it will return false.
+ *
+ * @return true if this is secure for reading and writing
+ */
+ boolean isSecure();
+
+ /**
+ * This is the connected socket channel associated with this. In
+ * order to determine if content can be read or written to or
+ * from the channel this socket can be used with a selector. This
+ * provides a means to react to I/O events as they occur rather
+ * than polling the channel which is generally less performant.
+ *
+ * @return this returns the connected socket channel
+ */
+ SocketChannel getSocket();
+
+ /**
+ * This is used to acquire the SSL certificate used for security.
+ * If the socket is connected to an SSL transport this returns an
+ * SSL certificate which was provided during the secure handshake
+ * between the client and server. If not certificates are present
+ * in the provided instance, a challenge can be issued.
+ *
+ * @return the SSL certificate provided by a secure transport
+ */
+ Certificate getCertificate();
+
+ /**
+ * This gets the <code>Trace</code> object associated with the
+ * channel. The trace is used to log various events for the life
+ * of the transaction such as low level read and write events
+ * as well as milestone events and errors.
+ *
+ * @return this returns the trace associated with the socket
+ */
+ Trace getTrace();
+
+ /**
+ * This provides a <code>ByteCursor</code> for this channel. The
+ * cursor provides a seekable view of the input buffer and will
+ * allow the server kernel to peek into the input buffer without
+ * having to take the data from the input. This allows overflow
+ * to be pushed back on to the cursor for subsequent reads.
+ *
+ * @return this returns the input cursor for the channel
+ */
+ ByteCursor getCursor();
+
+ /**
+ * This provides a <code>ByteWriter</code> for the channel. This
+ * is used to provide a blocking output mechanism for the channel.
+ * Enabling blocking reads ensures that output buffering can be
+ * limited to an extent, which ensures that memory remains low at
+ * high load periods. Writes to the sender may result in the data
+ * being copied and queued until the socket is write ready.
+ *
+ * @return this returns the output sender for this channel
+ */
+ ByteWriter getWriter();
+
+ /**
+ * This returns the <code>Map</code> of attributes used to hold
+ * connection information for the channel. The attributes here
+ * are taken from the pipeline attributes and may contain details
+ * such as SSL certificates or other such useful information.
+ *
+ * @return returns the attributes associated with the channel
+ */
+ Map getAttributes();
+
+ /**
+ * Because the channel represents a duplex means of communication
+ * there needs to be a means to close it down. This provides such
+ * a means. By closing the channel the cursor and sender will no
+ * longer send or recieve data to or from the network. The client
+ * will also be signaled that the connection has been severed.
+ */
+ void close();
+
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java
new file mode 100644
index 0000000..43a64ed
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushScheduler.java
@@ -0,0 +1,197 @@
+/*
+ * FlushScheduler.java February 2008
+ *
+ * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static java.nio.channels.SelectionKey.OP_WRITE;
+import static org.simpleframework.transport.TransportEvent.WRITE_BLOCKING;
+import static org.simpleframework.transport.TransportEvent.WRITE_WAIT;
+
+import java.io.IOException;
+
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>FlushScheduler</code> object is used to schedule a task
+ * for execution when it is write ready. This is used by the socket
+ * flusher to ensure that the writing thread can be blocked until
+ * such time as all the bytes required to be written are written.
+ * <p>
+ * All methods are invoked by a <code>SocketFlusher</code> object
+ * which is synchronized. This ensures that the methods of the
+ * scheduler are thread safe in that only one thread will access
+ * them at any given time. The lock used by the socket flusher can
+ * thus be safely as it will be synchronized on by the flusher.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.SocketFlusher
+ */
+class FlushScheduler {
+
+ /**
+ * This is the operation that is scheduled for execution.
+ */
+ private Operation task;
+
+ /**
+ * This is the reactor to used to execute the operation.
+ */
+ private Reactor reactor;
+
+ /**
+ * This is the trace that listens to all transport events.
+ */
+ private Trace trace;
+
+ /**
+ * This is the lock that is used to signal a blocked thread.
+ */
+ private Object lock;
+
+ /**
+ * This is used to determine if the scheduler is running.
+ */
+ private volatile boolean running;
+
+ /**
+ * This is used to determine if the scheduler is interrupted.
+ */
+ private volatile boolean closed;
+
+ /**
+ * This is used to determine if there is currently a flush.
+ */
+ private volatile boolean flushing;
+
+ /**
+ * Constructor for the <code>FlushScheduler</code> object. This
+ * is* used to create a scheduler that will execute the provided
+ * task when the associated socket is write ready.
+ *
+ * @param socket this is the associated socket for the scheduler
+ * @param reactor this is the rector used to schedule execution
+ * @param task this is the task that is executed when writable
+ * @param lock this is the lock used to signal blocking threads
+ */
+ public FlushScheduler(Socket socket, Reactor reactor, Operation task, Object lock) {
+ this.trace = socket.getTrace();
+ this.reactor = reactor;
+ this.task = task;
+ this.lock = lock;
+ }
+
+ /**
+ * This is used to repeat schedule the operation for execution.
+ * This is executed if the operation has not fully completed
+ * its task. If the scheduler is not in a running state then
+ * this will not schedule the task for a repeat execution.
+ */
+ public void repeat() throws IOException {
+ if(closed) {
+ throw new TransportException("Socket closed");
+ }
+ if(running) {
+ trace.trace(WRITE_WAIT);
+ reactor.process(task, OP_WRITE);
+ }
+ }
+
+ /**
+ * This is used to schedule the task for execution. If this is
+ * given a boolean true to indicate that it wishes to block
+ * then this will block the calling thread until such time as
+ * the <code>ready</code> method is invoked.
+ *
+ * @param block indicates whether the thread should block
+ */
+ public void schedule(boolean block) throws IOException {
+ if(closed) {
+ throw new TransportException("Socket closed");
+ }
+ if(!running) {
+ trace.trace(WRITE_WAIT);
+ reactor.process(task, OP_WRITE);
+ running = true;
+ }
+ if(block) {
+ listen();
+ }
+ }
+
+ /**
+ * This is used to listen for a notification from the reactor to
+ * tell the thread that the write operation has completed. If
+ * the thread is interrupted upon this call then this will throw
+ * an <code>IOException</code> with the root cause.
+ */
+ private void listen() throws IOException {
+ if(flushing) {
+ throw new TransportException("Socket already flushing");
+ }
+ try {
+ if(!closed) {
+ try {
+ flushing = true;
+ trace.trace(WRITE_BLOCKING);
+ lock.wait(120000);
+ } finally {
+ flushing = false;
+ }
+ }
+ } catch(Exception e) {
+ throw new TransportException("Could not schedule for flush", e);
+ }
+ if(closed) {
+ throw new TransportException("Socket closed");
+ }
+ }
+
+ /**
+ * This is used to notify any waiting threads that they no longer
+ * need to wait. This is used when the flusher no longer needs
+ * the waiting thread to block. Such an occurrence happens when
+ * all shared data has been written or has been duplicated.
+ */
+ public void release() {
+ lock.notifyAll();
+ }
+
+ /**
+ * This is used to signal any blocking threads to wake up. When
+ * this is invoked blocking threads are signaled and they can
+ * return. This is typically done when the task has finished.
+ */
+ public void ready() {
+ lock.notifyAll();
+ running = false;
+ }
+
+ /**
+ * This is used to close the scheduler when the reactor is
+ * closed by the server. An close will happen when the server
+ * has been shutdown, it ensures there are no threads lingering
+ * waiting for a notification when the reactor has closed.
+ */
+ public void close() {
+ lock.notifyAll();
+ closed = true;
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java
new file mode 100644
index 0000000..e7f2c4f
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/FlushSignaller.java
@@ -0,0 +1,120 @@
+/*
+ * FlushSignaller.java February 2008
+ *
+ * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.ERROR;
+
+import java.nio.channels.SocketChannel;
+
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>FlushSignaller</code> is an operation that performs
+ * writes operation asynchronously. This will basically determine
+ * if the socket is write ready and drain each queued buffer to
+ * the socket until there are no more pending buffers.
+ *
+ * @author Niall Gallagher
+ */
+class FlushSignaller implements Operation {
+
+ /**
+ * This is the writer that is used to write the data.
+ */
+ private final SocketFlusher writer;
+
+ /**
+ * This is the socket that this will be flushing.
+ */
+ private final Socket socket;
+
+ /**
+ * This is used to trace the activity for the operation.
+ */
+ private final Trace trace;
+
+ /**
+ * Constructor for the <code>FlushSignaller</code> object. This
+ * will create an operation that is used to flush the buffer
+ * queue to the underlying socket. This ensures that the data
+ * is written to the socket in the queued order.
+ *
+ * @param writer this is the writer to flush the data to
+ * @param socket this is the socket to be flushed
+ */
+ public FlushSignaller(SocketFlusher writer, Socket socket) {
+ this.trace = socket.getTrace();
+ this.socket = socket;
+ this.writer = writer;
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the operation. A trace object is used to collection details
+ * on what operations are being performed. For instance it may
+ * contain information relating to I/O events or errors.
+ *
+ * @return this returns the trace associated with this operation
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This returns the socket channel for the connected pipeline. It
+ * is this channel that is used to determine if there are bytes
+ * that can be written. When closed this is no longer selectable.
+ *
+ * @return this returns the connected channel for the pipeline
+ */
+ public SocketChannel getChannel() {
+ return socket.getChannel();
+ }
+
+ /**
+ * This is used to perform the drain of the pending buffer
+ * queue. This will drain each pending queue if the socket is
+ * write ready. If the socket is not write ready the operation
+ * is enqueued for selection and this returns. This ensures
+ * that all the data will eventually be delivered.
+ */
+ public void run() {
+ try {
+ writer.execute();
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ cancel();
+ }
+ }
+
+ /**
+ * This is used to cancel the operation if it has timed out.
+ * If the delegate is waiting too long to flush the contents
+ * of the buffers to the underlying transport then the socket
+ * is closed and the flusher times out to avoid deadlock.
+ */
+ public void cancel() {
+ try {
+ writer.abort();
+ }catch(Exception cause){
+ trace.trace(ERROR, cause);
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java
new file mode 100644
index 0000000..2a4b9a2
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Handshake.java
@@ -0,0 +1,652 @@
+/*
+ * Handshake.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static java.nio.channels.SelectionKey.OP_READ;
+import static java.nio.channels.SelectionKey.OP_WRITE;
+import static org.simpleframework.transport.PhaseType.COMMIT;
+import static org.simpleframework.transport.PhaseType.CONSUME;
+import static org.simpleframework.transport.PhaseType.PRODUCE;
+import static org.simpleframework.transport.TransportEvent.ERROR;
+import static org.simpleframework.transport.TransportEvent.HANDSHAKE_BEGIN;
+import static org.simpleframework.transport.TransportEvent.HANDSHAKE_DONE;
+import static org.simpleframework.transport.TransportEvent.HANDSHAKE_FAILED;
+import static org.simpleframework.transport.TransportEvent.READ;
+import static org.simpleframework.transport.TransportEvent.WRITE;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.Future;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>Handshake</code> object is used to perform secure SSL
+ * negotiations on a pipeline or <code>Transport</code>. This can
+ * be used to perform an SSL handshake. To perform the negotiation
+ * this uses an SSL engine provided with the transport to direct
+ * the conversation. The SSL engine tells the negotiation what is
+ * expected next, whether this is a response to the client or a
+ * message from it. During the negotiation this may need to wait
+ * for either a write ready event or a read ready event. Event
+ * notification is done using the processor provided.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.TransportProcessor
+ */
+class Handshake implements Negotiation {
+
+ /**
+ * This is the processor used to process the secure transport.
+ */
+ private final TransportProcessor processor;
+
+ /**
+ * This is the certificate associated with this negotiation.
+ */
+ private final NegotiationState state;
+
+ /**
+ * This is the socket channel used to read and write data to.
+ */
+ private final SocketChannel channel;
+
+ /**
+ * This is the transport dispatched when the negotiation ends.
+ */
+ private final Transport transport;
+
+ /**
+ * This is the reactor used to register for I/O notifications.
+ */
+ private final Reactor reactor;
+
+ /**
+ * This is the output buffer used to generate data to.
+ */
+ private final ByteBuffer output;
+
+ /**
+ * This is the input buffer used to read data from the socket.
+ */
+ private final ByteBuffer input;
+
+ /**
+ * This is an empty byte buffer used to generate a response.
+ */
+ private final ByteBuffer empty;
+
+ /**
+ * This is the SSL engine used to direct the conversation.
+ */
+ private final SSLEngine engine;
+
+ /**
+ * This is the trace that is used to monitor handshake events.
+ */
+ private final Trace trace;
+
+ /**
+ * This determines if the handshake is from the client side.
+ */
+ private final boolean client;
+
+ /**
+ * Constructor for the <code>Handshake</code> object. This is
+ * used to create an operation capable of performing negotiations
+ * for SSL connections. Typically this is used to perform request
+ * response negotiations, such as a handshake or termination.
+ *
+ * @param processor the processor used to dispatch the transport
+ * @param transport the transport to perform the negotiation for
+ * @param reactor this is the reactor used for I/O notifications
+ */
+ public Handshake(TransportProcessor processor, Transport transport, Reactor reactor) {
+ this(processor, transport, reactor, 20480);
+ }
+
+ /**
+ * Constructor for the <code>Handshake</code> object. This is
+ * used to create an operation capable of performing negotiations
+ * for SSL connections. Typically this is used to perform request
+ * response negotiations, such as a handshake or termination.
+ *
+ * @param transport the transport to perform the negotiation for
+ * @param processor the processor used to dispatch the transport
+ * @param reactor this is the reactor used for I/O notifications
+ * @param size the size of the buffers used for the negotiation
+ */
+ public Handshake(TransportProcessor processor, Transport transport, Reactor reactor, int size) {
+ this(processor, transport, reactor, size, false);
+ }
+
+ /**
+ * Constructor for the <code>Handshake</code> object. This is
+ * used to create an operation capable of performing negotiations
+ * for SSL connections. Typically this is used to perform request
+ * response negotiations, such as a handshake or termination.
+ *
+ * @param transport the transport to perform the negotiation for
+ * @param processor the processor used to dispatch the transport
+ * @param reactor this is the reactor used for I/O notifications
+ * @param client determines the side of the SSL handshake
+ */
+ public Handshake(TransportProcessor processor, Transport transport, Reactor reactor, boolean client) {
+ this(processor, transport, reactor, 20480, client);
+ }
+
+ /**
+ * Constructor for the <code>Handshake</code> object. This is
+ * used to create an operation capable of performing negotiations
+ * for SSL connections. Typically this is used to perform request
+ * response negotiations, such as a handshake or termination.
+ *
+ * @param transport the transport to perform the negotiation for
+ * @param processor the processor used to dispatch the transport
+ * @param reactor this is the reactor used for I/O notifications
+ * @param size the size of the buffers used for the negotiation
+ * @param client determines the side of the SSL handshake
+ */
+ public Handshake(TransportProcessor processor, Transport transport, Reactor reactor, int size, boolean client) {
+ this.state = new NegotiationState(this, transport);
+ this.output = ByteBuffer.allocate(size);
+ this.input = ByteBuffer.allocate(size);
+ this.channel = transport.getChannel();
+ this.engine = transport.getEngine();
+ this.trace = transport.getTrace();
+ this.empty = ByteBuffer.allocate(0);
+ this.processor = processor;
+ this.transport = transport;
+ this.reactor = reactor;
+ this.client = client;
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the operation. A trace object is used to collection details
+ * on what operations are being performed. For instance it may
+ * contain information relating to I/O events or errors.
+ *
+ * @return this returns the trace associated with this operation
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This returns the socket channel for the connected pipeline. It
+ * is this channel that is used to determine if there are bytes
+ * that can be read. When closed this is no longer selectable.
+ *
+ * @return this returns the connected channel for the pipeline
+ */
+ public SelectableChannel getChannel() {
+ return channel;
+ }
+
+ /**
+ * This is used to start the negotiation. Once started this will
+ * send a message to the other side, once sent the negotiation
+ * reads the response. However if the response is not yet ready
+ * this will schedule the negotiation for a selectable operation
+ * ensuring that it can resume execution when ready.
+ */
+ public void run() {
+ if(engine != null) {
+ trace.trace(HANDSHAKE_BEGIN);
+ engine.setUseClientMode(client);
+ input.flip();
+ }
+ begin();
+ }
+
+ /**
+ * This is used to terminate the negotiation. This is excecuted
+ * when the negotiation times out. When the negotiation expires it
+ * is rejected by the processor and must be canceled. Canceling
+ * is basically termination of the connection to free resources.
+ */
+ public void cancel() {
+ try {
+ terminate();
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * This is used to start the negotation. Once started this will
+ * send a message to the other side, once sent the negotiation
+ * reads the response. However if the response is not yet ready
+ * this will schedule the negotiation for a selectable operation
+ * ensuring that it can resume execution when ready.
+ */
+ private void begin() {
+ try {
+ resume();
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ cancel();
+ }
+ }
+
+ /**
+ * This is the main point of execution within the negotiation. It
+ * is where the negotiation is performed. Negotiations are done
+ * by performing a request response flow, governed by the SSL
+ * engine associated with the pipeline. Typically the client is
+ * the one to initiate the handshake and the server initiates the
+ * termination sequence. This may be executed several times
+ * depending on whether reading or writing blocks.
+ */
+ public void resume() throws IOException {
+ Runnable task = process();
+
+ if(task != null) {
+ task.run();
+ }
+ }
+
+ /**
+ * This is the main point of execution within the negotiation. It
+ * is where the negotiation is performed. Negotiations are done
+ * by performing a request response flow, governed by the SSL
+ * engine associated with the transport. Typically the client is
+ * the one to initiate the handshake and the server initiates the
+ * termination sequence. This may be executed several times
+ * depending on whether reading or writing blocks.
+ *
+ * @return this returns a task used to execute the next phase
+ */
+ private Runnable process() throws IOException {
+ PhaseType require = exchange();
+
+ if(require == CONSUME) {
+ return new Consumer(this, reactor, trace);
+ }
+ if(require == PRODUCE) {
+ return new Producer(this, reactor, trace);
+ }
+ return new Committer(this, reactor, trace);
+ }
+
+ /**
+ * This is the main point of execution within the negotiation. It
+ * is where the negotiation is performed. Negotiations are done
+ * by performing a request response flow, governed by the SSL
+ * engine associated with the transport. Typically the client is
+ * the one to initiate the handshake and the server initiates the
+ * termination sequence. This may be executed several times
+ * depending on whether reading or writing blocks.
+ *
+ * @return this returns what is expected next in the negotiation
+ */
+ private PhaseType exchange() throws IOException {
+ HandshakeStatus status = engine.getHandshakeStatus();
+
+ switch(status){
+ case NEED_WRAP:
+ return write();
+ case NOT_HANDSHAKING:
+ case NEED_UNWRAP:
+ return read();
+ }
+ return COMMIT;
+ }
+
+ /**
+ * This is used to perform the read part of the negotiation. The
+ * read part is where the other side sends information where it
+ * is consumed and is used to determine what action to take.
+ * Typically it is the SSL engine that determines what action is
+ * to be taken depending on the data send from the other side.
+ *
+ * @return the next action that should be taken by the handshake
+ */
+ private PhaseType read() throws IOException {
+ return read(5);
+ }
+
+ /**
+ * This is used to perform the read part of the negotiation. The
+ * read part is where the other side sends information where it
+ * is consumed and is used to determine what action to take.
+ * Typically it is the SSL engine that determines what action is
+ * to be taken depending on the data send from the other side.
+ *
+ * @param count this is the number of times a read can repeat
+ *
+ * @return the next action that should be taken by the handshake
+ */
+ private PhaseType read(int count) throws IOException {
+ while(count > 0) {
+ SSLEngineResult result = engine.unwrap(input, output);
+ HandshakeStatus status = result.getHandshakeStatus();
+
+ switch(status) {
+ case NOT_HANDSHAKING:
+ return COMMIT;
+ case NEED_WRAP:
+ return PRODUCE;
+ case FINISHED:
+ case NEED_UNWRAP:
+ return read(count-1);
+ case NEED_TASK:
+ execute();
+ }
+ }
+ return CONSUME;
+ }
+
+ /**
+ * This is used to perform the write part of the negotiation. The
+ * read part is where the this sends information to the other side
+ * and the other side interprets the data and determines what action
+ * to take. After a write the negotiation typically completes or
+ * waits for the next response from the other side.
+ *
+ * @return the next action that should be taken by the handshake
+ */
+ private PhaseType write() throws IOException {
+ return write(5);
+ }
+
+ /**
+ * This is used to perform the write part of the negotiation. The
+ * read part is where the this sends information to the other side
+ * and the other side interprets the data and determines what action
+ * to take. After a write the negotiation typically completes or
+ * waits for the next response from the other side.
+ *
+ * @param count this is the number of times a read can repeat
+ *
+ * @return the next action that should be taken by the handshake
+ */
+ private PhaseType write(int count) throws IOException {
+ while(count > 0) {
+ SSLEngineResult result = engine.wrap(empty, output);
+ HandshakeStatus status = result.getHandshakeStatus();
+
+ switch(status) {
+ case NOT_HANDSHAKING:
+ case FINISHED:
+ case NEED_UNWRAP:
+ return PRODUCE;
+ case NEED_WRAP:
+ return write(count-1);
+ case NEED_TASK:
+ execute();
+ }
+ }
+ return PRODUCE;
+ }
+
+ /**
+ * This is used to execute the delegated tasks. These tasks are
+ * used to digest the information received from the client in
+ * order to generate a response. This may need to execute several
+ * tasks from the associated SSL engine.
+ */
+ private void execute() throws IOException {
+ while(true) {
+ Runnable task = engine.getDelegatedTask();
+
+ if(task == null) {
+ break;
+ }
+ task.run();
+ }
+ }
+
+ /**
+ * This is used to receive data from the client. If at any
+ * point during the negotiation a message is required that
+ * can not be read immediately this is used to asynchronously
+ * read the data when a select operation is signalled.
+ *
+ * @return this returns true when the message has been read
+ */
+ public boolean receive() throws IOException {
+ int count = input.capacity();
+
+ if(count > 0) {
+ input.compact();
+ }
+ int size = channel.read(input);
+
+ if(trace != null) {
+ trace.trace(READ, size);
+ }
+ if(size < 0) {
+ throw new TransportException("Client closed connection");
+ }
+ if(count > 0) {
+ input.flip();
+ }
+ return size > 0;
+ }
+
+ /**
+ * Here we attempt to send all data within the output buffer. If
+ * all of the data is delivered to the other side then this will
+ * return true. If however there is content yet to be sent to
+ * the other side then this returns false, telling the negotiation
+ * that in order to resume it must attempt to send the content
+ * again after a write ready operation on the underlying socket.
+ *
+ * @return this returns true if all of the content is delivered
+ */
+ public boolean send() throws IOException {
+ int require = output.position();
+ int count = 0;
+
+ if(require > 0) {
+ output.flip();
+ }
+ while(count < require) {
+ int size = channel.write(output);
+
+ if(trace != null) {
+ trace.trace(WRITE, size);
+ }
+ if(size <= 0) {
+ break;
+ }
+ count += size;
+ }
+ if(require > 0) {
+ output.compact();
+ }
+ return count == require;
+ }
+
+ /**
+ * This method is invoked when the negotiation is done and the
+ * next phase of the connection is to take place. This will
+ * be invoked when the SSL handshake has completed and the new
+ * secure transport is to be handed to the processor.
+ */
+ private void dispatch() throws IOException {
+ Transport secure = new SecureTransport(transport, state, output, input);
+
+ if(processor != null) {
+ trace.trace(HANDSHAKE_DONE);
+ processor.process(secure);
+ }
+ }
+
+ /**
+ * This method is used to terminate the handshake. Termination
+ * typically occurs when there has been some error in the handshake
+ * or when there is a timeout on some event, such as waiting for
+ * for a read or write operation to occur. As a result the TCP
+ * channel is closed and any challenge future is cancelled.
+ */
+ private void terminate() throws IOException {
+ Future<Certificate> future = state.getFuture();
+
+ trace.trace(HANDSHAKE_FAILED);
+ transport.close();
+ future.cancel(true);
+ }
+
+ /**
+ * This is used to execute the completion task after a challenge
+ * for the clients X509 certificate. Execution of the completion
+ * task in this way allows any challanger to be notified that
+ * the handshake has complete.
+ */
+ private void complete() throws IOException {
+ Runnable task = state.getFuture();
+
+ if(task != null) {
+ task.run();
+ }
+ }
+
+ /**
+ * This method is invoked when the negotiation is done and the
+ * next phase of the connection is to take place. If a certificate
+ * challenge was issued then the completion task is executed, if
+ * this was the handshake for the initial connection a transport
+ * is created and handed to the processor.
+ */
+ public void commit() throws IOException {
+ if(!state.isChallenge()) {
+ dispatch();
+ } else {
+ complete();
+ }
+ }
+
+ /**
+ * The <code>Committer</code> task is used to transfer the transport
+ * created to the processor. This is executed when the SSL
+ * handshake is completed. It allows the transporter to use the
+ * newly created transport to read and write in plain text and
+ * to have the SSL transport encrypt and decrypt transparently.
+ */
+ private class Committer extends Phase {
+
+ /**
+ * Constructor for the <code>Committer</code> task. This is used to
+ * pass the transport object object to the processor when the
+ * SSL handshake has completed.
+ *
+ * @param state this is the underlying negotiation to use
+ * @param reactor this is the reactor used for I/O notifications
+ * @param trace the trace that is used to monitor the handshake
+ */
+ public Committer(Negotiation state, Reactor reactor, Trace trace) {
+ super(state, reactor, trace, OP_READ);
+ }
+
+ /**
+ * This is used to execute the task. It is up to the specific
+ * task implementation to decide what to do when executed. If
+ * the task needs to read or write data then it can attempt
+ * to perform the read or write, if it incomplete the it can
+ * be scheduled for execution with the reactor.
+ */
+ @Override
+ public void execute() throws IOException{
+ state.commit();
+ }
+ }
+
+ /**
+ * The <code>Consumer</code> task is used to schedule the negotiation
+ * for a read operation. This allows the negotiation to receive any
+ * messages generated by the client asynchronously. Once this has
+ * completed then it will resume the negotiation.
+ */
+ private class Consumer extends Phase {
+
+ /**
+ * Constructor for the <code>Consumer</code> task. This is used
+ * to create a task which will schedule a read operation for
+ * the negotiation. When the operation completes this will
+ * resume the negotiation.
+ *
+ * @param state this is the negotiation object that is used
+ * @param reactor this is the reactor used for I/O notifications
+ * @param trace the trace that is used to monitor the handshake
+ */
+ public Consumer(Negotiation state, Reactor reactor, Trace trace) {
+ super(state, reactor, trace, OP_READ);
+ }
+
+ /**
+ * This method is used to determine if the task is ready. This
+ * is executed when the select operation is signalled. When this
+ * is true the the task completes. If not then this will
+ * schedule the task again for the specified select operation.
+ *
+ * @return this returns true when the task has completed
+ */
+ @Override
+ protected boolean ready() throws IOException {
+ return state.receive();
+ }
+ }
+
+ /**
+ * The <code>Producer</code> is used to schedule the negotiation
+ * for a write operation. This allows the negotiation to send any
+ * messages generated during the negotiation asynchronously. Once
+ * this has completed then it will resume the negotiation.
+ */
+ private class Producer extends Phase {
+
+ /**
+ * Constructor for the <code>Producer</code> task. This is used
+ * to create a task which will schedule a write operation for
+ * the negotiation. When the operation completes this will
+ * resume the negotiation.
+ *
+ * @param state this is the negotiation object that is used
+ * @param reactor this is the reactor used for I/O notifications
+ * @param trace the trace that is used to monitor the handshake
+ */
+ public Producer(Negotiation state, Reactor reactor, Trace trace) {
+ super(state, reactor, trace, OP_WRITE);
+ }
+
+ /**
+ * This method is used to determine if the task is ready. This
+ * is executed when the select operation is signalled. When this
+ * is true the the task completes. If not then this will
+ * schedule the task again for the specified select operation.
+ *
+ * @return this returns true when the task has completed
+ */
+ @Override
+ protected boolean ready() throws IOException {
+ return state.send();
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java
new file mode 100644
index 0000000..4140b34
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Negotiation.java
@@ -0,0 +1,69 @@
+/*
+ * Negotiation.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+import org.simpleframework.transport.reactor.Operation;
+
+/**
+ * The <code>Negotiation</code> interface is used to represent an
+ * SSL negotiation. When an operation can not be completed this
+ * will allow a task to perform asynchronous operations and resume
+ * the negotiation when those operations can be fulfilled.
+ *
+ * @author Niall Gallagher
+ */
+interface Negotiation extends Operation {
+
+ /**
+ * This is used to resume the negotiation when an operation
+ * has completed. This will continue the decrypt and encrypt
+ * sequence of messages required to fulfil the negotiation.
+ */
+ void resume() throws IOException;
+
+ /**
+ * This method is invoked when the negotiation is done and
+ * the next phase of the connection is to take place. This
+ * will typically be invoked when an SSL handshake or
+ * termination exchange has completed successfully.
+ */
+ void commit() throws IOException;
+
+ /**
+ * This is used to send any messages the negotiation may have.
+ * If the negotiation can not send the information during its
+ * execution then this method will be executed when a select
+ * operation is signaled.
+ *
+ * @return this returns true when the message has been sent
+ */
+ boolean send() throws IOException;
+
+ /**
+ * This is used to receive data from the other side. If at any
+ * point during the negotiation a message is required that
+ * can not be read immediately this is used to asynchronously
+ * read the data when a select operation is signaled.
+ *
+ * @return this returns true when the message has been read
+ */
+ boolean receive() throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java
new file mode 100644
index 0000000..160b5f9
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/NegotiationState.java
@@ -0,0 +1,337 @@
+/*
+ * NegotiationCertificate.java June 2013
+ *
+ * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.CERTIFICATE_CHALLENGE;
+import static org.simpleframework.transport.TransportEvent.ERROR;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSession;
+import javax.security.cert.X509Certificate;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>NegotiationState</code> represents the certificate
+ * that is sent by a client during a secure HTTPS conversation. This
+ * may or may not contain an X509 certificate chain from the client.
+ * If it does not a <code>CertificateChallenge</code> may be used to
+ * issue a renegotiation of the connection. One completion of the
+ * renegotiation the challenge executes a completion operation.
+ *
+ * @author Niall Gallagher
+ */
+class NegotiationState implements Certificate {
+
+ /**
+ * This is used to hold the completion task for the challenge.
+ */
+ private final RunnableFuture<Certificate> future;
+
+ /**
+ * This is the handshake used to acquire the certificate details.
+ */
+ private final Negotiation negotiation;
+
+ /**
+ * This is the challenge used to request the client certificate.
+ */
+ private final Challenge challenge;
+
+ /**
+ * This is the runnable task that is executed on task completion.
+ */
+ private final Delegate delegate;
+
+ /**
+ * This is the socket representing the underlying TCP connection.
+ */
+ private final Socket socket;
+
+ /**
+ * Constructor for the <code>NegotiationCertificate</code> object.
+ * This creates an object used to provide certificate details and
+ * a means to challenge for certificate details for the connected
+ * client if required.
+ *
+ * @param negotiation the negotiation associated with this
+ * @param socket the underlying TCP connection to the client
+ */
+ public NegotiationState(Negotiation negotiation, Socket socket) {
+ this.delegate = new Delegate(socket);
+ this.future = new FutureTask<Certificate>(delegate, this);
+ this.challenge = new Challenge(socket);
+ this.negotiation = negotiation;
+ this.socket = socket;
+ }
+
+ /**
+ * This is used to determine if the state is in challenge mode.
+ * In challenge mode a challenge future will be executed on
+ * completion of the challenge. This will the completion task.
+ *
+ * @return this returns true if the state is in challenge mode
+ */
+ public boolean isChallenge() {
+ return delegate.isSet();
+ }
+
+ /**
+ * This returns the completion task associated with any challenge
+ * made for the client certificate. If this returns null then no
+ * challenge has been made for the client certificate.
+ *
+ * @return this returns the challenge completion task if any
+ */
+ public RunnableFuture<Certificate> getFuture() {
+ return future;
+ }
+
+ /**
+ * This returns a challenge for the certificate. A challenge is
+ * issued by providing a <code>Runnable</code> task which is to
+ * be executed when the challenge has completed. Typically this
+ * task should be used to drive completion of an HTTPS request.
+ *
+ * @return this returns a challenge for the client certificate
+ */
+ public CertificateChallenge getChallenge() throws Exception {
+ return challenge;
+ }
+
+ /**
+ * This will return the X509 certificate chain, if any, that
+ * has been sent by the client. A certificate chain is typically
+ * only send when the server explicitly requests the certificate
+ * on the initial connection or when it is challenged for.
+ *
+ * @return this returns the clients X509 certificate chain
+ */
+ public X509Certificate[] getChain() throws Exception {
+ SSLSession session = getSession();
+
+ if(session != null) {
+ return session.getPeerCertificateChain();
+ }
+ return null;
+ }
+
+ /**
+ * This is used to acquire the SSL session associated with the
+ * handshake. The session makes all of the details associated
+ * with the handshake available, including the cipher suites
+ * used and the SSL context used to create the session.
+ *
+ * @return the SSL session associated with the connection
+ */
+ public SSLSession getSession() throws Exception{
+ SSLEngine engine = socket.getEngine();
+
+ if(engine != null) {
+ return engine.getSession();
+ }
+ return null;
+ }
+
+ /**
+ * This is used to determine if the X509 certificate chain is
+ * present for the request. If it is not present then a challenge
+ * can be used to request the certificate.
+ *
+ * @return true if the certificate chain is present
+ */
+ public boolean isChainPresent() {
+ try {
+ return getChain() != null;
+ } catch(Exception e) {
+ return false;
+ }
+ }
+
+ /**
+ * The <code>Challenge</code> object is used to enable the server
+ * to challenge for the client X509 certificate if desired. It
+ * performs the challenge by performing an SSL renegotiation to
+ * request that the client sends the
+ */
+ private class Challenge implements CertificateChallenge {
+
+ /**
+ * This is the SSL engine that is used to begin the handshake.
+ */
+ private final SSLEngine engine;
+
+ /**
+ * This is used to trace the certificate challenge request.
+ */
+ private final Trace trace;
+
+ /**
+ * Constructor for the <code>Challenge</code> object. This can
+ * be used to challenge the client for their X509 certificate.
+ * It does this by performing an SSL renegotiation on the
+ * existing TCP connection.
+ *
+ * @param socket this is the TCP connection to the client
+ */
+ public Challenge(Socket socket) {
+ this.engine = socket.getEngine();
+ this.trace = socket.getTrace();
+ }
+
+ /**
+ * This method will challenge the client for their certificate.
+ * It does so by performing an SSL renegotiation. Successful
+ * completion of the SSL renegotiation results in the client
+ * providing their certificate, and execution of the task.
+ */
+ public Future<Certificate> challenge() {
+ return challenge(null);
+ }
+
+ /**
+ * This method will challenge the client for their certificate.
+ * It does so by performing an SSL renegotiation. Successful
+ * completion of the SSL renegotiation results in the client
+ * providing their certificate, and execution of the task.
+ *
+ * @param completion task to be run on successful challenge
+ */
+ public Future<Certificate> challenge(Runnable task) {
+ try {
+ if(!isChainPresent()) {
+ resume(task);
+ } else {
+ future.run();
+ }
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ return future;
+ }
+
+ /**
+ * This method will challenge the client for their certificate.
+ * It does so by performing an SSL renegotiation. Successful
+ * completion of the SSL renegotiation results in the client
+ * providing their certificate, and execution of the task.
+ *
+ * @param completion task to be run on successful challenge
+ */
+ private void resume(Runnable task) {
+ try {
+ trace.trace(CERTIFICATE_CHALLENGE);
+ delegate.set(task);
+ engine.setNeedClientAuth(true);
+ engine.beginHandshake();
+ negotiation.resume();
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ negotiation.cancel();
+ }
+ }
+ }
+
+ /**
+ * The <code>Delegate</code> is basically a settable runnable object.
+ * It enables the challenge to set an optional runnable that will
+ * be executed when the challenge has completed. If the challenge
+ * has not been given a completion task this runs straight through
+ * without any state change or action on the certificate.
+ */
+ private class Delegate implements Runnable {
+
+ /**
+ * This is the reference to the runnable that is to be executed.
+ */
+ private final AtomicReference<Runnable> task;
+
+ /**
+ * This is used to determine if the challenge is ready to run.
+ */
+ private final AtomicBoolean ready;
+
+ /**
+ * This is used to trace any errors when running the task.
+ */
+ private final Trace trace;
+
+ /**
+ * Constructor for the <code>Delegate</code> object. This is
+ * used to create a wrapper for the completion task so that it
+ * can be executed safely and have any errors traced.
+ *
+ * @param socket this socket the handshake is associated with
+ */
+ public Delegate(Socket socket) {
+ this.task = new AtomicReference<Runnable>();
+ this.ready = new AtomicBoolean();
+ this.trace = socket.getTrace();
+ }
+
+ /**
+ * This is used to determine if the delegate is ready to be
+ * used. It is ready only after the completion task has been
+ * set. When ready a challenge can be executed.
+ *
+ * @return this returns true if a completion task is set
+ */
+ public boolean isSet() {
+ return ready.get();
+ }
+
+ /**
+ * This is used to set the completion task that is to be executed
+ * when the challenge has finished. This can be set to null if
+ * no task is to be executed on completion.
+ *
+ * @param runnable the task to run when the challenge finishes
+ */
+ public void set(Runnable runnable) {
+ ready.set(true);
+ task.set(runnable);
+ }
+
+ /**
+ * This is used to run the completion task. If no completion
+ * task has been set this will run through without any change to
+ * the state of the certificate. All errors thrown by the task
+ * will be caught and traced.
+ */
+ public void run() {
+ try {
+ Runnable runnable = task.get();
+
+ if(runnable != null) {
+ runnable.run();
+ }
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ } finally {
+ task.set(null);
+ }
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java
new file mode 100644
index 0000000..343a12b
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/OperationFactory.java
@@ -0,0 +1,150 @@
+/*
+ * OperationFactory.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * The <code>OperationFactory</code> is used to create operations
+ * for the transport processor. Depending on the configuration of the
+ * pipeline object this will create different operations. Typically
+ * this will create an SSL handshake operation if the pipeline has
+ * an <code>SSLEngine</code> instance. This allows the transport
+ * processor to complete the handshake before handing the transport
+ * to the transporter for processing.
+ *
+ * @author Niall Gallagher
+ */
+class OperationFactory {
+
+ /**
+ * This is the processor used to process the created transport.
+ */
+ private final TransportProcessor processor;
+
+ /**
+ * This is the reactor used to register for I/O notifications.
+ */
+ private final Reactor reactor;
+
+ /**
+ * This is the threshold for the asynchronous buffers to use.
+ */
+ private final int threshold;
+
+ /**
+ * This is the size of the buffers to be used by the transport.
+ */
+ private final int buffer;
+
+ /**
+ * This determines if the SSL handshake is for the client side.
+ */
+ private final boolean client;
+
+ /**
+ * Constructor for the <code>OperationFactory</code> object. This
+ * uses the processor provided to hand off the created transport
+ * when it has been created. All operations created typically
+ * execute in an asynchronous thread.
+ *
+ * @param processor the processor used to dispatch the transport
+ * @param reactor this is the reactor used for I/O notifications
+ * @param buffer this is the initial size of the buffer to use
+ */
+ public OperationFactory(TransportProcessor processor, Reactor reactor, int buffer) {
+ this(processor, reactor, buffer, 20480);
+ }
+
+ /**
+ * Constructor for the <code>OperationFactory</code> object. This
+ * uses the processor provided to hand off the created transport
+ * when it has been created. All operations created typically
+ * execute in an asynchronous thread.
+ *
+ * @param processor the processor used to dispatch the transport
+ * @param reactor this is the reactor used for I/O notifications
+ * @param buffer this is the initial size of the buffer to use
+ * @param threshold maximum size of the output buffer to use
+ */
+ public OperationFactory(TransportProcessor processor, Reactor reactor, int buffer, int threshold) {
+ this(processor, reactor, buffer, threshold, false);
+ }
+
+ /**
+ * Constructor for the <code>OperationFactory</code> object. This
+ * uses the processor provided to hand off the created transport
+ * when it has been created. All operations created typically
+ * execute in an asynchronous thread.
+ *
+ * @param processor the processor used to dispatch the transport
+ * @param reactor this is the reactor used for I/O notifications
+ * @param buffer this is the initial size of the buffer to use
+ * @param threshold maximum size of the output buffer to use
+ * @param client determines if the SSL handshake is for a client
+ */
+ public OperationFactory(TransportProcessor processor, Reactor reactor, int buffer, int threshold, boolean client) {
+ this.processor = processor;
+ this.threshold = threshold;
+ this.reactor = reactor;
+ this.buffer = buffer;
+ this.client = client;
+ }
+
+ /**
+ * This method is used to create <code>Operation</code> object to
+ * process the next phase of the negotiation. The operations that
+ * are created using this factory ensure the processing can be
+ * done asynchronously, which reduces the overhead the connection
+ * thread has when handing the pipelines over for processing.
+ *
+ * @param socket this is the pipeline that is to be processed
+ *
+ * @return this returns the operation used for processing
+ */
+ public Operation getInstance(Socket socket) throws IOException {
+ return getInstance(socket, socket.getEngine());
+ }
+
+ /**
+ * This method is used to create <code>Operation</code> object to
+ * process the next phase of the negotiation. The operations that
+ * are created using this factory ensure the processing can be
+ * done asynchronously, which reduces the overhead the connection
+ * thread has when handing the pipelines over for processing.
+ *
+ * @param socket this is the pipeline that is to be processed
+ * @param engine this is the engine used for SSL negotiations
+ *
+ * @return this returns the operation used for processing
+ */
+ private Operation getInstance(Socket socket, SSLEngine engine) throws IOException {
+ Transport transport = new SocketTransport(socket, reactor, buffer, threshold);
+
+ if(engine != null) {
+ return new Handshake(processor, transport, reactor, client);
+ }
+ return new TransportDispatcher(processor, transport);
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java
new file mode 100644
index 0000000..a2bb2cd
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Phase.java
@@ -0,0 +1,165 @@
+/*
+ * Phase.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.ERROR;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>Phase</code> object represents an asynchronous phase
+ * within the negotiation. This is typically used to either schedule
+ * an asynchronous read or write when it can not be performed
+ * directly. It ensures that the negotiation does not block the
+ * thread so that execution can be optimized of high concurrency.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.Handshake
+ */
+abstract class Phase implements Operation {
+
+ /**
+ * This is the negotiation that this task will operate on.
+ */
+ protected final Negotiation state;
+
+ /**
+ * This is the reactor that is used to schedule execution.
+ */
+ protected final Reactor reactor;
+
+ /**
+ * This is the trace used to monitor the handshake socket.
+ */
+ protected final Trace trace;
+
+ /**
+ * This is the required operation for the task to complete.
+ */
+ protected final int require;
+
+ /**
+ * Constructor for the <code>Phase</code> object. This is used to
+ * create an operation that performs some phase of a negotiation.
+ * It allows the negotiation to schedule the read and write
+ * operations asynchronously.
+ *
+ * @param state this is the negotiation this task works on
+ * @param reactor this is the reactor used to schedule the task
+ * @param trace the trace that is used to monitor the handshake
+ * @param require this is the required operation for the task
+ */
+ public Phase(Negotiation state, Reactor reactor, Trace trace, int require) {
+ this.reactor = reactor;
+ this.require = require;
+ this.state = state;
+ this.trace = trace;
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the operation. A trace object is used to collection details
+ * on what operations are being performed. For instance it may
+ * contain information relating to I/O events or errors.
+ *
+ * @return this returns the trace associated with this operation
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ public SelectableChannel getChannel() {
+ return state.getChannel();
+ }
+
+ /**
+ * This is used to execute the task. It is up to the specific
+ * task implementation to decide what to do when executed. If
+ * the task needs to read or write data then it can attempt
+ * to perform the read or write, if it incomplete the it can
+ * be scheduled for execution with the reactor.
+ */
+ public void run() {
+ try {
+ execute();
+ }catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ cancel();
+ }
+ }
+
+ /**
+ * This is used to cancel the operation if it has timed out. This
+ * is typically invoked when it has been waiting in a selector for
+ * an extended duration of time without any active operations on
+ * it. In such a case the reactor must purge the operation to free
+ * the memory and open channels associated with the operation.
+ */
+ public void cancel() {
+ try {
+ state.cancel();
+ }catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * This is used to execute the task. It is up to the specific
+ * task implementation to decide what to do when executed. If
+ * the task needs to read or write data then it can attempt
+ * to perform the read or write, if it incomplete the it can
+ * be scheduled for execution with the reactor.
+ */
+ protected void execute() throws IOException {
+ boolean done = ready();
+
+ if(!done) {
+ reactor.process(this, require);
+ } else {
+ state.resume();
+ }
+ }
+
+ /**
+ * This method is used to determine if the task is ready. This is
+ * executed when the select operation is signaled. When this is
+ * true the the task completes. If not then this will schedule
+ * the task again for the specified select operation.
+ *
+ * @return this returns true when the task has completed
+ */
+ protected boolean ready() throws IOException {
+ return true;
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java
new file mode 100644
index 0000000..dd202d6
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/PhaseType.java
@@ -0,0 +1,45 @@
+/*
+ * PhaseType.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+/**
+ * The <code>PhaseType</code> enumeration is used to determine what
+ * phase of the negotiation the handshake is in. This allows the
+ * negotiation to control the selection for read and write ready
+ * operations. Also, status signals completion of the handshake.
+ *
+ * @author Niall Gallagher
+ */
+enum PhaseType {
+
+ /**
+ * Tells the negotiation that a read operations is needed.
+ */
+ CONSUME,
+
+ /**
+ * Tells the negotiation that a write operation is required.
+ */
+ PRODUCE,
+
+ /**
+ * Tells the negotiation that the the handshake is complete.
+ */
+ COMMIT
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java
new file mode 100644
index 0000000..2083873
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SecureTransport.java
@@ -0,0 +1,428 @@
+/*
+ * SecureTransport.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>SecureTransport</code> object provides an implementation
+ * of a transport used to send and receive data over SSL. Data read
+ * from this transport is decrypted using an <code>SSLEngine</code>.
+ * Also, all data is written is encrypted with the same engine. This
+ * ensures that data can be send and received in a transparent way.
+ *
+ * @author Niall Gallagher
+ */
+class SecureTransport implements Transport {
+
+ /**
+ * This is the certificate associated with this SSL connection.
+ */
+ private Certificate certificate;
+
+ /**
+ * This is the transport used to send data over the socket.
+ */
+ private Transport transport;
+
+ /**
+ * This buffer is used to output the data for the SSL sent.
+ */
+ private ByteBuffer output;
+
+ /**
+ * This is the internal buffer used to exchange the SSL data.
+ */
+ private ByteBuffer input;
+
+ /**
+ * This is the internal buffer used to exchange the SSL data.
+ */
+ private ByteBuffer swap;
+
+ /**
+ * This is the SSL engine used to encrypt and decrypt data.
+ */
+ private SSLEngine engine;
+
+ /**
+ * This is the trace that is used to monitor socket activity.
+ */
+ private Trace trace;
+
+ /**
+ * This is used to determine if the transport was closed.
+ */
+ private boolean closed;
+
+ /**
+ * This is used to determine if the end of stream was reached.
+ */
+ private boolean finished;
+
+ /**
+ * Constructor for the <code>SecureTransport</code> object. This
+ * is used to create a transport for sending and receiving data
+ * over SSL. This must be created with a pipeline that has already
+ * performed the SSL handshake and is read to used.
+ *
+ * @param transport this is the transport to delegate operations to
+ * @param certificate this is the certificate for the connection
+ * @param input this is the input buffer used to read the data
+ * @param swap this is the swap buffer to be used for reading
+ */
+ public SecureTransport(Transport transport, Certificate certificate, ByteBuffer input, ByteBuffer swap) {
+ this(transport, certificate, input, swap, 20480);
+ }
+
+ /**
+ * Constructor for the <code>SecureTransport</code> object. This
+ * is used to create a transport for sending and receiving data
+ * over SSL. This must be created with a pipeline that has already
+ * performed the SSL handshake and is read to used.
+ *
+ * @param transport this is the transport to delegate operations to
+ * @param certificate this is the certificate for the connection
+ * @param input this is the input buffer used to read the data
+ * @param swap this is the swap buffer to be used for reading
+ * @param size this is the size of the buffers to be allocated
+ */
+ public SecureTransport(Transport transport, Certificate certificate, ByteBuffer input, ByteBuffer swap, int size) {
+ this.output = ByteBuffer.allocate(size);
+ this.engine = transport.getEngine();
+ this.trace = transport.getTrace();
+ this.certificate = certificate;
+ this.transport = transport;
+ this.input = input;
+ this.swap = swap;
+ }
+
+ /**
+ * This is used to acquire the SSL certificate used when the
+ * server is using a HTTPS connection. For plain text connections
+ * or connections that use a security mechanism other than SSL
+ * this will be null. This is only available when the connection
+ * makes specific use of an SSL engine to secure the connection.
+ *
+ * @return this returns the associated SSL certificate if any
+ */
+ public Certificate getCertificate() {
+ return certificate;
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the socket. A trace object is used to collection details
+ * on what operations are being performed on the socket. For
+ * instance it may contain information relating to I/O events
+ * or more application specific events such as errors.
+ *
+ * @return this returns the trace associated with this socket
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This is used to acquire the SSL engine used for HTTPS. If the
+ * pipeline is connected to an SSL transport this returns an SSL
+ * engine which can be used to establish the secure connection
+ * and send and receive content over that connection. If this is
+ * null then the pipeline represents a normal transport.
+ *
+ * @return the SSL engine used to establish a secure transport
+ */
+ public SSLEngine getEngine() {
+ return engine;
+ }
+
+ /**
+ * This method is used to get the <code>Map</code> of attributes
+ * by this pipeline. The attributes map is used to maintain details
+ * about the connection. Information such as security credentials
+ * to client details can be placed within the attribute map.
+ *
+ * @return this returns the map of attributes for this pipeline
+ */
+ public Map getAttributes() {
+ return transport.getAttributes();
+ }
+
+ /**
+ * This method is used to acquire the <code>SocketChannel</code>
+ * for the connection. This allows the server to acquire the input
+ * and output streams with which to communicate. It can also be
+ * used to configure the connection and perform various network
+ * operations that could otherwise not be performed.
+ *
+ * @return this returns the socket used by this HTTP pipeline
+ */
+ public SocketChannel getChannel() {
+ return transport.getChannel();
+ }
+
+ /**
+ * This is used to perform a non-blocking read on the transport.
+ * If there are no bytes available on the input buffers then
+ * this method will return zero and the buffer will remain the
+ * same. If there is data and the buffer can be filled then this
+ * will return the number of bytes read. Finally if the socket
+ * is closed this will return a -1 value.
+ *
+ * @param buffer this is the buffer to append the bytes to
+ *
+ * @return this returns the number of bytes that have been read
+ */
+ public int read(ByteBuffer buffer) throws IOException {
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ if(finished) {
+ return -1;
+ }
+ int count = fill(buffer);
+
+ if(count <= 0) {
+ return process(buffer);
+ }
+ return count;
+ }
+
+ /**
+ * This is used to perform a non-blocking read on the transport.
+ * If there are no bytes available on the input buffers then
+ * this method will return zero and the buffer will remain the
+ * same. If there is data and the buffer can be filled then this
+ * will return the number of bytes read.
+ *
+ * @param buffer this is the buffer to append the bytes to
+ *
+ * @return this returns the number of bytes that have been read
+ */
+ private int process(ByteBuffer buffer) throws IOException {
+ int size = swap.position();
+
+ if(size >= 0) {
+ swap.compact();
+ }
+ int space = swap.remaining();
+
+ if(space > 0) {
+ size = transport.read(swap);
+
+ if(size < 0) {
+ finished = true;
+ }
+ }
+ if(size > 0 || space > 0) {
+ swap.flip();
+ receive();
+ }
+ return fill(buffer);
+ }
+
+ /**
+ * This is used to fill the provided buffer with data that has
+ * been read from the secure socket channel. This enables reading
+ * of the decrypted data in chunks that are smaller than the
+ * size of the input buffer used to contain the plain text data.
+ *
+ * @param buffer this is the buffer to append the bytes to
+ *
+ * @return this returns the number of bytes that have been read
+ */
+ private int fill(ByteBuffer buffer) throws IOException {
+ int space = buffer.remaining();
+ int count = input.position();
+
+ if(count > 0) {
+ if(count > space) {
+ count = space;
+ }
+ }
+ return fill(buffer, count);
+
+ }
+
+ /**
+ * This is used to fill the provided buffer with data that has
+ * been read from the secure socket channel. This enables reading
+ * of the decrypted data in chunks that are smaller than the
+ * size of the input buffer used to contain the plain text data.
+ *
+ * @param buffer this is the buffer to append the bytes to
+ * @param count this is the number of bytes that are to be read
+ *
+ * @return this returns the number of bytes that have been read
+ */
+ private int fill(ByteBuffer buffer, int count) throws IOException {
+ input.flip();
+
+ if(count > 0) {
+ count = append(buffer, count);
+ }
+ input.compact();
+ return count;
+ }
+
+ /**
+ * This will append bytes within the transport to the given buffer.
+ * Once invoked the buffer will contain the transport bytes, which
+ * will have been drained from the buffer. This effectively moves
+ * the bytes in the buffer to the end of the packet instance.
+ *
+ * @param buffer this is the buffer containing the bytes
+ * @param count this is the number of bytes that should be used
+ *
+ * @return returns the number of bytes that have been moved
+ */
+ private int append(ByteBuffer buffer, int count) throws IOException {
+ ByteBuffer segment = input.slice();
+
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ int mark = input.position();
+ int size = mark + count;
+
+ if(count > 0) {
+ input.position(size);
+ segment.limit(count);
+ buffer.put(segment);
+ }
+ return count;
+ }
+
+ /**
+ * This is used to perform a non-blocking read on the transport.
+ * If there are no bytes available on the input buffers then
+ * this method will return zero and the buffer will remain the
+ * same. If there is data and the buffer can be filled then this
+ * will return the number of bytes read. Finally if the socket
+ * is closed this will return a -1 value.
+ */
+ private void receive() throws IOException {
+ int count = swap.remaining();
+
+ while(count > 0) {
+ SSLEngineResult result = engine.unwrap(swap, input);
+ Status status = result.getStatus();
+
+ switch(status) {
+ case BUFFER_OVERFLOW:
+ case BUFFER_UNDERFLOW:
+ return;
+ case CLOSED:
+ throw new TransportException("Transport error " + result);
+ }
+ count = swap.remaining();
+
+ if(count <= 0) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the array of bytes to send to the client
+ */
+ public void write(ByteBuffer buffer) throws IOException {
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ int capacity = output.capacity();
+ int ready = buffer.remaining();
+ int length = ready;
+
+ while(ready > 0) {
+ int size = Math.min(ready, capacity / 2);
+ int mark = buffer.position();
+
+ if(length * 2 > capacity) {
+ buffer.limit(mark + size);
+ }
+ send(buffer);
+ output.clear();
+ ready -= size;
+ }
+ }
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the array of bytes to send to the client
+ */
+ private void send(ByteBuffer buffer) throws IOException {
+ SSLEngineResult result = engine.wrap(buffer, output);
+ Status status = result.getStatus();
+
+ switch(status){
+ case BUFFER_OVERFLOW:
+ case BUFFER_UNDERFLOW:
+ case CLOSED:
+ throw new TransportException("Transport error " + status);
+ default:
+ output.flip();
+ }
+ transport.write(output);
+ }
+
+ /**
+ * This method is used to flush the contents of the buffer to
+ * the client. This method will block until such time as all of
+ * the data has been sent to the client. If at any point there
+ * is an error sending the content an exception is thrown.
+ */
+ public void flush() throws IOException {
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ transport.flush();
+ }
+
+ /**
+ * This is used to close the sender and the underlying transport.
+ * If a close is performed on the sender then no more bytes can
+ * be read from or written to the transport and the client will
+ * received a connection close on their side.
+ */
+ public void close() throws IOException {
+ if(!closed) {
+ transport.close();
+ closed = true;
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java
new file mode 100644
index 0000000..27aaf79
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/ServerCleaner.java
@@ -0,0 +1,86 @@
+/*
+ * ServerCleaner.java February 2009
+ *
+ * Copyright (C) 2009, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import org.simpleframework.common.thread.ConcurrentExecutor;
+import org.simpleframework.common.thread.Daemon;
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * The <code>ServerCleaner</code> object allows for the termination
+ * and resource recovery to be done asynchronously. This ensures that
+ * should a HTTP request be used to terminate the processor that it
+ * does not block waiting for the servicing thread pool to terminate
+ * causing a deadlock.
+ *
+ * @author Niall Gallagher
+ */
+class ServerCleaner extends Daemon {
+
+ /**
+ * This is the internal processor that is to be terminated.
+ */
+ private final TransportProcessor processor;
+
+ /**
+ * This is the thread pool implementation used by the server.
+ */
+ private final ConcurrentExecutor executor;
+
+ /**
+ * This is the internal write reactor that is terminated.
+ */
+ private final Reactor reactor;
+
+ /**
+ * Constructor for the <code>ServerCleaner</code> object. For an
+ * orderly termination of the processor, the processor and reactor
+ * provided to the constructor will be stopped asynchronously.
+ *
+ * @param processor this is the processor that is to be stopped
+ * @param executor this is the executor used by the server
+ * @param reactor this is the reactor that is to be closed
+ */
+ public ServerCleaner(TransportProcessor processor, ConcurrentExecutor executor, Reactor reactor) {
+ this.processor = processor;
+ this.executor = executor;
+ this.reactor = reactor;
+ }
+
+ /**
+ * When this method runs it will firstly stop the processor in
+ * a synchronous fashion. Once the <code>TransportProcessor</code>
+ * has stopped it will stop the <code>Reactor</code> ensuring that
+ * all threads will be released.
+ * <p>
+ * It is important to note that stopping the processor before
+ * stopping the reactor is required. This ensures that if there
+ * are any threads executing within the processor that require
+ * the reactor threads, they can complete without a problem.
+ */
+ public void run() {
+ try {
+ processor.stop();
+ executor.stop();
+ reactor.stop();
+ } catch(Exception e) {
+ return;
+ }
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java
new file mode 100644
index 0000000..b59cb0f
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Socket.java
@@ -0,0 +1,89 @@
+/*
+ * Socket.java February 2001
+ *
+ * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * This is a <code>Socket</code> interface that is used to represent
+ * a socket. This has a map that allows attributes to be associated
+ * with the client connection. Attributes such as security details
+ * or other transport related details can be exposed by placing them
+ * in the socket map. The <code>Processor</code> can then use these
+ * attributes as required.
+ * <p>
+ * This provides the connected <code>SocketChannel</code> that can
+ * be used to read and write data asynchronously. The socket channel
+ * must be selectable and in non-blocking mode. If the socket is not
+ * in a non-blocking state the connection will not be processed.
+ *
+ * @author Niall Gallagher
+ */
+public interface Socket {
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the socket. A trace object is used to collection details
+ * on what operations are being performed on the socket. For
+ * instance it may contain information relating to I/O events
+ * or more application specific events such as errors.
+ *
+ * @return this returns the trace associated with this socket
+ */
+ Trace getTrace();
+
+ /**
+ * This is used to acquire the SSL engine used for security. If
+ * the socket is connected to an SSL transport this returns an
+ * SSL engine which can be used to establish the secure connection
+ * and send and receive content over that connection. If this is
+ * null then the socket represents a normal transport.
+ *
+ * @return the SSL engine used to establish a secure transport
+ */
+ SSLEngine getEngine();
+
+ /**
+ * This method is used to acquire the <code>SocketChannel</code>
+ * for the connection. This allows the server to acquire the input
+ * and output streams with which to communicate. It can also be
+ * used to configure the connection and perform various network
+ * operations that could otherwise not be performed.
+ *
+ * @return this returns the socket used by this socket
+ */
+ SocketChannel getChannel();
+
+ /**
+ * This method is used to get the <code>Map</code> of attributes
+ * for this socket. The attributes map is used to maintain details
+ * about the connection. Information such as security credentials
+ * to client details can be placed within the attribute map.
+ *
+ * @return this returns the map of attributes for this socket
+ */
+ Map getAttributes();
+}
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java
new file mode 100644
index 0000000..d3cdc22
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBuffer.java
@@ -0,0 +1,308 @@
+/*
+ * SocketBuffer.java February 2014
+ *
+ * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.CLOSE;
+import static org.simpleframework.transport.TransportEvent.ERROR;
+import static org.simpleframework.transport.TransportEvent.WRITE;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>SocketBuffer</code> represents a buffer that aggregates
+ * small fragments in to a single buffer before sending them. This
+ * is primarily used as a means to avoid sending many small packets
+ * rather than reasonable size ones for performance. This also
+ * enables a higher level of concurrency, as it will allow data
+ * that can't be sent over the socket to be buffered until it gets
+ * the signal that says it can be sent on.
+ *
+ * @author Niall Gallagher
+ */
+class SocketBuffer {
+
+ /**
+ * This is a small internal buffer to collect fragments.
+ */
+ private SocketBufferAppender appender;
+
+ /**
+ * This is the underlying socket to sent to the data over.
+ */
+ private SocketChannel channel;
+
+ /**
+ * This is a reference to the last buffer to be sent.
+ */
+ private ByteBuffer reference;
+
+ /**
+ * This is used to trace various events that occur.
+ */
+ private Trace trace;
+
+ /**
+ * This is the recommended minimum packet size to send.
+ */
+ private int chunk;
+
+ /**
+ * This is used to determine if the buffer was closed.
+ */
+ private boolean closed;
+
+ /**
+ * Constructor for the <code>SocketBuffer</code> object. This is
+ * used to create a buffer that will collect small fragments sent
+ * in to a more reasonably sized packet.
+ *
+ * @param socket this is the socket to write the data to
+ * @param chunk this is the minimum packet size to used
+ * @param limit this is the maximum size of the output buffer
+ */
+ public SocketBuffer(Socket socket, int chunk, int limit) {
+ this.appender = new SocketBufferAppender(socket, chunk, limit);
+ this.channel = socket.getChannel();
+ this.trace = socket.getTrace();
+ this.chunk = chunk;
+ }
+
+ /**
+ * This is used to determine if the buffer is ready to be written
+ * to. A buffer is ready when it does not hold a reference to
+ * any other buffer internally. The the <code>flush</code> method
+ * must return true for a buffer to be considered ready.
+ *
+ * @return returns true if the buffer is ready to write to
+ */
+ public synchronized boolean ready() throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ if(reference != null) {
+ int remaining = reference.remaining();
+
+ if(remaining <= 0) {
+ reference = null;
+ return true;
+ }
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This will write the bytes to underlying channel if the data is
+ * greater than the minimum buffer size. If it is less than the
+ * minimum size then it will be appended to the internal buffer.
+ * If it is larger than the maximum size of the internal buffer
+ * a reference is kept to it. This reference can only be cleared
+ * with the <code>flush</code> method, which will attempt to
+ * write the data to the channel, and buffer any remaining data
+ * if the underly connection is busy.
+ *
+ * @param data this is the data to write the the channel.
+ *
+ * @return this returns true if no reference was held
+ */
+ public synchronized boolean write(ByteBuffer duplicate) throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ if(reference != null) {
+ throw new IOException("Buffer already pending write");
+ }
+ int count = appender.length();
+
+ if(count > 0) {
+ return merge(duplicate);
+ }
+ int remaining = duplicate.remaining();
+
+ if(remaining < chunk) {
+ appender.append(duplicate);// just save it..
+ return true;
+ }
+ if(!flush(duplicate)) { // attempt a write
+ int space = appender.space();
+
+ if(remaining < space) {
+ appender.append(duplicate);
+ return true;
+ }
+ reference = duplicate;
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method is used to perform a merge of the buffer to be sent
+ * with the current buffer. If the internal buffer is large enough
+ * to send after the merge then it will be sent. Also, if the
+ * remaining bytes in the buffer are large enough for a packet
+ * then that too will be sent over the socket.
+ *
+ * @param duplicate this is the buffer to be merged
+ *
+ * @return this returns true if no reference was held
+ */
+ private synchronized boolean merge(ByteBuffer duplicate) throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ int count = appender.length();
+ int merged = appender.append(duplicate);
+ int payload = merged + count;
+
+ if(payload >= chunk) { // viable packet size
+ int written = appender.write(channel);
+
+ if(written < payload) {// count not fully flush buffer
+ reference = duplicate;
+ return false;
+ }
+ return write(duplicate); // we are back at zero
+ }
+ return true; // everything was buffered as chunk >= capacity
+ }
+
+ /**
+ * This method is used to fully flush the contents of the buffer to
+ * the underlying output stream. This will only ever return true
+ * if there are no references held and no data internally buffered.
+ * If before this method is invoked a reference to a byte buffer
+ * is held then this will attempt to merge it with the internal
+ * buffer so that the <code>ready</code> method can return true.
+ * This ensures that the writing thread does not need to block.
+ *
+ * @return this returns true if all of the bytes are sent
+ */
+ public synchronized boolean flush() throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ int count = appender.length();
+
+ if(count > 0) {
+ int written = appender.write(channel);
+
+ if(written < count) {
+ compact();
+ return false; // we are still buffering
+ }
+ }
+ if(reference != null) {
+ if(!flush(reference)) {
+ compact();
+ return false;
+ }
+ reference = null;
+ }
+ return true; // no more data buffered
+ }
+
+ /**
+ * This write method will write the contents of the buffer to the
+ * provided byte channel. If the whole buffer can be be written
+ * then this will simply return the number of bytes that have.
+ * The number of bytes remaining within the packet after a write
+ * can be acquired from the <code>length</code> method. Once all
+ * of the bytes are written the packet must be closed.
+ *
+ * @param channel this is the channel to write the packet to
+ * @param segment this is the segment that is to be written
+ *
+ * @return this returns the number of bytes that were written
+ */
+ private synchronized boolean flush(ByteBuffer segment) throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ int require = segment.remaining();
+ int count = 0;
+
+ while(count < require) {
+ int size = channel.write(segment);
+
+ if(size <= 0) {
+ break;
+ }
+ if(trace != null) {
+ trace.trace(WRITE, size);
+ }
+ count += size;
+ }
+ if(count == require) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * To ensure that we can release any references and thus avoid a
+ * blocking thread this method will attempt to merge references
+ * in to the internal buffer. Compacting in this manner is done
+ * only if the full reference can fit in to the available space.
+ */
+ private synchronized void compact() throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ if(reference != null) {
+ int remaining = reference.remaining();
+ int space = appender.space();
+
+ if(remaining < space) {
+ appender.append(reference); // try to release the buffer
+ reference = null;
+ }
+ }
+ }
+
+ /**
+ * This is used to close the writer and the underlying socket.
+ * If a close is performed on the writer then no more bytes
+ * can be read from or written to the writer and the client
+ * will receive a connection close on their side. This also
+ * ensures that the TCP FIN ACK is sent before the actual
+ * channel is closed. This is required for a clean shutdown.
+ */
+ public synchronized void close() throws IOException {
+ if(closed) {
+ throw new TransportException("Buffer has been closed");
+ }
+ if(!closed) {
+ try{
+ closed = true;
+ trace.trace(CLOSE);
+ channel.socket().shutdownOutput();
+ }catch(Throwable cause){
+ trace.trace(ERROR, cause);
+ }
+ channel.close();
+ }
+ }
+}
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java
new file mode 100644
index 0000000..1b9c279
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferAppender.java
@@ -0,0 +1,289 @@
+/*
+ * SocketBufferAppender.java February 2008
+ *
+ * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.WRITE;
+import static org.simpleframework.transport.TransportEvent.WRITE_BUFFER;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.charset.Charset;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>SocketBufferAppender</code> represents a buffer fragment
+ * collector. This provides write access to a direct byte buffer which
+ * is used to collect fragments. Once a sufficient amount of data
+ * has been collected by this then can be written out to a channel.
+ *
+ * @author Niall Gallagher
+ */
+class SocketBufferAppender {
+
+ /**
+ * This is the buffer used to store the contents of the buffer.
+ */
+ private ByteBuffer buffer;
+
+ /**
+ * This is the trace used to watch the buffering events.
+ */
+ private Trace trace;
+
+ /**
+ * This represents the the initial size of the buffer to use.
+ */
+ private int chunk;
+
+ /**
+ * This represents the largest this appender can grow to.
+ */
+ private int limit;
+
+ /**
+ * Constructor for the <code>SocketBufferAppender</code> object. This
+ * is used to create an appender that can collect smaller fragments
+ * in to a larger buffer so that it can be delivered more efficiently.
+ *
+ * @param socket this is the socket to append data for
+ * @param chunk this is the initial size of the buffer
+ * @param limit this is the maximum size of the buffer
+ */
+ public SocketBufferAppender(Socket socket, int chunk, int limit) {
+ this.buffer = ByteBuffer.allocateDirect(chunk);
+ this.trace = socket.getTrace();
+ this.chunk = chunk;
+ this.limit = limit;
+ }
+
+ /**
+ * This is used to determine how much space is left to append
+ * data to this buffer. This is typically equivalent to capacity
+ * minus the length. However in the event that the buffer uses
+ * a private memory store that can not be written to then this
+ * can return zero regardless of the capacity and length.
+ *
+ * @return the space left within the buffer to append data to
+ */
+ public int space() {
+ return buffer.remaining();
+ }
+
+ /**
+ * This represents the capacity of the backing store. The buffer
+ * is full when length is equal to capacity and it can typically
+ * be appended to when the length is less than the capacity. The
+ * only exception is when <code>space</code> returns zero, which
+ * means that the buffer can not have bytes appended to it.
+ *
+ * @return this is the capacity of other backing byte storage
+ */
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+ /**
+ * This is used to determine how mnay bytes remain within this
+ * buffer. It represents the number of write ready bytes, so if
+ * the length is greater than zero the buffer can be written to
+ * a byte channel. When length is zero the buffer can be closed.
+ *
+ * @return this is the number of bytes remaining in this buffer
+ */
+ public int length() {
+ return capacity() - space();
+ }
+
+ /**
+ * This is used to encode the underlying byte sequence to text.
+ * Converting the byte sequence to text can be useful when either
+ * debugging what exactly is being sent. Also, for transports
+ * that require string delivery of buffers this can be used.
+ *
+ * @return this returns the bytes sequence as a string object
+ */
+ public String encode() throws IOException {
+ return encode("UTF-8");
+ }
+
+ /**
+ * This is used to encode the underlying byte sequence to text.
+ * Converting the byte sequence to text can be useful when either
+ * debugging what exactly is being sent. Also, for transports
+ * that require string delivery of buffers this can be used.
+ *
+ * @param encoding this is the character set to use for encoding
+ *
+ * @return this returns the bytes sequence as a string object
+ */
+ public String encode(String encoding) throws IOException {
+ ByteBuffer segment = buffer.duplicate();
+
+ if(segment != null) {
+ segment.flip();
+ }
+ return encode(encoding, segment);
+ }
+
+ /**
+ * This is used to encode the underlying byte sequence to text.
+ * Converting the byte sequence to text can be useful when either
+ * debugging what exactly is being sent. Also, for transports
+ * that require string delivery of buffers this can be used.
+ *
+ * @param encoding this is the character set to use for encoding
+ * @param segment this is the buffer that is to be encoded
+ *
+ * @return this returns the bytes sequence as a string object
+ */
+ private String encode(String encoding, ByteBuffer segment) throws IOException {
+ Charset charset = Charset.forName(encoding);
+ CharBuffer text = charset.decode(segment);
+
+ return text.toString();
+ }
+
+ /**
+ * This will append bytes within the given buffer to the buffer.
+ * Once invoked the buffer will contain the buffer bytes, which
+ * will have been drained from the buffer. This effectively moves
+ * the bytes in the buffer to the end of the buffer instance.
+ *
+ * @param data this is the buffer containing the bytes
+ *
+ * @return returns the number of bytes that have been moved
+ */
+ public int append(ByteBuffer data) throws IOException {
+ int require = data.remaining();
+ int space = space();
+
+ if(require > space) {
+ require = space;
+ }
+ return append(data, require);
+ }
+
+ /**
+ * This will append bytes within the given buffer to the buffer.
+ * Once invoked the buffer will contain the buffer bytes, which
+ * will have been drained from the buffer. This effectively moves
+ * the bytes in the buffer to the end of the buffer instance.
+ *
+ * @param data this is the buffer containing the bytes
+ * @param count this is the number of bytes that should be used
+ *
+ * @return returns the number of bytes that have been moved
+ */
+ public int append(ByteBuffer data, int count) throws IOException {
+ ByteBuffer segment = data.slice();
+ int mark = data.position();
+ int size = mark + count;
+
+ if(count > 0) {
+ if(trace != null) {
+ trace.trace(WRITE_BUFFER, count);
+ }
+ data.position(size);
+ segment.limit(count);
+ buffer.put(segment);
+ }
+ return count;
+ }
+
+ /**
+ * This write method will write the contents of the buffer to the
+ * provided byte channel. If the whole buffer can be be written
+ * then this will simply return the number of bytes that have.
+ * The number of bytes remaining within the buffer after a write
+ * can be acquired from the <code>length</code> method. Once all
+ * of the bytes are written the buffer must be closed.
+ *
+ * @param channel this is the channel to write the buffer to
+ *
+ * @return this returns the number of bytes that were written
+ */
+ public int write(ByteChannel channel) throws IOException {
+ int size = length();
+
+ if(size <= 0) {
+ return 0;
+ }
+ return write(channel, size);
+ }
+
+ /**
+ * This write method will write the contents of the buffer to the
+ * provided byte channel. If the whole buffer can be be written
+ * then this will simply return the number of bytes that have.
+ * The number of bytes remaining within the buffer after a write
+ * can be acquired from the <code>length</code> method. Once all
+ * of the bytes are written the buffer must be closed.
+ *
+ * @param channel this is the channel to write the buffer to
+ * @param count the number of bytes to write to the channel
+ *
+ * @return this returns the number of bytes that were written
+ */
+ public int write(ByteChannel channel, int count) throws IOException {
+ if(count > 0) {
+ buffer.flip();
+ } else {
+ return 0;
+ }
+ return write(channel, buffer);
+ }
+
+ /**
+ * This write method will write the contents of the buffer to the
+ * provided byte channel. If the whole buffer can be be written
+ * then this will simply return the number of bytes that have.
+ * The number of bytes remaining within the buffer after a write
+ * can be acquired from the <code>length</code> method. Once all
+ * of the bytes are written the buffer must be closed.
+ *
+ * @param channel this is the channel to write the buffer to
+ * @param segment this is the buffer that is to be written
+ *
+ * @return this returns the number of bytes that were written
+ */
+ private int write(ByteChannel channel, ByteBuffer segment) throws IOException {
+ int require = segment.remaining();
+ int count = 0;
+
+ while(count < require) {
+ int size = channel.write(segment);
+
+ if(size <= 0) {
+ break;
+ }
+ if(trace != null) {
+ trace.trace(WRITE, size);
+ }
+ count += size;
+ }
+ if(count >= 0) {
+ segment.compact();
+ }
+ return count;
+ }
+}
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java
new file mode 100644
index 0000000..346aef3
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketBufferWriter.java
@@ -0,0 +1,103 @@
+/*
+ * SocketBufferWriter.java February 2008
+ *
+ * Copyright (C) 2008, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * The <code>SocketBufferWriter</code> is used to represent the means
+ * to write buffers to an underlying transport. This manages all of
+ * the selection required to determine if the socket is write ready.
+ * If the buffer to be written is to block then this will wait
+ * until all queue buffers are fully written.
+ *
+ * @author Niall Gallagher
+ */
+class SocketBufferWriter {
+
+ /**
+ * This is the flusher that is used to asynchronously flush.
+ */
+ private final SocketFlusher flusher;
+
+ /**
+ * This is the writer that is used to queue the buffers.
+ */
+ private final SocketBuffer writer;
+
+ /**
+ * Constructor for the <code>SocketBufferWriter</code> object. This
+ * is used to create a writer that can write buffers to the socket
+ * in such a way that it write either asynchronously or block
+ * the calling thread until such time as the buffers are written.
+ *
+ * @param socket this is the pipeline that this writes to
+ * @param reactor this is the writer used to scheduler writes
+ * @param buffer this is the initial size of the output buffer
+ * @param threshold this is the maximum size of the buffer
+ */
+ public SocketBufferWriter(Socket socket, Reactor reactor, int buffer, int threshold) throws IOException {
+ this.writer = new SocketBuffer(socket, buffer, threshold);
+ this.flusher = new SocketFlusher(writer, socket, reactor);
+ }
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. This will not modify the data that
+ * is to be written, this will simply queue the buffers in the
+ * order that they are provided.
+ *
+ * @param buffer this is the array of bytes to send to the client
+ */
+ public void write(ByteBuffer buffer) throws IOException {
+ boolean done = writer.write(buffer); // returns true if we can buffer
+
+ if(!done) {
+ flusher.flush(); // we could not fully write or buffer the data so we must flush
+ }
+ }
+
+ /**
+ * This method is used to flush all of the queued buffers to
+ * the client. This method will not block but will simply flush
+ * any data to the underlying transport. Internally the data
+ * will be queued for delivery to the connected entity.
+ */
+ public void flush() throws IOException {
+ boolean done = writer.flush(); // returns true only if everything is delivered
+
+ if(!done) {
+ flusher.flush(); // here we will block for an op write event if the buffer contains a reference
+ }
+ }
+
+ /**
+ * This is used to close the writer and the underlying socket.
+ * If a close is performed on the writer then no more bytes
+ * can be read from or written to the writer and the client
+ * will receive a connection close on their side.
+ */
+ public void close() throws IOException {
+ flusher.close();
+ writer.close();
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java
new file mode 100644
index 0000000..a10cee7
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketFlusher.java
@@ -0,0 +1,142 @@
+/*
+ * SocketFlusher.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * The <code>SocketFlusher</code> flushes bytes to the underlying
+ * socket channel. This allows asynchronous writes to the socket
+ * to be managed in such a way that there is order to the way data
+ * is delivered over the socket. This uses a selector to dispatch
+ * flush invocations to the underlying socket when the socket is
+ * write ready. This allows the writing thread to continue without
+ * having to wait for all the data to be written to the socket.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.SocketBufferWriter
+ */
+class SocketFlusher {
+
+ /**
+ * This is the signaller used to determine when to flush.
+ */
+ private FlushSignaller signaller;
+
+ /**
+ * This is the scheduler used to block and signal the writer.
+ */
+ private FlushScheduler scheduler;
+
+ /**
+ * This is the writer used to queue the buffers written.
+ */
+ private SocketBuffer buffer;
+
+ /**
+ * This is used to determine if the socket flusher is closed.
+ */
+ private boolean closed;
+
+ /**
+ * Constructor for the <code>SocketFlusher</code> object. This is
+ * used to flush buffers to the underlying socket asynchronously.
+ * When finished flushing all of the buffered data this signals
+ * any threads that are blocking waiting for the write to finish.
+ *
+ * @param buffer this is used to write the buffered buffers
+ * @param reactor this is used to perform asynchronous writes
+ * @param socket this is the socket used to select with
+ */
+ public SocketFlusher(SocketBuffer buffer, Socket socket, Reactor reactor) throws IOException {
+ this.signaller = new FlushSignaller(this, socket);
+ this.scheduler = new FlushScheduler(socket, reactor, signaller, this);
+ this.buffer = buffer;
+ }
+
+ /**
+ * Here in this method we schedule a flush when the underlying
+ * writer is write ready. This allows the writer thread to return
+ * without having to fully flush the content to the underlying
+ * transport. If there are references queued this will block.
+ */
+ public synchronized void flush() throws IOException {
+ if(closed) {
+ throw new TransportException("Flusher is closed");
+ }
+ boolean block = !buffer.ready();
+
+ if(!closed) {
+ scheduler.schedule(block);
+ }
+ }
+
+ /**
+ * This is executed when the flusher is to write all of the data to
+ * the underlying socket. In this situation the writes are attempted
+ * in a non blocking way, if the task does not complete then this
+ * will simply enqueue the writing task for OP_WRITE and leave the
+ * method. This returns true if all the buffers are written.
+ */
+ public synchronized void execute() throws IOException {
+ boolean ready = buffer.flush();
+
+ if(!ready) {
+ boolean block = !buffer.ready();
+
+ if(!block && !closed) {
+ scheduler.release();
+ }
+ scheduler.repeat();
+ } else{
+ scheduler.ready();
+ }
+ }
+
+ /**
+ * This is used to abort the flushing process when the reactor has
+ * been stopped. An abort to the flusher typically happens when the
+ * server has been shutdown. It prevents threads lingering waiting
+ * for a I/O operation which prevents the server from shutting down.
+ */
+ public synchronized void abort() throws IOException {
+ scheduler.close();
+ buffer.close();
+ }
+
+ /**
+ * This is used to close the flusher ensuring that all of the
+ * data within the writer will be flushed regardless of the
+ * amount of data within the writer that needs to be written. If
+ * the writer does not block then this waits to be finished.
+ */
+ public synchronized void close() throws IOException {
+ boolean ready = buffer.flush();
+
+ if(!closed) {
+ closed = true;
+ }
+ if(!ready) {
+ scheduler.schedule(true);
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java
new file mode 100644
index 0000000..a5c52b3
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * SocketProcessor.java February 2001
+ *
+ * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+/**
+ * The <code>SocketProcessor</code> interface represents a processor that
+ * is used to accept <code>Socket</code> objects. Implementations of
+ * this object will typically hand the socket over for processing either
+ * by some form of protocol handler or message processor. If the socket
+ * contains an <code>SSLEngine</code> an SSL hand shake may be performed
+ * before any messages on the socket are interpreted.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.connect.SocketConnection
+ */
+public interface SocketProcessor {
+
+ /**
+ * Used to process the <code>Socket</code> which is a full duplex
+ * TCP connection to a higher layer the application. It is this
+ * layer that is responsible for interpreting a protocol or handling
+ * messages in some manner. In the case of HTTP this will initiate
+ * the consumption of a HTTP request after any SSL handshake is
+ * finished if the connection is secure.
+ *
+ * @param socket this is the connected HTTP socket to process
+ */
+ void process(Socket socket) throws IOException;
+
+ /**
+ * This method is used to stop the <code>SocketProcessor</code> such
+ * that it will accept no more sockets. Stopping the server ensures
+ * that all resources occupied will be released. This is required
+ * so that all threads are stopped, and all memory is released.
+ * <p>
+ * Typically this method is called once all connections to the
+ * server have been stopped. As a final act of shutting down the
+ * entire server all threads must be stopped, this allows collection
+ * of unused memory and the closing of file and socket resources.
+ */
+ void stop() throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java
new file mode 100644
index 0000000..b0ba04a
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketTransport.java
@@ -0,0 +1,262 @@
+/*
+ * SocketTransport.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.READ;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>SocketTransport</code> object offers a transport that can
+ * send and receive bytes in a non-blocking manner. The contract of
+ * the <code>Transport</code> is that it must either write the data
+ * it is asked to write or it must queue that data for delivery. For
+ * the vast majority of cases data is written directly to the socket
+ * without any need for queuing or selection for write ready events.
+ * <p>
+ * In the event that the client TCP window is full and writing would
+ * block this makes use of a queue of buffers which can be used to
+ * append data to. The buffers are lazily instantiated so the memory
+ * required is created only in the rare case that they are needed.
+ * Once a buffer is full it is queued to an asynchronous thread where
+ * the buffer queue is drained and sent to the client when the TCP
+ * window of the client is capable of accepting it.
+ * <p>
+ * In order to improve the network performance of this transport the
+ * default packet size sent to the TCP stack is four kilobytes. This
+ * ensures that the fragments of response delivered to the TCP layer
+ * are sufficiently large for optimum network performance.
+ *
+ * @author Niall Gallagher
+ */
+public class SocketTransport implements Transport {
+
+ /**
+ * This is the writer that is used to flush the buffer queue.
+ */
+ private SocketBufferWriter writer;
+
+ /**
+ * This is the underlying byte channel used to send the data.
+ */
+ private SocketChannel channel;
+
+ /**
+ * This is the socket that this transport is representing.
+ */
+ private Socket socket;
+
+ /**
+ * This is the trace used to monitor all transport events.
+ */
+ private Trace trace;
+
+ /**
+ * This is used to determine if the transport has been closed.
+ */
+ private boolean closed;
+
+ /**
+ * Constructor for the <code>SocketTransport</code> object. This
+ * requires a reactor to perform asynchronous writes and also the
+ * pipeline which is used to read and write data. This transport
+ * will use a queue of buffers which are lazily initialized so as
+ * to only allocate the memory on demand.
+ *
+ * @param socket this is used to read and write the data
+ * @param reactor this is used to perform asynchronous writes
+ */
+ public SocketTransport(Socket socket, Reactor reactor) throws IOException {
+ this(socket, reactor, 4096);
+ }
+
+ /**
+ * Constructor for the <code>SocketTransport</code> object. This
+ * requires a reactor to perform asynchronous writes and also the
+ * pipeline which is used to read and write data. This transport
+ * will use a queue of buffers which are lazily initialized so as
+ * to only allocate the memory on demand.
+ *
+ * @param socket this is used to read and write the data
+ * @param reactor this is used to perform asynchronous writes
+ * @param buffer this is the size of the output buffer to use
+ */
+ public SocketTransport(Socket socket, Reactor reactor, int buffer) throws IOException {
+ this(socket, reactor, buffer, 20480);
+ }
+
+ /**
+ * Constructor for the <code>SocketTransport</code> object. This
+ * requires a reactor to perform asynchronous writes and also the
+ * pipeline which is used to read and write data. This transport
+ * will use a queue of buffers which are lazily initialized so as
+ * to only allocate the memory on demand.
+ *
+ * @param socket this is used to read and write the data
+ * @param reactor this is used to perform asynchronous writes
+ * @param buffer this is the size of the output buffer to use
+ * @param threshold this is the maximum size of the output buffer
+ */
+ public SocketTransport(Socket socket, Reactor reactor, int buffer, int threshold) throws IOException {
+ this.writer = new SocketBufferWriter(socket, reactor, buffer, threshold);
+ this.channel = socket.getChannel();
+ this.trace = socket.getTrace();
+ this.socket = socket;
+ }
+
+ /**
+ * This is used to acquire the SSL certificate used when the
+ * server is using a HTTPS connection. For plain text connections
+ * or connections that use a security mechanism other than SSL
+ * this will be null. This is only available when the connection
+ * makes specific use of an SSL engine to secure the connection.
+ *
+ * @return this returns the associated SSL certificate if any
+ */
+ public Certificate getCertificate() {
+ return null;
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the socket. A trace object is used to collection details
+ * on what operations are being performed on the socket. For
+ * instance it may contain information relating to I/O events
+ * or more application specific events such as errors.
+ *
+ * @return this returns the trace associated with this socket
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This method is used to get the <code>Map</code> of attributes
+ * by this pipeline. The attributes map is used to maintain details
+ * about the connection. Information such as security credentials
+ * to client details can be placed within the attribute map.
+ *
+ * @return this returns the map of attributes for this pipeline
+ */
+ public Map getAttributes() {
+ return socket.getAttributes();
+ }
+
+ /**
+ * This is used to acquire the SSL engine used for https. If the
+ * pipeline is connected to an SSL transport this returns an SSL
+ * engine which can be used to establish the secure connection
+ * and send and receive content over that connection. If this is
+ * null then the pipeline represents a normal transport.
+ *
+ * @return the SSL engine used to establish a secure transport
+ */
+ public SSLEngine getEngine() {
+ return socket.getEngine();
+ }
+
+ /**
+ * This method is used to acquire the <code>SocketChannel</code>
+ * for the connection. This allows the server to acquire the input
+ * and output streams with which to communicate. It can also be
+ * used to configure the connection and perform various network
+ * operations that could otherwise not be performed.
+ *
+ * @return this returns the socket used by this HTTP pipeline
+ */
+ public SocketChannel getChannel() {
+ return socket.getChannel();
+ }
+
+ /**
+ * This is used to perform a non-blocking read on the transport.
+ * If there are no bytes available on the input buffers then
+ * this method will return zero and the buffer will remain the
+ * same. If there is data and the buffer can be filled then this
+ * will return the number of bytes read. Finally if the socket
+ * is closed this will return a -1 value.
+ *
+ * @param data this is the buffer to append the bytes to
+ *
+ * @return this returns the number of bytes that were read
+ */
+ public int read(ByteBuffer data) throws IOException {
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ int count = channel.read(data);
+
+ if(trace != null) {
+ trace.trace(READ, count);
+ }
+ return count;
+ }
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. This
+ * will buffer the bytes within the internal buffer to ensure
+ * that the response fragments are sufficiently large for the
+ * network. Smaller packets result poorer performance.
+ *
+ * @param data this is the array of bytes to send to the client
+ */
+ public void write(ByteBuffer data) throws IOException{
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ writer.write(data);
+ }
+
+ /**
+ * This is used to flush the internal buffer to the underlying
+ * socket. Flushing with this method is always non-blocking, so
+ * if the socket is not write ready and the buffer can be queued
+ * it will be queued and the calling thread will return.
+ */
+ public void flush() throws IOException {
+ if(closed) {
+ throw new TransportException("Transport is closed");
+ }
+ writer.flush();
+ }
+
+ /**
+ * This method is used to flush the internal buffer and close
+ * the underlying socket. This method will not complete until
+ * all buffered data is written and the underlying socket is
+ * closed at which point this can be disposed of.
+ */
+ public void close() throws IOException {
+ if(!closed) {
+ writer.flush();
+ writer.close();
+ closed = true;
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java
new file mode 100644
index 0000000..805ab91
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/SocketWrapper.java
@@ -0,0 +1,144 @@
+/*
+ * SocketWrapper.java February 2001
+ *
+ * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * This is a <code>SocketWrapper</code> objects that represents a TCP
+ * socket connections. This contains a map that allows attributes to be
+ * associated with the client connection. Attributes such as security
+ * certificates or other transport related details can be exposed to
+ * the <code>Request</code> using the socket attribute map.
+ * <p>
+ * This provides the connected <code>SocketChannel</code> that can be
+ * used to receive and response to HTTP requests. The socket channel
+ * must be selectable and in non-blocking mode. If the socket is not
+ * in a non-blocking state the connection will not be processed.
+ *
+ * @author Niall Gallagher
+ */
+public class SocketWrapper implements Socket {
+
+ /**
+ * This is the socket that provides the input and output.
+ */
+ private final SocketChannel channel;
+
+ /**
+ * This is used to encrypt content for secure connections.
+ */
+ private final SSLEngine engine;
+
+ /**
+ * This can be used to trace specific events for the socket.
+ */
+ private final Trace trace;
+
+ /**
+ * This is used to store the attributes for the socket.
+ */
+ private final Map map;
+
+ /**
+ * This creates a <code>SocketWrapper</code> from a socket channel.
+ * Any implementations of the object may use this constructor to
+ * ensure that all the data is initialized.
+ *
+ * @param channel the socket channel that is used as the transport
+ * @param trace used to trace specific events for the socket
+ */
+ public SocketWrapper(SocketChannel channel, Trace trace) {
+ this(channel, trace, null);
+ }
+
+ /**
+ * This creates a <code>SecureSocket</code> from a socket channel.
+ * Any implementations of the object may use this constructor to
+ * ensure that all the data is initialized.
+ *
+ * @param channel the socket channel that is used as the transport
+ * @param trace used to trace specific events for the socket
+ * @param engine this is the SSL engine used for secure transport
+ */
+ public SocketWrapper(SocketChannel channel, Trace trace, SSLEngine engine) {
+ this.map = new HashMap();
+ this.channel = channel;
+ this.engine = engine;
+ this.trace = trace;
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the socket. A trace object is used to collection details
+ * on what operations are being performed on the socket. For
+ * instance it may contain information relating to I/O events
+ * or more application specific events such as errors.
+ *
+ * @return this returns the trace associated with this socket
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This is used to acquire the SSL engine used for HTTPS. If the
+ * socket is connected to an SSL transport this returns an SSL
+ * engine which can be used to establish the secure connection
+ * and send and receive content over that connection. If this is
+ * null then the socket represents a normal transport.
+ *
+ * @return the SSL engine used to establish a secure transport
+ */
+ public SSLEngine getEngine() {
+ return engine;
+ }
+
+ /**
+ * This method is used to acquire the <code>SocketChannel</code>
+ * for the connection. This allows the server to acquire the input
+ * and output streams with which to communicate. It can also be
+ * used to configure the connection and perform various network
+ * operations that could otherwise not be performed.
+ *
+ * @return this returns the socket used by this HTTP socket
+ */
+ public SocketChannel getChannel() {
+ return channel;
+ }
+
+ /**
+ * This method is used to get the <code>Map</code> of attributes
+ * by this socket. The attributes map is used to maintain details
+ * about the connection. Information such as security credentials
+ * to client details can be placed within the attribute map.
+ *
+ * @return this returns the map of attributes for this socket
+ */
+ public Map getAttributes() {
+ return map;
+ }
+}
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java
new file mode 100644
index 0000000..cc499f3
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/Transport.java
@@ -0,0 +1,91 @@
+/*
+ * Transport.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The <code>Transport</code> interface represents a low level means
+ * to deliver content to the connected client. Typically this will
+ * be a connected, non-blocking, TCP connection. However, for tests
+ * and other purposes this may be adapted. The general contract of
+ * the transport is that it provides non-blocking reads and blocking
+ * writes. Blocking writes are required to ensure that memory does
+ * not build up in output buffers during high load periods.
+ *
+ * @author Niall Gallagher
+ */
+public interface Transport extends Socket {
+
+ /**
+ * This is used to acquire the SSL certificate used when the
+ * server is using a HTTPS connection. For plain text connections
+ * or connections that use a security mechanism other than SSL
+ * this will be null. This is only available when the connection
+ * makes specific use of an SSL engine to secure the connection.
+ *
+ * @return this returns the associated SSL certificate if any
+ */
+ Certificate getCertificate() throws IOException;
+
+ /**
+ * This is used to perform a non-blocking read on the transport.
+ * If there are no bytes available on the input buffers then
+ * this method will return zero and the buffer will remain the
+ * same. If there is data and the buffer can be filled then this
+ * will return the number of bytes read. Finally if the socket
+ * is closed this will return a -1 value.
+ *
+ * @param buffer this is the buffer to append the bytes to
+ *
+ * @return this returns the number of bytes that have been read
+ */
+ int read(ByteBuffer buffer) throws IOException;
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or send directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the buffer of bytes to send to the client
+ */
+ void write(ByteBuffer buffer) throws IOException;
+
+ /**
+ * This method is used to flush the contents of the buffer to
+ * the client. This method will block not block but will simply
+ * flush any data to the underlying transport. Internally the
+ * data will be queued for delivery to the connected entity.
+ */
+ void flush() throws IOException;
+
+ /**
+ * This is used to close the transport and the underlying socket.
+ * If a close is performed on the transport then no more bytes
+ * can be read from or written to the transport and the client
+ * will receive a connection close on their side.
+ */
+ void close() throws IOException;
+}
+
+
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java
new file mode 100644
index 0000000..2d5ff1a
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportChannel.java
@@ -0,0 +1,195 @@
+/*
+ * TransportChannel.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import static org.simpleframework.transport.TransportEvent.ERROR;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>TransportChannel</code> provides a means to deliver and
+ * receive content over a transport. This essentially provides two
+ * adapters which enable simpler communication with the underlying
+ * transport. They hide the complexities involved with buffering and
+ * resetting data written to and read from the socket.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.TransportProcessor
+ */
+public class TransportChannel implements Channel {
+
+ /**
+ * This is the certificate associated with this SSL channel.
+ */
+ private final Certificate certificate;
+
+ /**
+ * This represents the underlying transport that is to be used.
+ */
+ private final Transport transport;
+
+ /**
+ * This is the engine that is used to secure the transport.
+ */
+ private final SSLEngine engine;
+
+ /**
+ * This is used to provide a cursor view on the input.
+ */
+ private final ByteCursor cursor;
+
+ /**
+ * This is used to provide a blocking means for sending data.
+ */
+ private final ByteWriter writer;
+
+ /**
+ * This is the trace used to monitor events on the channel.
+ */
+ private final Trace trace;
+
+ /**
+ * Constructor for the <code>TransportChannel</code> object. The
+ * transport channel basically wraps a channel and provides a
+ * means to send and receive data using specialized adapters.
+ * These adapters provide a simpler means for communicating over
+ * the network to the connected client.
+ *
+ * @param transport this is the underlying transport to be used
+ */
+ public TransportChannel(Transport transport) throws IOException {
+ this.cursor = new TransportCursor(transport);
+ this.writer = new TransportWriter(transport);
+ this.certificate = transport.getCertificate();
+ this.engine = transport.getEngine();
+ this.trace = transport.getTrace();
+ this.transport = transport;
+ }
+
+ /**
+ * This is used to determine if the channel is secure and that
+ * data read from and data written to the request is encrypted.
+ * Channels transferred over SSL are considered secure and will
+ * have this return true, otherwise it will return false.
+ *
+ * @return true if this is secure for reading and writing
+ */
+ public boolean isSecure() {
+ return engine != null;
+ }
+
+ /**
+ * This is used to acquire the SSL certificate used for security.
+ * If the socket is connected to an SSL transport this returns an
+ * SSL certificate which was provided during the secure handshake
+ * between the client and server. If not certificates are present
+ * in the provided instance, a challenge can be issued.
+ *
+ * @return the SSL certificate provided by a secure transport
+ */
+ public Certificate getCertificate() {
+ return certificate;
+ }
+
+ /**
+ * This gets the <code>Trace</code> object associated with the
+ * channel. The trace is used to log various events for the life
+ * of the transaction such as low level read and write events
+ * as well as milestone events and errors.
+ *
+ * @return this returns the trace associated with the socket
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This is the connected socket channel associated with this. In
+ * order to determine if content can be read or written to or
+ * from the channel this socket can be used with a selector. This
+ * provides a means to react to I/O events as they occur rather
+ * than polling the channel which is generally less performant.
+ *
+ * @return this returns the connected socket channel
+ */
+ public SocketChannel getSocket() {
+ return transport.getChannel();
+ }
+
+ /**
+ * This returns the <code>Map</code> of attributes used to hold
+ * connection information for the channel. The attributes here
+ * are taken from the pipeline attributes and may contain details
+ * such as SSL certificates or other such useful information.
+ *
+ * @return returns the attributes associated with the channel
+ */
+ public Map getAttributes() {
+ return transport.getAttributes();
+ }
+
+ /**
+ * This provides a <code>ByteCursor</code> for this channel. The
+ * cursor provides a seekable view of the input buffer and will
+ * allow the server kernel to peek into the input buffer without
+ * having to take the data from the input. This allows overflow
+ * to be pushed back on to the cursor for subsequent reads.
+ *
+ * @return this returns the input cursor for the channel
+ */
+ public ByteCursor getCursor() {
+ return cursor;
+ }
+
+ /**
+ * This provides a <code>ByteWriter</code> for the channel. This
+ * is used to provide a blocking output mechanism for the channel.
+ * Enabling blocking reads ensures that output buffering can be
+ * limited to an extent, which ensures that memory remains low at
+ * high load periods. Writes to the sender may result in the data
+ * being copied and queued until the socket is write ready.
+ *
+ * @return this returns the output sender for this channel
+ */
+ public ByteWriter getWriter() {
+ return writer;
+ }
+
+ /**
+ * Because the channel represents a duplex means of communication
+ * there needs to be a means to close it down. This provides such
+ * a means. By closing the channel the cursor and sender will no
+ * longer send or receive data to or from the network. The client
+ * will also be signalled that the connection has been severed.
+ */
+ public void close() {
+ try {
+ transport.close();
+ }catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java
new file mode 100644
index 0000000..d25cb90
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportCursor.java
@@ -0,0 +1,260 @@
+/*
+ * TransportCursor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+/**
+ * The <code>TransportCursor</code> object represents a cursor that
+ * can read and buffer data from an underlying transport. If the
+ * number of bytes read from the cursor is more than required for
+ * the HTTP request then those bytes can be pushed back in to the
+ * cursor using the <code>reset</code> method. This will only allow
+ * the last read to be reset within the cursor safely.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.Transport
+ */
+public class TransportCursor implements ByteCursor {
+
+ /**
+ * This is the stream for the bytes read by this cursor object.
+ */
+ private ByteReader reader;
+
+ /**
+ * This is the buffer used to collect the bytes pushed back.
+ */
+ private byte[] buffer;
+
+ /**
+ * This is the number of bytes that have been pushed back.
+ */
+ private int count;
+
+ /**
+ * This is the mark from the last read from this cursor object.
+ */
+ private int mark;
+
+ /**
+ * This is the position to read data from the internal buffer.
+ */
+ private int pos;
+
+ /**
+ * This is the maximum number of bytes that can be pushed back.
+ */
+ private int limit;
+
+ /**
+ * Constructor for the <code>TransportCursor</code> object. This
+ * requires a transport to read the bytes from. By default this
+ * will create a buffer of of the specified size to read the
+ * input in to which enabled bytes to be buffered internally.
+ *
+ * @param transport this is the underlying transport to use
+ */
+ public TransportCursor(Transport transport) {
+ this(transport, 2048);
+ }
+
+ /**
+ * Constructor for the <code>TransportCursor</code> object. This
+ * requires a transport to read the bytes from. By default this
+ * will create a buffer of of the specified size to read the
+ * input in to which enabled bytes to be buffered internally.
+ *
+ * @param transport this is the underlying transport to use
+ * @param size this is the size of the internal buffer to use
+ */
+ public TransportCursor(Transport transport, int size) {
+ this.reader = new TransportReader(transport, size);
+ this.buffer = new byte[0];
+ this.limit = size;
+ }
+
+ /**
+ * Determines whether the cursor is still open. The cursor is
+ * considered open if there are still bytes to read. If there is
+ * still bytes buffered and the underlying transport is closed
+ * then the cursor is still considered open.
+ *
+ * @return true if there is nothing more to be read from this
+ */
+ public boolean isOpen() throws IOException {
+ return reader.isOpen();
+ }
+
+ /**
+ * Determines whether the cursor is ready for reading. When the
+ * cursor is ready then it guarantees that some amount of bytes
+ * can be read from the underlying stream without blocking.
+ *
+ * @return true if some data can be read without blocking
+ */
+ public boolean isReady() throws IOException {
+ return ready() > 0;
+ }
+
+ /**
+ * Provides the number of bytes that can be read from the stream
+ * without blocking. This is typically the number of buffered or
+ * available bytes within the stream. When this reaches zero then
+ * the cursor may perform a blocking read.
+ *
+ * @return the number of bytes that can be read without blocking
+ */
+ public int ready() throws IOException {
+ if(count > 0) {
+ return count;
+ }
+ return reader.ready();
+ }
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ public int read(byte[] data) throws IOException {
+ return read(data, 0, data.length);
+ }
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ * @param off this is the offset to begin writing the bytes to
+ * @param len this is the number of bytes that are requested
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ public int read(byte[] data, int off, int len) throws IOException {
+ if(count <= 0) {
+ mark = pos;
+ return reader.read(data, off, len);
+ }
+ int size = Math.min(count, len);
+
+ if(size > 0) {
+ System.arraycopy(buffer, pos, data, off, size);
+ mark = pos;
+ pos += size;
+ count -= size;
+ }
+ return size;
+ }
+
+ /**
+ * Pushes the provided data on to the cursor. Data pushed on to
+ * the cursor will be the next data read from the cursor. This
+ * complements the <code>reset</code> method which will reset
+ * the cursors position on a stream. Allowing data to be pushed
+ * on to the cursor allows more flexibility.
+ *
+ * @param data this is the data to be pushed on to the cursor
+ */
+ public void push(byte[] data) throws IOException {
+ push(data, 0, data.length);
+ }
+
+ /**
+ * Pushes the provided data on to the cursor. Data pushed on to
+ * the cursor will be the next data read from the cursor. This
+ * complements the <code>reset</code> method which will reset
+ * the cursors position on a stream. Allowing data to be pushed
+ * on to the cursor allows more flexibility.
+ *
+ * @param data this is the data to be pushed on to the cursor
+ * @param off this is the offset to begin reading the bytes
+ * @param len this is the number of bytes that are to be used
+ */
+ public void push(byte[] data, int off, int len) throws IOException {
+ int size = buffer.length;
+
+ if(size < len + count) {
+ expand(len + count);
+ }
+ int start = pos - len;
+
+ if(len > 0) {
+ System.arraycopy(data, off, buffer, start, len);
+ mark = start;
+ pos = start;
+ count += len;
+ }
+ }
+
+ /**
+ * This is used to ensure that there is enough space in the buffer
+ * to allow for more bytes to be added. If the buffer is already
+ * larger than the required capacity the this will do nothing.
+ *
+ * @param capacity the minimum size needed for the buffer
+ */
+ private void expand(int capacity) throws IOException {
+ if(capacity > limit) {
+ throw new TransportException("Capacity limit exceeded");
+ }
+ byte[] temp = new byte[capacity];
+ int start = capacity - count;
+ int shift = pos - mark
+ ;
+ if(count > 0) {
+ System.arraycopy(buffer, pos, temp, start, count);
+ }
+ pos = capacity - count;
+ mark = pos - shift;
+ buffer = temp;
+ }
+
+ /**
+ * Moves the cursor backward within the stream. This ensures
+ * that any bytes read from the last read can be pushed back
+ * in to the stream so that they can be read again. This will
+ * throw an exception if the reset can not be performed.
+ *
+ * @param size this is the number of bytes to reset back
+ *
+ * @return this is the number of bytes that have been reset
+ */
+ public int reset(int size) throws IOException {
+ if(mark == pos) {
+ return reader.reset(size);
+ }
+ if(pos - size < mark) {
+ size = pos - mark;
+ }
+ if(size > 0) {
+ count += size;
+ pos -= size;
+ }
+ return size;
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java
new file mode 100644
index 0000000..ec48140
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportDispatcher.java
@@ -0,0 +1,114 @@
+/*
+ * TransportDispatcher.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.nio.channels.SocketChannel;
+
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>TransportDispatcher</code> operation is used transfer a
+ * transport to the processor so it can be processed. This is used so
+ * that when a transport is given to the processor it can be dispatched
+ * in another thread to the processor. This is needed so that the
+ * connection thread is occupied only briefly.
+ *
+ * @author Niall Gallagher
+ */
+class TransportDispatcher implements Operation {
+
+ /**
+ * This is the processor used to transfer the transport to.
+ */
+ private final TransportProcessor processor;
+
+ /**
+ * This is the transport to be passed to the processor.
+ */
+ private final Transport transport;
+
+ /**
+ * Constructor for the <code>TransportDispatcher</code> object. This
+ * is used to transfer a transport to a processor. Transferring the
+ * transport using an operation ensures that the thread that is
+ * used to process the transport is not occupied for long.
+ *
+ * @param transport this is the transport this exchange uses
+ * @param processor this is the negotiation to dispatch to
+ */
+ public TransportDispatcher(TransportProcessor processor, Transport transport) {
+ this.transport = transport;
+ this.processor = processor;
+ }
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ public SocketChannel getChannel() {
+ return transport.getChannel();
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the operation. A trace object is used to collection details
+ * on what operations are being performed. For instance it may
+ * contain information relating to I/O events or errors.
+ *
+ * @return this returns the trace associated with this operation
+ */
+ public Trace getTrace() {
+ return transport.getTrace();
+ }
+
+ /**
+ * This is used to transfer the transport to the processor. This
+ * will typically be executed asynchronously so that it does not
+ * delay the thread that passes the <code>Transport</code> to the
+ * transport processor, ensuring quicker processing.
+ */
+ public void run() {
+ try {
+ processor.process(transport);
+ }catch(Exception e) {
+ cancel();
+ }
+ }
+
+ /**
+ * This is used to cancel the operation if it has timed out. This
+ * is typically invoked when it has been waiting in a selector for
+ * an extended duration of time without any active operations on
+ * it. In such a case the reactor must purge the operation to free
+ * the memory and open channels associated with the operation.
+ */
+ public void cancel() {
+ try {
+ transport.close();
+ }catch(Exception e) {
+ return;
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java
new file mode 100644
index 0000000..97be771
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportEvent.java
@@ -0,0 +1,91 @@
+/*
+ * TransportEvent.java October 2012
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+/**
+ * The <code>TransportEvent</code> enum represents various events that
+ * can occur with the transport. Events that are available here are
+ * typically those that refer to low level I/O operations within the
+ * server. If a <code>Trace</code> has been associated with the socket
+ * connection then it will receive these events as they occur.
+ *
+ * @author Niall Gallagher
+ */
+public enum TransportEvent {
+
+ /**
+ * This event represents a read operation on the underlying socket.
+ */
+ READ,
+
+ /**
+ * This event occurs when there is no more data available to read.
+ */
+ READ_WAIT,
+
+ /**
+ * This event represents a write operation on the underlying socket.
+ */
+ WRITE,
+
+ /**
+ * This event represents a write buffer operation on the underlying socket.
+ */
+ WRITE_BUFFER,
+
+ /**
+ * This event occurs when no more data can be sent over the socket.
+ */
+ WRITE_WAIT,
+
+ /**
+ * This event occurs when a thread must wait for a write to finish.
+ */
+ WRITE_BLOCKING,
+
+ /**
+ * This event occurs with HTTPS when a new SSL handshake starts.
+ */
+ HANDSHAKE_BEGIN,
+
+ /**
+ * This event occurs with HTTPS when a SSL handshake has finished.
+ */
+ HANDSHAKE_DONE,
+
+ /**
+ * This event occurs when a server challenges for an X509 certificate.
+ */
+ CERTIFICATE_CHALLENGE,
+
+ /**
+ * This event indicates that the handshake failed in some way.
+ */
+ HANDSHAKE_FAILED,
+
+ /**
+ * This event occurs when the underlying connection is terminated.
+ */
+ CLOSE,
+
+ /**
+ * This event occurs when there is an error with the transport.
+ */
+ ERROR
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java
new file mode 100644
index 0000000..c6394ee
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportException.java
@@ -0,0 +1,55 @@
+/*
+ * TransportException.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+/**
+ * The <code>TransportException</code> object is thrown when there
+ * is a problem with the transport. Typically this is done thrown if
+ * there is a problem reading or writing to the transport.
+ *
+ * @author Niall Gallagher
+ */
+public class TransportException extends IOException {
+
+ /**
+ * Constructor for the <code>TransportException</code> object. If
+ * there is a problem sending or reading from a transport then it
+ * will throw a transport exception to report the error.
+ *
+ * @param message this is the message associated with the error
+ */
+ public TransportException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructor for the <code>TransportException</code> object. If
+ * there is a problem sending or reading from a transport then it
+ * will throw a transport exception to report the error.
+ *
+ * @param message this is the message associated with the error
+ * @param cause this is the cause of the producer exception
+ */
+ public TransportException(String message, Throwable cause) {
+ super(message);
+ initCause(cause);
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java
new file mode 100644
index 0000000..13f505b
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * TransportProcessor.java February 2007
+ *
+ * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+/**
+ * This is the <code>TransportProcessor</code> used to process the
+ * provided transport in a higher layer. It is the responsibility of
+ * the delegate to handle protocols and message processing. In the
+ * case of HTTP this will process requests for a container. The
+ * transport provided can be either a direct transport or provide
+ * some form of secure encoding such as SSL.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.Transport
+ */
+public interface TransportProcessor {
+
+ /**
+ * This is used to process a <code>Transport</code> instance in
+ * a higher layer that can handle a protocol. A transport can be
+ * a direct transport or a secure transport providing SSL. At this
+ * point any SSL handshake will have already completed.
+ * <p>
+ * Typical usage of this method is to accept multiple transport
+ * objects, each representing a unique TCP channel to the client,
+ * and process requests from those transports concurrently.
+ *
+ * @param transport the transport to process requests from
+ */
+ void process(Transport transport) throws IOException;
+
+ /**
+ * This method is used to stop the <code>TransportProcessor</code>
+ * such that it will accept no more pipelines. Stopping the connector
+ * ensures that all resources occupied will be released. This is
+ * required so that all threads are stopped and released.
+ * <p>
+ * Typically this method is called once all connections to the
+ * server have been stopped. As a final act of shutting down the
+ * entire server all threads must be stopped, this allows collection
+ * of unused memory and the closing of file and socket resources.
+ */
+ void stop() throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java
new file mode 100644
index 0000000..713c162
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportReader.java
@@ -0,0 +1,229 @@
+/*
+ * TransportReader.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The <code>TransportReader</code> object represents a reader that
+ * can read and buffer data from an underlying transport. If the
+ * number of bytes read from the reader is more than required for
+ * the HTTP request then those bytes can be pushed back in to the
+ * cursor using the <code>reset</code> method. This will only allow
+ * the last read to be reset within the cursor safely.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.Transport
+ */
+class TransportReader implements ByteReader {
+
+ /**
+ * This is the underlying transport to read the bytes from.
+ */
+ private Transport transport;
+
+ /**
+ * This is used to store the bytes read from the transport.
+ */
+ private ByteBuffer buffer;
+
+ /**
+ * This is used to determine if the transport has been closed.
+ */
+ private boolean closed;
+
+ /**
+ * This represents the number of bytes that are ready to read.
+ */
+ private int count;
+
+ /**
+ * Constructor for the <code>TransportReader</code> object. This
+ * requires a transport to read the bytes from. By default this
+ * will create a buffer of two kilobytes to read the input in to
+ * which ensures several requests can be read at once.
+ *
+ * @param transport this is the underlying transport to use
+ */
+ public TransportReader(Transport transport) {
+ this(transport, 2048);
+ }
+
+ /**
+ * Constructor for the <code>TransportReader</code> object. This
+ * requires a transport to read the bytes from. By default this
+ * will create a buffer of of the specified size to read the
+ * input in to which enabled bytes to be buffered internally.
+ *
+ * @param transport this is the underlying transport to use
+ * @param size this is the size of the internal buffer to use
+ */
+ public TransportReader(Transport transport, int size) {
+ this.buffer = ByteBuffer.allocate(size);
+ this.transport = transport;
+ }
+
+ /**
+ * Determines whether the source is still open. The source is
+ * considered open if there are still bytes to read. If there is
+ * still bytes buffered and the underlying transport is closed
+ * then the source is still considered open.
+ *
+ * @return true if there is nothing more to be read from this
+ */
+ public boolean isOpen() throws IOException {
+ return count != -1;
+ }
+
+ /**
+ * Determines whether the source is ready for reading. When the
+ * source is ready then it guarantees that some amount of bytes
+ * can be read from the underlying stream without blocking.
+ *
+ * @return true if some data can be read without blocking
+ */
+ public boolean isReady() throws IOException {
+ return ready() > 0;
+ }
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ public int read(byte[] data) throws IOException {
+ return read(data, 0, data.length);
+ }
+
+ /**
+ * Reads a block of bytes from the underlying stream. This will
+ * read up to the requested number of bytes from the underlying
+ * stream. If there are no ready bytes on the stream this can
+ * return zero, representing the fact that nothing was read.
+ *
+ * @param data this is the array to read the bytes in to
+ * @param off this is the offset to begin writing the bytes to
+ * @param len this is the number of bytes that are requested
+ *
+ * @return this returns the number of bytes read from the stream
+ */
+ public int read(byte[] data, int off, int len) throws IOException {
+ if(count <= 0) { // has the channel ended
+ return count;
+ }
+ int size = Math.min(len, count); // get the minimum
+
+ if(size > 0) {
+ buffer.get(data, off, size); // get the bytes
+ count -= size;
+ }
+ return Math.max(0, size);
+ }
+
+ /**
+ * Provides the number of bytes that can be read from the stream
+ * without blocking. This is typically the number of buffered or
+ * available bytes within the stream. When this reaches zero then
+ * the source may perform a blocking read.
+ *
+ * @return the number of bytes that can be read without blocking
+ */
+ public int ready() throws IOException {
+ if(count < 0) {
+ return count;
+ }
+ if(count > 0) { // if the are ready bytes don't read
+ return count;
+ }
+ return peek();
+ }
+
+ /**
+ * Provides the number of bytes that can be read from the stream
+ * without blocking. This is typically the number of buffered or
+ * available bytes within the stream. When this reaches zero then
+ * the source may perform a blocking read.
+ *
+ * @return the number of bytes that can be read without blocking
+ */
+ private int peek() throws IOException {
+ if(count <= 0) { // reset the buffer for filling
+ buffer.clear();
+ }
+ if(count > 0) {
+ buffer.compact(); // compact the buffer
+ }
+ count += transport.read(buffer); // how many were read
+
+ if(count > 0) {
+ buffer.flip(); // if there is something then flip
+ }
+ if(count < 0) { // close when stream is fully read
+ close();
+ }
+ return count;
+ }
+
+ /**
+ * Moves the source backward within the stream. This ensures
+ * that any bytes read from the last read can be pushed back
+ * in to the stream so that they can be read again. This will
+ * throw an exception if the reset can not be performed.
+ *
+ * @param size this is the number of bytes to reset back
+ *
+ * @return this is the number of bytes that have been reset
+ */
+ public int reset(int size) throws IOException {
+ int mark = buffer.position();
+
+ if(size > mark) {
+ size = mark;
+ }
+ if(mark > 0) {
+ buffer.position(mark - size);
+ count += size;
+ }
+ return size;
+ }
+
+ /**
+ * This is used to close the underlying transport. This is used
+ * when the transport returns a negative value, indicating that
+ * the client has closed the connection on the other side. If
+ * this is invoked the read method returns -1 and the reader
+ * is no longer open, further bytes can no longer be read.
+ */
+ public void close() throws IOException {
+ if(!closed) {
+ transport.close();
+ closed = true;
+ count = -1;
+ }
+ }
+}
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java
new file mode 100644
index 0000000..04e3c9b
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportSocketProcessor.java
@@ -0,0 +1,163 @@
+/*
+ * TransportSocketProcessor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+
+import org.simpleframework.common.thread.ConcurrentExecutor;
+import org.simpleframework.common.thread.Daemon;
+import org.simpleframework.transport.reactor.ExecutorReactor;
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * The <code>TransportSocketProcessor</code> is used to convert sockets
+ * to transports. This acts as an adapter to a transport processor
+ * which converts a connected socket to a <code>Transport</code> that
+ * can be used to read and write data. Depending on whether there is
+ * an <code>SSLEngine</code> associated with the socket or not, there
+ * could be an SSL handshake performed.
+ *
+ * @author Niall Gallagher
+ */
+public class TransportSocketProcessor implements SocketProcessor {
+
+ /**
+ * This is the executor used to execute the I/O operations.
+ */
+ private final ConcurrentExecutor executor;
+
+ /**
+ * This is the factory used to create the required operations.
+ */
+ private final OperationFactory factory;
+
+ /**
+ * This is the processor used to process transport objects.
+ */
+ private final Reactor reactor;
+
+ /**
+ * This is used to clean the internals of the processor.
+ */
+ private final Daemon cleaner;
+
+ /**
+ * Constructor for the <code>TransportSocketProcessor</code> object.
+ * The transport processor is used to process plain connections
+ * and wrap those connections in a <code>Transport</code> that
+ * can be used to send and receive data to and from.
+ *
+ * @param processor this is used to process transports
+ */
+ public TransportSocketProcessor(TransportProcessor processor) throws IOException {
+ this(processor, 8);
+ }
+
+ /**
+ * Constructor for the <code>TransportSocketProcessor</code> object.
+ * The transport processor is used to process plain connections
+ * and wrap those connections in a <code>Transport</code> that
+ * can be used to send and receive data to and from.
+ *
+ * @param processor this is used to process transports
+ * @param threads this is the number of threads this will use
+ */
+ public TransportSocketProcessor(TransportProcessor processor, int threads) throws IOException {
+ this(processor, threads, 4096);
+ }
+
+ /**
+ * Constructor for the <code>TransportSocketProcessor</code> object.
+ * The transport processor is used to process plain connections
+ * and wrap those connections in a <code>Transport</code> that
+ * can be used to send and receive data to and from.
+ *
+ * @param processor this is used to process transports
+ * @param threads this is the number of threads this will use
+ * @param buffer this is the initial size of the output buffer
+ */
+ public TransportSocketProcessor(TransportProcessor processor, int threads, int buffer) throws IOException {
+ this(processor, threads, buffer, 20480);
+ }
+
+ /**
+ * Constructor for the <code>TransportSocketProcessor</code> object.
+ * The transport processor is used to process plain connections
+ * and wrap those connections in a <code>Transport</code> that
+ * can be used to send and receive data to and from.
+ *
+ * @param processor this is used to process transports
+ * @param threads this is the number of threads this will use
+ * @param buffer this is the initial size of the output buffer
+ * @param threshold this is the maximum size of the output buffer
+ */
+ public TransportSocketProcessor(TransportProcessor processor, int threads, int buffer, int threshold) throws IOException {
+ this(processor, threads, buffer, threshold, false);
+ }
+
+ /**
+ * Constructor for the <code>TransportSocketProcessor</code> object.
+ * The transport processor is used to process plain connections
+ * and wrap those connections in a <code>Transport</code> that
+ * can be used to send and receive data to and from.
+ *
+ * @param processor this is used to process transports
+ * @param threads this is the number of threads this will use
+ * @param buffer this is the initial size of the output buffer
+ * @param threshold this is the maximum size of the output buffer
+ * @param client determines if the SSL handshake is for a client
+ */
+ public TransportSocketProcessor(TransportProcessor processor, int threads, int buffer, int threshold, boolean client) throws IOException {
+ this.executor = new ConcurrentExecutor(Operation.class, threads);
+ this.reactor = new ExecutorReactor(executor);
+ this.factory = new OperationFactory(processor, reactor, buffer, threshold, client);
+ this.cleaner = new ServerCleaner(processor, executor, reactor);
+ }
+
+ /**
+ * Used to connect the <code>Socket</code> which is a full duplex
+ * TCP connection to a higher layer the application. It is this
+ * layer that is responsible for interpreting a protocol or handling
+ * messages in some manner. In the case of HTTP this will initiate
+ * the consumption of a HTTP request after any SSL handshake is
+ * finished if the connection is secure.
+ *
+ * @param socket this is the connected HTTP pipeline to process
+ */
+ public void process(Socket socket) throws IOException {
+ Operation task = factory.getInstance(socket);
+
+ if(task != null) {
+ reactor.process(task);
+ }
+ }
+
+ /**
+ * This is implemented to shut down the server asynchronously. It
+ * will start a process to perform the shutdown. Asynchronous
+ * shutdown allows a server resource executed via a HTTP request
+ * can stop the server without any danger of killing itself or
+ * even worse causing a deadlock.
+ */
+ public void stop() throws IOException {
+ cleaner.start();
+ executor.stop();
+ }
+ }
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java
new file mode 100644
index 0000000..c6eb436
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/TransportWriter.java
@@ -0,0 +1,150 @@
+/*
+ * TransportWriter.java February 2007
+ *
+ * Copyright (C) 2001, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The <code>TransportWriter</code> object is used to write bytes to
+ * and underlying transport. This is essentially an adapter between
+ * an <code>OutputStream</code> and the underlying transport. Each
+ * byte array segment written to the underlying transport is wrapped
+ * in a bytes buffer so that it can be sent by the transport layer.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.Transport
+ */
+public class TransportWriter implements ByteWriter {
+
+ /**
+ * This is used to determine if the transport has been closed.
+ */
+ private final AtomicBoolean closed;
+
+ /**
+ * This is the underlying transport to write the bytes to.
+ */
+ private final Transport transport;
+
+ /**
+ * Constructor for the <code>TransportWriter</code> object. This
+ * is used to create an adapter for the transport such that a
+ * byte array can be used to write bytes to the array.
+ *
+ * @param transport the underlying transport to write bytes to
+ */
+ public TransportWriter(Transport transport) {
+ this.closed = new AtomicBoolean();
+ this.transport = transport;
+ }
+
+ /**
+ * This method is used to deliver the provided array of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or write directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param array this is the array of bytes to write to the client
+ */
+ public void write(byte[] array) throws IOException {
+ write(array, 0, array.length);
+ }
+
+ /**
+ * This method is used to deliver the provided array of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or write directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param array this is the array of bytes to write to the client
+ * @param off this is the offset within the array to write from
+ * @param len this is the number of bytes that are to be sent
+ */
+ public void write(byte[] array, int off, int len) throws IOException {
+ ByteBuffer buffer = ByteBuffer.wrap(array, off, len);
+
+ if(len > 0) {
+ write(buffer);
+ }
+ }
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or write directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the buffer of bytes to write to the client
+ */
+ public void write(ByteBuffer buffer) throws IOException {
+ int mark = buffer.position();
+ int size = buffer.limit();
+
+ if(mark > size) {
+ throw new IOException("Buffer position greater than limit");
+ }
+ write(buffer, 0, size - mark);
+ }
+
+ /**
+ * This method is used to deliver the provided buffer of bytes to
+ * the underlying transport. Depending on the connection type the
+ * array may be encoded for SSL transport or write directly. Any
+ * implementation may choose to buffer the bytes for performance.
+ *
+ * @param buffer this is the buffer of bytes to write to the client
+ * @param off this is the offset within the buffer to write from
+ * @param len this is the number of bytes that are to be sent
+ */
+ public void write(ByteBuffer buffer, int off, int len) throws IOException {
+ int mark = buffer.position();
+ int limit = buffer.limit();
+
+ if(limit - mark > len) {
+ buffer.limit(mark + len); // reduce usable size
+ }
+ transport.write(buffer);
+ buffer.limit(limit);
+ }
+
+ /**
+ * This method is used to flush the contents of the buffer to
+ * the client. This method will block until such time as all of
+ * the data has been sent to the client. If at any point there
+ * is an error writing the content an exception is thrown.
+ */
+ public void flush() throws IOException {
+ transport.flush();
+ }
+
+ /**
+ * This is used to close the writer and the underlying transport.
+ * If a close is performed on the writer then no more bytes can
+ * be read from or written to the transport and the client will
+ * received a connection close on their side.
+ */
+ public void close() throws IOException {
+ if(!closed.getAndSet(true)) {
+ transport.close();
+ }
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java
new file mode 100644
index 0000000..490ce6d
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/Connection.java
@@ -0,0 +1,73 @@
+/*
+ * Connection.java October 2002
+ *
+ * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * The <code>Connection</code> object is used to manage connections
+ * from a server socket. In order to achieve this it spawns a task
+ * to listen for incoming connect requests. When a TCP connection
+ * request arrives it hands off the <code>SocketChannel</code> to
+ * the <code>SocketProcessor</code> which processes the request.
+ * <p>
+ * This handles connections from a <code>ServerSocketChannel</code>
+ * object so that features such as SSL can be used by a server that
+ * uses this package. The background acceptor process will terminate
+ * if the connection is closed.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.SocketProcessor
+ */
+public interface Connection extends Closeable {
+
+ /**
+ * This creates a new background task that will listen to the
+ * specified <code>ServerAddress</code> for incoming TCP connect
+ * requests. When an connection is accepted it is handed to the
+ * internal <code>Server</code> implementation as a pipeline. The
+ * background task is a non daemon task to ensure the server is
+ * kept active, to terminate the connection this can be closed.
+ *
+ * @param address this is the address used to accept connections
+ *
+ * @return this returns the actual local address that is used
+ */
+ SocketAddress connect(SocketAddress address) throws IOException;
+
+ /**
+ * This creates a new background task that will listen to the
+ * specified <code>ServerAddress</code> for incoming TCP connect
+ * requests. When an connection is accepted it is handed to the
+ * internal <code>Server</code> implementation as a pipeline. The
+ * background task is a non daemon task to ensure the server is
+ * kept active, to terminate the connection this can be closed.
+ *
+ * @param address this is the address used to accept connections
+ * @param context this is used for secure SSL connections
+ *
+ * @return this returns the actual local address that is used
+ */
+ SocketAddress connect(SocketAddress address, SSLContext context) throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java
new file mode 100644
index 0000000..0a29ba8
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionEvent.java
@@ -0,0 +1,42 @@
+/*
+ * ConnectionEvent.java October 2012
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+/**
+ * The <code>ConnectionEvent</code> enum represents various events that
+ * can occur with a new connection. When a new connection is accepted
+ * then the accept event is dispatched to a <code>Trace</code> object
+ * if one has been associated with the connection.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.trace.Trace
+ */
+public enum ConnectionEvent {
+
+ /**
+ * This event occurs when the server accepts a new connection.
+ */
+ ACCEPT,
+
+ /**
+ * This event occurs when there is an error with the connection.
+ */
+ ERROR
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java
new file mode 100644
index 0000000..87e5196
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/ConnectionException.java
@@ -0,0 +1,58 @@
+/*
+ * ConnectionException.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import java.io.IOException;
+
+/**
+ * The <code>ConnectionException</code> is thrown if there is a problem
+ * establishing a connection to the server. Such a problem can occur
+ * if the server has been stopped when a new connection arrives. This
+ * can also be thrown if some other connection related issue occurs.
+ *
+ * @author Niall Gallagher
+ */
+class ConnectionException extends IOException {
+
+ /**
+ * Constructor for the <code>ConnectionException</code> object. This
+ * is used to represent an exception that is thrown when an error
+ * occurs during the connect process. Typically this is thrown if
+ * there is a problem connecting or accepting from a socket.
+ *
+ * @param message this is the message describing the exception
+ */
+ public ConnectionException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructor for the <code>ConnectionException</code> object. This
+ * is used to represent an exception that is thrown when an error
+ * occurs during the connect process. Typically this is thrown if
+ * there is a problem connecting or accepting from a socket.
+ *
+ * @param message this is the message describing the exception
+ * @param cause this is the cause of the producer exception
+ */
+ public ConnectionException(String message, Throwable cause) {
+ super(message);
+ initCause(cause);
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java
new file mode 100644
index 0000000..ca6fc92
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAcceptor.java
@@ -0,0 +1,315 @@
+/*
+ * Acceptor.java October 2002
+ *
+ * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import static org.simpleframework.transport.connect.ConnectionEvent.ACCEPT;
+import static org.simpleframework.transport.connect.ConnectionEvent.ERROR;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.simpleframework.transport.SocketProcessor;
+import org.simpleframework.transport.Socket;
+import org.simpleframework.transport.SocketWrapper;
+import org.simpleframework.transport.reactor.Operation;
+import org.simpleframework.transport.trace.Trace;
+import org.simpleframework.transport.trace.TraceAnalyzer;
+
+/**
+ * The <code>SocketAcceptor</code> object is used to accept incoming
+ * TCP connections from a specified socket address. This is used by
+ * the <code>Connection</code> object as a background process to
+ * accept the connections and hand them to a socket connector.
+ * <p>
+ * This is capable of processing SSL connections created by the
+ * internal server socket. All SSL connections are forced to finish
+ * the SSL handshake before being dispatched to the server. This
+ * ensures that there are no problems with reading the request.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.connect.SocketConnection
+ */
+class SocketAcceptor implements Operation {
+
+ /**
+ * This is the server socket channel used to accept connections.
+ */
+ private final ServerSocketChannel listener;
+
+ /**
+ * The handler that manages the incoming TCP connections.
+ */
+ private final SocketProcessor processor;
+
+ /**
+ * This is the server socket to bind the socket address to.
+ */
+ private final ServerSocket socket;
+
+ /**
+ * If provided the SSL context is used to create SSL engines.
+ */
+ private final SSLContext context;
+
+ /**
+ * This is the tracing analyzer used to trace accepted sockets.
+ */
+ private final TraceAnalyzer analyzer;
+
+ /**
+ * This is the local address to bind the listen socket to.
+ */
+ private final SocketAddress address;
+
+ /**
+ * This is used to collect trace events with the acceptor.
+ */
+ private final Trace trace;
+
+ /**
+ * Constructor for the <code>SocketAcceptor</code> object. This
+ * accepts new TCP connections from the specified server socket.
+ * Each of the connections that is accepted is configured for
+ * performance for the application.
+ *
+ * @param address this is the address to accept connections from
+ * @param processor this is used to initiate the HTTP processing
+ * @param analyzer this is the tracing analyzer to be used
+ */
+ public SocketAcceptor(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer) throws IOException {
+ this(address, processor, analyzer, null);
+ }
+
+ /**
+ * Constructor for the <code>SocketAcceptor</code> object. This
+ * accepts new TCP connections from the specified server socket.
+ * Each of the connections that is accepted is configured for
+ * performance for the applications.
+ *
+ * @param address this is the address to accept connections from
+ * @param processor this is used to initiate the HTTP processing
+ * @param analyzer this is the tracing analyzer to be used
+ * @param context this is the SSL context used for secure HTTPS
+ */
+ public SocketAcceptor(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer, SSLContext context) throws IOException {
+ this.listener = ServerSocketChannel.open();
+ this.trace = analyzer.attach(listener);
+ this.socket = listener.socket();
+ this.context = context;
+ this.analyzer = analyzer;
+ this.processor = processor;
+ this.address = address;
+ }
+
+ /**
+ * This is used to acquire the local socket address that this is
+ * listening to. This required in case the socket address that
+ * is specified is an emphemeral address, that is an address that
+ * is assigned dynamically when a port of 0 is specified.
+ *
+ * @return this returns the address for the listening address
+ */
+ public SocketAddress getAddress() {
+ return socket.getLocalSocketAddress();
+ }
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the operation. A trace object is used to collection details
+ * on what operations are being performed. For instance it may
+ * contain information relating to I/O events or errors.
+ *
+ * @return this returns the trace associated with this operation
+ */
+ public Trace getTrace() {
+ return trace;
+ }
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ public SelectableChannel getChannel() {
+ return listener;
+ }
+
+ /**
+ * This is used to configure the server socket for non-blocking
+ * mode. It will also bind the server socket to the socket port
+ * specified in the <code>SocketAddress</code> object. Once done
+ * the acceptor is ready to accept newly arriving connections.
+ *
+ * @param address this is the server socket address to bind to
+ */
+ public void bind() throws IOException {
+ listener.configureBlocking(false);
+ socket.setReuseAddress(true);
+ socket.bind(address, 100);
+ }
+
+ /**
+ * This is used to accept a new TCP connections. When the socket
+ * is ready to accept a connection this method is invoked. It will
+ * then create a HTTP pipeline object using the accepted socket
+ * and if provided with an <code>SSLContext</code> it will also
+ * provide an <code>SSLEngine</code> which is handed to the
+ * processor to handle the HTTP requests.
+ */
+ public void run() {
+ try {
+ accept();
+ } catch(Exception cause) {
+ pause();
+ }
+ }
+
+ /**
+ * This is used to throttle the acceptor when there is an error
+ * such as exhaustion of file descriptors. This will prevent the
+ * CPU from being hogged by the acceptor on such occasions. If
+ * the thread can not be put to sleep then this will freeze.
+ */
+ private void pause() {
+ try {
+ Thread.sleep(10);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * This is used to cancel the operation if the reactor decides to
+ * reject it for some reason. Typically this method will never be
+ * invoked as this operation never times out. However, should the
+ * reactor cancel the operation this will close the socket.
+ */
+ public void cancel() {
+ try {
+ close();
+ } catch(Throwable cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * The main processing done by this object is done using a thread
+ * calling the <code>run</code> method. Here the TCP connections
+ * are accepted from the <code>ServerSocketChannel</code> which
+ * creates the socket objects. Each socket is then encapsulated in
+ * to a pipeline and dispatched to the processor for processing.
+ *
+ * @throws IOException if there is a problem accepting the socket
+ */
+ private void accept() throws IOException {
+ SocketChannel channel = listener.accept();
+
+ while(channel != null) {
+ Trace trace = analyzer.attach(channel);
+
+ configure(channel);
+
+ if(context == null) {
+ process(channel, trace, null);
+ } else {
+ process(channel, trace);
+ }
+ channel = listener.accept();
+ }
+ }
+
+ /**
+ * This method is used to configure the accepted channel. This
+ * will disable Nagles algorithm to improve the performance of the
+ * channel, also this will ensure the accepted channel disables
+ * blocking to ensure that it works within the processor object.
+ *
+ * @param channel this is the channel that is to be configured
+ */
+ private void configure(SocketChannel channel) throws IOException {
+ channel.socket().setTcpNoDelay(true);
+ channel.configureBlocking(false);
+ }
+
+ /**
+ * This method is used to dispatch the socket for processing. The
+ * socket will be configured and connected to the client, this
+ * will hand processing to the <code>Server</code> which will
+ * create the pipeline instance used to wrap the socket object.
+ *
+ * @param channel this is the connected socket to be processed
+ * @param trace this is the trace to associate with the socket
+ */
+ private void process(SocketChannel channel, Trace trace) throws IOException {
+ SSLEngine engine = context.createSSLEngine();
+
+ try {
+ process(channel, trace, engine);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ channel.close();
+ }
+ }
+
+ /**
+ * This method is used to dispatch the socket for processing. The
+ * socket will be configured and connected to the client, this
+ * will hand processing to the <code>Server</code> which will
+ * create the pipeline instance used to wrap the socket object.
+ *
+ * @param channel this is the connected socket to be processed
+ * @param trace this is the trace to associate with the socket
+ * @param engine this is the SSL engine used for secure HTTPS
+ */
+ private void process(SocketChannel channel, Trace trace, SSLEngine engine) throws IOException {
+ Socket socket = new SocketWrapper(channel, trace, engine);
+
+ try {
+ trace.trace(ACCEPT);
+ processor.process(socket);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ channel.close();
+ }
+ }
+
+ /**
+ * This is used to close the server socket channel so that the
+ * port that it is bound to is released. This allows the acceptor
+ * to close off the interface to the server. Ensuring the socket
+ * is closed allows it to be recreated at a later point.
+ *
+ * @throws IOException thrown if the socket can not be closed
+ */
+ public void close() throws IOException {
+ listener.close();
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java
new file mode 100644
index 0000000..be6b95c
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketAnalyzer.java
@@ -0,0 +1,84 @@
+/*
+ * SocketAnalyzer.java February 2012
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import java.nio.channels.SelectableChannel;
+
+import org.simpleframework.transport.trace.TraceAnalyzer;
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>SocketAnalyzer</code> is used to wrap an analyzer object.
+ * Wrapping an analyzer in this way ensures that even if the analyzer
+ * is badly written there is little chance that it will affect the
+ * operation of the server. All <code>Trace</code> objects returned
+ * from this will catch all exceptions within the created trace.
+ *
+ * @author Niall Gallagher
+ */
+class SocketAnalyzer implements TraceAnalyzer {
+
+ /**
+ * This is the analyzer that is used to create the trace objects.
+ */
+ private final TraceAnalyzer analyzer;
+
+ /**
+ * Constructor for the <code>SocketAnalyzer</code> object. This will
+ * be given the analyzer that is to be used to create traces. This
+ * can be a null value, in which case the trace provided will be
+ * a simple empty void that swallows all trace events.
+ *
+ * @param analyzer the analyzer that is to be wrapped by this
+ */
+ public SocketAnalyzer(TraceAnalyzer analyzer) {
+ this.analyzer = analyzer;
+ }
+
+ /**
+ * This method is used to attach a trace to the specified channel.
+ * Attaching a trace basically means associating events from that
+ * trace with the specified socket. It ensures that the events
+ * from a specific channel can be observed in isolation.
+ *
+ * @param channel this is the channel to associate with the trace
+ *
+ * @return this returns a trace associated with the channel
+ */
+ public Trace attach(SelectableChannel channel) {
+ Trace trace = null;
+
+ if(analyzer != null) {
+ trace = analyzer.attach(channel);
+ }
+ return new SocketTrace(trace);
+ }
+
+ /**
+ * This is used to stop the analyzer and clear all trace information.
+ * Stopping the analyzer is typically done when the server is stopped
+ * and is used to free any resources associated with the analyzer. If
+ * an analyzer does not hold information this method can be ignored.
+ */
+ public void stop() {
+ if(analyzer != null) {
+ analyzer.stop();
+ }
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java
new file mode 100644
index 0000000..c462284
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketConnection.java
@@ -0,0 +1,141 @@
+/*
+ * SocketConnection.java October 2002
+ *
+ * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLContext;
+
+import org.simpleframework.transport.SocketProcessor;
+import org.simpleframework.transport.trace.TraceAnalyzer;
+
+/**
+ * The <code>SocketConnection</code>is used to manage connections
+ * from a server socket. In order to achieve this it spawns a task
+ * to listen for incoming connect requests. When a TCP connection
+ * request arrives it hands off the <code>SocketChannel</code> to
+ * the <code>SocketProcessor</code> which processes the request.
+ * <p>
+ * This handles connections from a <code>ServerSocketChannel</code>
+ * object so that features such as SSL can be used by a server that
+ * uses this package. The background acceptor process will terminate
+ * if the connection is closed.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.SocketProcessor
+ */
+public class SocketConnection implements Connection {
+
+ /**
+ * This is used to maintain the active connection end points.
+ */
+ private SocketListenerManager manager;
+
+ /**
+ * The processor is used to process connected HTTP pipelines.
+ */
+ private SocketProcessor processor;
+
+ /**
+ * This is used to determine if the connection has been closed.
+ */
+ private boolean closed;
+
+ /**
+ * Constructor for the <code>SocketConnection</code> object. This
+ * will create a new connection that accepts incoming connections
+ * and hands these connections as <code>Socket</code> objects
+ * to the specified connector. This in turn will deliver request
+ * and response objects to the internal container.
+ *
+ * @param processor this is the connector that receives requests
+ */
+ public SocketConnection(SocketProcessor processor) throws IOException {
+ this(processor, null);
+ }
+
+ /**
+ * Constructor for the <code>SocketConnection</code> object. This
+ * will create a new connection that accepts incoming connections
+ * and hands these connections as <code>Socket</code> objects
+ * to the specified processor. This in turn will deliver request
+ * and response objects to the internal container.
+ *
+ * @param processor this is the connector that receives requests
+ * @param analyzer this is used to create a trace for the socket
+ */
+ public SocketConnection(SocketProcessor processor, TraceAnalyzer analyzer) throws IOException {
+ this.manager = new SocketListenerManager(processor, analyzer);
+ this.processor = processor;
+ }
+
+ /**
+ * This creates a new background task that will listen to the
+ * specified <code>ServerAddress</code> for incoming TCP connect
+ * requests. When an connection is accepted it is handed to the
+ * internal socket connector.
+ *
+ * @param address this is the address used to accept connections
+ *
+ * @return this returns the actual local address that is used
+ */
+ public SocketAddress connect(SocketAddress address) throws IOException {
+ if(closed) {
+ throw new ConnectionException("Connection is closed");
+ }
+ return manager.listen(address);
+ }
+
+ /**
+ * This creates a new background task that will listen to the
+ * specified <code>ServerAddress</code> for incoming TCP connect
+ * requests. When an connection is accepted it is handed to the
+ * internal socket connector.
+ *
+ * @param address this is the address used to accept connections
+ * @param context this is used for secure SSL connections
+ *
+ * @return this returns the actual local address that is used
+ */
+ public SocketAddress connect(SocketAddress address, SSLContext context) throws IOException {
+ if(closed) {
+ throw new ConnectionException("Connection is closed");
+ }
+ return manager.listen(address, context);
+ }
+
+ /**
+ * This is used to close the connection and the server socket
+ * used to accept connections. This will perform a close of all
+ * connected server sockets that have been created from using
+ * the <code>connect</code> method. The connection can be
+ * reused after the existing server sockets have been closed.
+ *
+ * @throws IOException thrown if there is a problem closing
+ */
+ public void close() throws IOException {
+ if(!closed) {
+ manager.close();
+ processor.stop();
+ }
+ closed = true;
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java
new file mode 100644
index 0000000..5718273
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListener.java
@@ -0,0 +1,125 @@
+/*
+ * SocketListener.java October 2002
+ *
+ * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import static java.nio.channels.SelectionKey.OP_ACCEPT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.ssl.SSLContext;
+
+import org.simpleframework.transport.SocketProcessor;
+import org.simpleframework.transport.reactor.SynchronousReactor;
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.TraceAnalyzer;
+
+/**
+ * The <code>SocketListener</code> object is represents the interface
+ * to the server that the clients can connect to. This is responsible
+ * for making call backs to the <code>SocketAcceptor</code> when there
+ * is a new connection waiting to be accepted. When the connection
+ * is to be closed the interface object can be closed.
+ *
+ * @author Niall Gallagher
+ */
+class SocketListener implements Closeable {
+
+ /**
+ * This is the acceptor that is used to accept the connections.
+ */
+ private final SocketAcceptor acceptor;
+
+ /**
+ * This is the reactor used to notify the acceptor of sockets.
+ */
+ private final Reactor reactor;
+
+ /**
+ * Constructor for the <code>SocketListener</code> object. This
+ * needs a socket address and a processor to hand created sockets
+ * to. This creates a <code>Reactor</code> which will notify the
+ * acceptor when there is a new connection waiting to be accepted.
+ *
+ * @param address this is the address to listen for new sockets
+ * @param processor this is the processor that sockets are handed to
+ * @param analyzer this is used to create a trace to monitor events
+ */
+ public SocketListener(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer) throws IOException {
+ this(address, processor, analyzer, null);
+ }
+
+ /**
+ * Constructor for the <code>SocketListener</code> object. This
+ * needs a socket address and a processor to hand created sockets
+ * to. This creates a <code>Reactor</code> which will notify the
+ * acceptor when there is a new connection waiting to be accepted.
+ *
+ * @param address this is the address to listen for new sockets
+ * @param processor this is the processor that sockets are handed to
+ * @param analyzer this is used to create a trace to monitor events
+ * @param context this is the SSL context used for secure HTTPS
+ */
+ public SocketListener(SocketAddress address, SocketProcessor processor, TraceAnalyzer analyzer, SSLContext context) throws IOException {
+ this.acceptor = new SocketAcceptor(address, processor, analyzer, context);
+ this.reactor = new SynchronousReactor();
+ }
+
+ /**
+ * This is used to acquire the local socket address that this is
+ * listening to. This required in case the socket address that
+ * is specified is an emphemeral address, that is an address that
+ * is assigned dynamically when a port of 0 is specified.
+ *
+ * @return this returns the address for the listening address
+ */
+ public SocketAddress getAddress() {
+ return acceptor.getAddress();
+ }
+
+ /**
+ * This is used to register the socket acceptor to listen for
+ * new connections that are ready to be accepted. Once this is
+ * registered it will remain registered until the interface is
+ * closed, at which point the socket is closed.
+ */
+ public void process() throws IOException {
+ try {
+ acceptor.bind();
+ reactor.process(acceptor, OP_ACCEPT);
+ } catch(Exception cause) {
+ throw new ConnectionException("Listen error", cause);
+ }
+ }
+
+ /**
+ * This is used to close the connection and the server socket
+ * used to accept connections. This will perform a close of the
+ * connected server socket and the dispatching thread.
+ */
+ public void close() throws IOException {
+ try {
+ acceptor.close();
+ reactor.stop();
+ } catch(Exception cause) {
+ throw new ConnectionException("Close error", cause);
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java
new file mode 100644
index 0000000..b0330f1
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketListenerManager.java
@@ -0,0 +1,127 @@
+/*
+ * SocketListenerManager.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.net.ssl.SSLContext;
+
+import org.simpleframework.transport.SocketProcessor;
+import org.simpleframework.transport.trace.TraceAnalyzer;
+
+/**
+ * The <code>SocketListenerManager</code> contains all the listeners
+ * that have been created for a connection. This set is used to hold
+ * and manage the listeners that have been created for a connection.
+ * All listeners will be closed if the listener manager is closed.
+ * This ensures all resources held by the manager can be released.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.connect.SocketConnection
+ */
+class SocketListenerManager implements Closeable {
+
+ /**
+ * This is the set of active socket listeners for this manager.
+ */
+ private final Set<SocketListener> listeners;
+
+ /**
+ * This is the processor that listeners will dispatch sockets to.
+ */
+ private final SocketProcessor processor;
+
+ /**
+ * This is the analyzer used to create a trace for the sockets.
+ */
+ private final TraceAnalyzer analyzer;
+
+ /**
+ * Constructor for the <code>SocketListenerManager</code> object.
+ * This is used to create a manager that will enable listeners to
+ * be created to listen to specified sockets for incoming TCP
+ * connections, which will be converted to socket objects.
+ *
+ * @param processor this is the processor to hand sockets to
+ * @param analyzer this is the agent used to trace socket events
+ */
+ public SocketListenerManager(SocketProcessor processor, TraceAnalyzer analyzer) {
+ this.listeners = new CopyOnWriteArraySet<SocketListener>();
+ this.analyzer = new SocketAnalyzer(analyzer);
+ this.processor = processor;
+ }
+
+ /**
+ * This creates a new background task that will listen to the
+ * specified <code>ServerAddress</code> for incoming TCP connect
+ * requests. When an connection is accepted it is handed to the
+ * internal socket connector.
+ *
+ * @param address this is the address used to accept connections
+ *
+ * @return this returns the actual local address that is used
+ */
+ public SocketAddress listen(SocketAddress address) throws IOException {
+ return listen(address, null);
+ }
+
+ /**
+ * This creates a new background task that will listen to the
+ * specified <code>ServerAddress</code> for incoming TCP connect
+ * requests. When an connection is accepted it is handed to the
+ * internal socket connector.
+ *
+ * @param address this is the address used to accept connections
+ * @param context this is used for secure SSL connections
+ *
+ * @return this returns the actual local address that is used
+ */
+ public SocketAddress listen(SocketAddress address, SSLContext context) throws IOException {
+ SocketListener listener = new SocketListener(address, processor, analyzer, context);
+
+ if(processor != null) {
+ listener.process();
+ listeners.add(listener);
+ }
+ return listener.getAddress();
+ }
+
+ /**
+ * This is used to close all the listeners that have been
+ * added to the connection. Closing all the listeners in the
+ * set ensures that there are no lingering threads or sockets
+ * consumed by the connection after the connection is closed.
+ *
+ * @throws IOException thrown if there is an error closing
+ */
+ public void close() throws IOException {
+ for(Closeable listener : listeners) {
+ listener.close();
+ }
+ if(analyzer != null) {
+ analyzer.stop();
+ }
+ listeners.clear();
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java
new file mode 100644
index 0000000..1805533
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/connect/SocketTrace.java
@@ -0,0 +1,75 @@
+/*
+ * SocketTrace.java February 2012
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.connect;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>SocketTrace</code> is used to wrap an trace for safety.
+ * Wrapping an trace in this way ensures that even if the trace is
+ * badly written there is little chance that it will affect the
+ * operation of the server.
+ *
+ * @author Niall Gallagher
+ */
+class SocketTrace implements Trace {
+
+ /**
+ * This is the actual trace that is being wrapped by this.
+ */
+ private final Trace trace;
+
+ /**
+ * Constructor for the <code>SocketTrace</code> object. This will
+ * create a trace object that wraps the one provided. If the
+ * provided trace is null then this will simply ignore all events.
+ *
+ * @param trace this is the trace that is to be wrapped by this
+ */
+ public SocketTrace(Trace trace) {
+ this.trace = trace;
+ }
+
+ /**
+ * This method is used to accept an event that occurred on the socket
+ * associated with this trace. Typically the event is a symbolic
+ * description of the event such as an enum or a string.
+ *
+ * @param event this is the event that occurred on the socket
+ */
+ public void trace(Object event) {
+ if(trace != null) {
+ trace.trace(event);
+ }
+ }
+
+ /**
+ * This method is used to accept an event that occurred on the socket
+ * associated with this trace. Typically the event is a symbolic
+ * description of the event such as an enum or a string.
+ *
+ * @param event this is the event that occurred on the socket
+ * @param value provides additional information such as an exception
+ */
+ public void trace(Object event, Object value) {
+ if(trace != null) {
+ trace.trace(event, value);
+ }
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java
new file mode 100644
index 0000000..4989eae
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Action.java
@@ -0,0 +1,71 @@
+/*
+ * Action.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * The <code>Action</code> object is used to represent an action that
+ * the distributor is to process. This contains the operation and
+ * the required I/O events as an integer bit mask. When an operation
+ * is considered ready it will be handed to an executor to execute.
+ *
+ * @author Niall Gallagher
+ */
+interface Action extends Runnable {
+
+ /**
+ * This is used to get the expiry for the operation. The expiry
+ * represents some static time in the future when the action will
+ * expire if it does not become ready. This is used to cancel the
+ * operation so that it does not remain in the distributor.
+ *
+ * @return the remaining time this operation will wait for
+ */
+ long getExpiry();
+
+ /**
+ * This returns the I/O operations that the action is interested
+ * in as an integer bit mask. When any of these operations are
+ * ready the distributor will execute the provided operation.
+ *
+ * @return the integer bit mask of interested I/O operations
+ */
+ int getInterest();
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ SelectableChannel getChannel();
+
+ /**
+ * This is used to acquire the <code>Operation</code> that is to
+ * be executed when the required operations are ready. It is the
+ * responsibility of the distributor to invoke the operation.
+ *
+ * @return the operation to be executed when it is ready
+ */
+ Operation getOperation();
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java
new file mode 100644
index 0000000..65dc8d2
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionDistributor.java
@@ -0,0 +1,741 @@
+/*
+ * ActionDistributor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import static java.nio.channels.SelectionKey.OP_READ;
+import static java.nio.channels.SelectionKey.OP_WRITE;
+import static org.simpleframework.transport.reactor.ReactorEvent.CHANNEL_CLOSED;
+import static org.simpleframework.transport.reactor.ReactorEvent.CLOSE_SELECTOR;
+import static org.simpleframework.transport.reactor.ReactorEvent.ERROR;
+import static org.simpleframework.transport.reactor.ReactorEvent.EXECUTE_ACTION;
+import static org.simpleframework.transport.reactor.ReactorEvent.INVALID_KEY;
+import static org.simpleframework.transport.reactor.ReactorEvent.READ_INTEREST_READY;
+import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_READ_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.REGISTER_WRITE_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.SELECT;
+import static org.simpleframework.transport.reactor.ReactorEvent.SELECT_CANCEL;
+import static org.simpleframework.transport.reactor.ReactorEvent.SELECT_EXPIRED;
+import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_READ_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.UPDATE_WRITE_INTEREST;
+import static org.simpleframework.transport.reactor.ReactorEvent.WRITE_INTEREST_READY;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+
+import org.simpleframework.common.thread.Daemon;
+import org.simpleframework.transport.trace.Trace;
+
+ /**
+ * The <code>ActionDistributor</code> is used to execute operations
+ * that have an interested I/O event ready. This acts much like a
+ * scheduler would in that it delays the execution of the operations
+ * until such time as the associated <code>SelectableChannel</code>
+ * has an interested I/O event ready.
+ * <p>
+ * This distributor has two modes, one mode is used to cancel the
+ * channel once an I/O event has occurred. This means that the channel
+ * is removed from the <code>Selector</code> so that the selector
+ * does not break when asked to select again. cancelling the channel
+ * is useful when the operation execution may not fully read the
+ * payload or when the operation takes a significant amount of time.
+ *
+ * @see org.simpleframework.transport.reactor.ExecutorReactor
+ */
+class ActionDistributor extends Daemon implements OperationDistributor {
+
+ /**
+ * This is used to determine the operations that need cancelling.
+ */
+ private Map<Channel, ActionSet> executing;
+
+ /**
+ * This is used to keep track of actions currently in selection.
+ */
+ private Map<Channel, ActionSet> selecting;
+
+ /**
+ * This is the queue that is used to invalidate channels.
+ */
+ private Queue<Channel> invalid;
+
+ /**
+ * This is the queue that is used to provide the operations.
+ */
+ private Queue<Action> pending;
+
+ /**
+ * This is the selector used to select for interested events.
+ */
+ private ActionSelector selector;
+
+ /**
+ * This is used to execute the operations that are ready.
+ */
+ private Executor executor;
+
+ /**
+ * This is used to signal when the distributor has closed.
+ */
+ private Latch latch;
+
+ /**
+ * This is the duration in milliseconds the operation expires in.
+ */
+ private long expiry;
+
+ /**
+ * This is time in milliseconds when the next expiry will occur.
+ */
+ private long update;
+
+ /**
+ * This is used to determine the mode the distributor uses.
+ */
+ private boolean cancel;
+
+ /**
+ * Constructor for the <code>ActionDistributor</code> object. This
+ * will create a distributor that distributes operations when those
+ * operations show that they are ready for a given I/O event. The
+ * interested I/O events are provided as a bitmask taken from the
+ * actions of the <code>SelectionKey</code>. Distribution of the
+ * operations is passed to the provided executor object.
+ *
+ * @param executor this is the executor used to execute operations
+ */
+ public ActionDistributor(Executor executor) throws IOException {
+ this(executor, true);
+ }
+
+ /**
+ * Constructor for the <code>ActionDistributor</code> object. This
+ * will create a distributor that distributes operations when those
+ * operations show that they are ready for a given I/O event. The
+ * interested I/O events are provided as a bitmask taken from the
+ * actions of the <code>SelectionKey</code>. Distribution of the
+ * operations is passed to the provided executor object.
+ *
+ * @param executor this is the executor used to execute operations
+ * @param cancel should the channel be removed from selection
+ */
+ public ActionDistributor(Executor executor, boolean cancel) throws IOException {
+ this(executor, cancel, 120000);
+ }
+
+ /**
+ * Constructor for the <code>ActionDistributor</code> object. This
+ * will create a distributor that distributes operations when those
+ * operations show that they are ready for a given I/O event. The
+ * interested I/O events are provided as a bitmask taken from the
+ * actions of the <code>SelectionKey</code>. Distribution of the
+ * operations is passed to the provided executor object.
+ *
+ * @param executor this is the executor used to execute operations
+ * @param cancel should the channel be removed from selection
+ * @param expiry this the maximum idle time for an operation
+ */
+ public ActionDistributor(Executor executor, boolean cancel, long expiry) throws IOException {
+ this.selecting = new LinkedHashMap<Channel, ActionSet>();
+ this.executing = new LinkedHashMap<Channel, ActionSet>();
+ this.pending = new ConcurrentLinkedQueue<Action>();
+ this.invalid = new ConcurrentLinkedQueue<Channel>();
+ this.selector = new ActionSelector();
+ this.latch = new Latch();
+ this.executor = executor;
+ this.cancel = cancel;
+ this.expiry = expiry;
+ this.start();
+ }
+
+ /**
+ * This is used to process the <code>Operation</code> object. This
+ * will wake up the selector if it is currently blocked selecting
+ * and register the operations associated channel. Once the
+ * selector is awake it will acquire the operation from the queue
+ * and register the associated <code>SelectableChannel</code> for
+ * selection. The operation will then be executed when the channel
+ * is ready for the interested I/O events.
+ *
+ * @param task this is the task that is scheduled for distribution
+ * @param require this is the bit-mask value for interested events
+ */
+ public void process(Operation task, int require) throws IOException {
+ Action action = new ExecuteAction(task, require, expiry);
+
+ if(!isActive()) {
+ throw new IOException("Distributor is closed");
+ }
+ pending.offer(action);
+ selector.wake();
+ }
+
+ /**
+ * This is used to close the distributor such that it cancels all
+ * of the registered channels and closes down the selector. This
+ * is used when the distributor is no longer required, after the
+ * close further attempts to process operations will fail.
+ */
+ public void close() throws IOException {
+ stop();
+ selector.wake();
+ latch.close();
+ }
+
+ /**
+ * This returns the number of channels that are currently selecting
+ * with this distributor. When busy this can get quite high, however
+ * it must return to zero as soon as all tasks have completed.
+ *
+ * @return return the number of channels currently selecting
+ */
+ public int size() {
+ return selecting.size();
+ }
+
+ /**
+ * Performs the execution of the distributor. Each distributor runs
+ * on an asynchronous thread to the <code>Reactor</code> which is
+ * used to perform the selection on a set of channels. Each time
+ * there is a new operation to be processed this will take the
+ * operation from the ready queue, cancel all outstanding channels,
+ * and register the operations associated channel for selection.
+ */
+ public void run() {
+ try {
+ execute();
+ } finally {
+ purge();
+ }
+ }
+
+ /**
+ * Performs the execution of the distributor. Each distributor runs
+ * on an asynchronous thread to the <code>Reactor</code> which is
+ * used to perform the selection on a set of channels. Each time
+ * there is a new operation to be processed this will take the
+ * operation from the ready queue, cancel all outstanding channels,
+ * and register the operations associated channel for selection.
+ */
+ private void execute() {
+ while(isActive()) {
+ try {
+ register();
+ cancel();
+ expire();
+ distribute();
+ validate();
+ } catch(Exception cause) {
+ report(cause);
+ }
+ }
+ }
+
+ /**
+ * This will purge all the actions from the distributor when the
+ * distributor ends. If there are any threads waiting on the close
+ * to finish they are signalled when all operations are purged.
+ * This will allow them to return ensuring no operations linger.
+ */
+ private void purge() {
+ try {
+ register();
+ cancel();
+ clear();
+ } catch(Exception cause) {
+ report(cause);
+ }
+ }
+
+ /**
+ * This method is called to ensure that if there is a global
+ * error that each action will know about it. Such an issue could
+ * be file handle exhaustion or an out of memory error. It is
+ * also possible that a poorly behaving action could cause an
+ * issue which should be know the the entire system.
+ *
+ * @param cause this is the exception to report
+ */
+ private void report(Exception cause) {
+ Set<Channel> channels = selecting.keySet();
+
+ for(Channel channel : channels) {
+ ActionSet set = selecting.get(channel);
+ Action[] list = set.list();
+
+ for(Action action : list) {
+ Operation operation = action.getOperation();
+ Trace trace = operation.getTrace();
+
+ try {
+ trace.trace(ERROR, cause);
+ } catch(Exception e) {
+ invalid.offer(channel);
+ }
+ }
+ }
+ invalid.clear();
+ }
+
+ /**
+ * Here we perform an expire which will take all of the registered
+ * sockets and expire it. This ensures that the operations can be
+ * executed within the executor and the cancellation of the sockets
+ * can be performed. Once this method has finished then all of
+ * the operations will have been scheduled for execution.
+ */
+ private void clear() throws IOException {
+ List<ActionSet> sets = selector.registeredSets();
+
+ for(ActionSet set : sets) {
+ Action[] list = set.list();
+
+ for(Action action : list) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ try {
+ trace.trace(CLOSE_SELECTOR);
+ expire(set, Long.MAX_VALUE);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+ selector.close();
+ latch.signal();
+ }
+
+ /**
+ * This method is used to expire registered operations that remain
+ * idle within the selector. Operations specify a time at which
+ * point they wish to be cancelled if the I/O event they wait on
+ * has not arisen. This will enables the cancelled operation to be
+ * cancelled so that the resources it occupies can be released.
+ */
+ private void expire() throws IOException {
+ List<ActionSet> sets = selector.registeredSets();
+
+ if(cancel) {
+ long time = System.currentTimeMillis();
+
+ if(update <= time) {
+ for(ActionSet set : sets) {
+ expire(set, time);
+ }
+ update = time +10000;
+ }
+ }
+ }
+
+ /**
+ * This method is used to expire registered operations that remain
+ * idle within the selector. Operations specify a time at which
+ * point they wish to be if the I/O event they wait on
+ * has not arisen. This will enables the cancelled operation to be
+ * cancelled so that the resources it occupies can be released.
+ *
+ * @param set this is the selection set check for expired actions
+ * @param time this is the time to check the expiry against
+ */
+ private void expire(ActionSet set, long time) throws IOException {
+ Action[] actions = set.list();
+ SelectionKey key = set.key();
+
+ if(key.isValid()) {
+ int mask = key.interestOps();
+
+ for(Action action : actions) {
+ int interest = action.getInterest();
+ long expiry = action.getExpiry();
+
+ if(expiry < time) {
+ expire(set, action);
+ mask &= ~interest;
+ }
+ }
+ update(set, mask);
+ }
+ }
+
+ /**
+ * This is used to update the interested operations of a set of
+ * actions. If there are no interested operations the set will be
+ * cancelled, otherwise the selection key will be updated with the
+ * new operations provided by the bitmask.
+ *
+ * @param set this is the action set that is to be updated
+ * @param interest this is the bitmask containing the operations
+ */
+ private void update(ActionSet set, int interest) throws IOException {
+ SelectionKey key = set.key();
+
+ if(interest == 0) {
+ Channel channel = key.channel();
+
+ selecting.remove(channel);
+ key.cancel();
+ } else {
+ key.interestOps(interest);
+ }
+ }
+
+ /**
+ * This method is used to expire registered operations that remain
+ * idle within the selector. Operations specify a time at which
+ * point they wish to be cancelled if the I/O event they wait on
+ * has not arisen. This will enables the cancelled operation to be
+ * cancelled so that the resources it occupies can be released.
+ *
+ * @param set this is the action set containing the actions
+ * @param action this is the actual action to be cancelled
+ */
+ private void expire(ActionSet set, Action action) throws IOException {
+ Action cancel = new CancelAction(action);
+
+ if(set != null) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ try {
+ trace.trace(SELECT_EXPIRED, interest);
+ set.remove(interest);
+ execute(cancel);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+
+ /**
+ * This method is used to perform simple validation. It ensures
+ * that directly after the processing loop any channels that
+ * are registered that have been cancelled or are closed will
+ * be removed from the selecting map and rejected.
+ */
+ private void validate() throws IOException {
+ Set<Channel> channels = selecting.keySet();
+
+ for(Channel channel : channels) {
+ ActionSet set = selecting.get(channel);
+ SelectionKey key = set.key();
+
+ if(!key.isValid()) {
+ invalid.offer(channel);
+ }
+ }
+ for(Channel channel : invalid) {
+ invalidate(channel);
+ }
+ invalid.clear();
+ }
+
+ /**
+ * This method is used to remove the channel from the selecting
+ * registry. It is rare that this will every happen, however it
+ * is important that tasks are cleared out in this manner as it
+ * could lead to a memory leak if left for a long time.
+ *
+ * @param channel this is the channel being validated
+ */
+ private void invalidate(Channel channel) throws IOException {
+ ActionSet set = selecting.remove(channel);
+ Action[] list = set.list();
+
+ for(Action action : list) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ try {
+ trace.trace(INVALID_KEY);
+ execute(action); // reject
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+
+ /**
+ * This is used to cancel any selection keys that have previously
+ * been selected with an interested I/O event. Performing a cancel
+ * here ensures that on a the next select the associated channel
+ * is not considered, this ensures the select does not break.
+ */
+ private void cancel() throws IOException {
+ Collection<ActionSet> list = executing.values();
+
+ for(ActionSet set : list) {
+ Action[] actions = set.list();
+
+ for(Action action : actions) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ trace.trace(SELECT_CANCEL);
+ }
+ set.cancel();
+ set.clear();
+ }
+ executing.clear();
+ }
+
+ /**
+ * Here all the enqueued <code>Operation</code> objects will be
+ * registered for selection. Each operations channel is used for
+ * selection on the interested I/O events. Once the I/O event
+ * occurs for the channel the operation is scheduled for execution.
+ */
+ private void register() throws IOException {
+ while(!pending.isEmpty()) {
+ Action action = pending.poll();
+
+ if(action != null) {
+ SelectableChannel channel = action.getChannel();
+ ActionSet set = executing.remove(channel);
+
+ if(set == null) {
+ set = selecting.get(channel);
+ }
+ if(set != null) {
+ update(action, set);
+ } else {
+ register(action);
+ }
+ }
+ }
+ }
+
+ /**
+ * Here the specified <code>Operation</code> object is registered
+ * with the selector. If the associated channel had previously
+ * been cancelled it is removed from the cancel map to ensure it
+ * is not removed from the selector when cancellation is done.
+ *
+ * @param action this is the operation that is to be registered
+ */
+ private void register(Action action) throws IOException {
+ SelectableChannel channel = action.getChannel();
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+
+ try {
+ if(channel.isOpen()) {
+ trace.trace(SELECT);
+ select(action);
+ } else {
+ trace.trace(CHANNEL_CLOSED);
+ selecting.remove(channel);
+ execute(action); // reject
+ }
+ }catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * Here the specified <code>Operation</code> object is registered
+ * with the selector. If the associated channel had previously
+ * been cancelled it is removed from the cancel map to ensure it
+ * is not removed from the selector when cancellation is done.
+ *
+ * @param action this is the operation that is to be registered
+ * @param set this is the action set to register the action with
+ */
+ private void update(Action action, ActionSet set) throws IOException {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ SelectionKey key = set.key();
+ int interest = action.getInterest();
+ int current = key.interestOps();
+ int updated = current | interest;
+
+ try {
+ if(OP_READ == (interest & OP_READ)) {
+ trace.trace(UPDATE_READ_INTEREST);
+ }
+ if(OP_WRITE == (interest & OP_WRITE)) {
+ trace.trace(UPDATE_WRITE_INTEREST);
+ }
+ trace.trace(UPDATE_INTEREST, updated);
+ key.interestOps(updated);
+ set.attach(action);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+
+ /**
+ * This method is used to perform an actual select on a channel. It
+ * will register the channel with the internal selector using the
+ * required I/O event bit mask. In order to ensure that selection
+ * is performed correctly the provided channel must be connected.
+ *
+ * @param action this is the operation that is to be registered
+ *
+ * @return this returns the selection key used for selection
+ */
+ private void select(Action action) throws IOException {
+ SelectableChannel channel = action.getChannel();
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ if(interest > 0) {
+ ActionSet set = selector.register(channel, interest);
+
+ if(OP_READ == (interest & OP_READ)) {
+ trace.trace(REGISTER_READ_INTEREST);
+ }
+ if(OP_WRITE == (interest & OP_WRITE)) {
+ trace.trace(REGISTER_WRITE_INTEREST);
+ }
+ trace.trace(REGISTER_INTEREST, interest);
+ set.attach(action);
+ selecting.put(channel, set);
+ }
+ }
+
+ /**
+ * This method is used to perform the select and if required queue
+ * the operations that are ready for execution. If the selector
+ * is woken up without any ready channels then this will return
+ * quietly. If however there are a number of channels ready to be
+ * processed then they are handed to the executor object and
+ * marked as ready for cancellation.
+ */
+ private void distribute() throws IOException {
+ if(selector.select(5000) > 0) {
+ if(isActive()) {
+ process();
+ }
+ }
+ }
+
+ /**
+ * This will iterate over the set of selection keys and process each
+ * of them. The <code>Operation</code> associated with the selection
+ * key is handed to the executor to perform the channel operation.
+ * Also, if configured to cancel, this method will add the channel
+ * and the associated selection key to the cancellation map.
+ */
+ private void process() throws IOException{
+ List<ActionSet> ready = selector.selectedSets();
+
+ for(ActionSet set : ready) {
+ process(set);
+ remove(set);
+ }
+ }
+
+ /**
+ * This will use the specified action set to acquire the channel
+ * and <code>Operation</code> associated with it to hand to the
+ * executor to perform the channel operation.
+ *
+ * @param set this is the set of actions that are to be processed
+ */
+ private void process(ActionSet set) throws IOException {
+ Action[] actions = set.ready();
+
+ for(Action action : actions) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ try {
+ if(OP_READ == (interest & OP_READ)) {
+ trace.trace(READ_INTEREST_READY, interest);
+ }
+ if(OP_WRITE == (interest & OP_WRITE)) {
+ trace.trace(WRITE_INTEREST_READY, interest);
+ }
+ execute(action);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+ }
+
+ /**
+ * This method ensures that references to the actions and channel
+ * are cleared from this instance. To ensure there are no memory
+ * leaks it is important to clear out all actions and channels.
+ * Also, if configured to cancel executing actions this will
+ * register the channel and actions to cancel on the next loop.
+ *
+ * @param set this is the set of actions that are to be removed
+ */
+ private void remove(ActionSet set) throws IOException {
+ Channel channel = set.channel();
+ SelectionKey key = set.key();
+
+ if(key.isValid()) {
+ int interest = set.interest();
+ int ready = key.readyOps();
+
+ if(cancel) {
+ int remaining = interest & ~ready;
+
+ if(remaining == 0) {
+ executing.put(channel, set);
+ } else {
+ key.interestOps(remaining);
+ }
+ set.remove(ready);
+ }
+ } else {
+ selecting.remove(channel);
+ }
+ }
+
+ /**
+ * This is where the action is handed off to the executor. Before
+ * the action is executed a trace event is generated, this will
+ * ensure that the entry and exit points can be tracked. It is
+ * also useful in debugging performance issues and memory leaks.
+ *
+ * @param action this is the action to execute
+ */
+ private void execute(Action action) {
+ Operation task = action.getOperation();
+ Trace trace = task.getTrace();
+ int interest = action.getInterest();
+
+ try {
+ trace.trace(EXECUTE_ACTION, interest);
+ executor.execute(action);
+ } catch(Exception cause) {
+ trace.trace(ERROR, cause);
+ }
+ }
+}
+
+ \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java
new file mode 100644
index 0000000..a9e78e9
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSelector.java
@@ -0,0 +1,193 @@
+/*
+ * ActionSelector.java February 2013
+ *
+ * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * The <code>ActionSelector</code> object is used to perform socket
+ * based selection with the help of the <code>ActionSet</code> object.
+ * All registered channels have an associated action set. The action
+ * set contains a set of actions that should be executed when the
+ * selector selects the channel.
+ *
+ * @author Niall Gallagher
+ */
+class ActionSelector {
+
+ /**
+ * This is the selector used to perform the select operations.
+ */
+ private final Selector selector;
+
+ /**
+ * Constructor for the <code>ActionSelector</code> object. This is
+ * used to create a selector that can register socket channels
+ * with an associated <code>ActionSet</code>. The set can then be
+ * used to store actions to be executed upon selection.
+ */
+ public ActionSelector() throws IOException {
+ this.selector = Selector.open();
+ }
+
+ /**
+ * This is used to perform a select on the selector. This returns
+ * the number of channels that have been selected. If this returns
+ * a number greater than zero the <code>ready</code> method can
+ * be used to acquire the actions that are ready for execution.
+ *
+ * @param timeout this is the timeout associated with the select
+ *
+ * @return this returns the number of channels that are ready
+ */
+ public int select(long timeout) throws IOException {
+ return selector.select(timeout);
+ }
+
+ /**
+ * This performs the actual registration of the channel for selection
+ * based on the provided interest bitmask. When the channel has been
+ * registered for selection this returns an <code>ActionSet</code>
+ * for the selection key. The set can then be used to register the
+ * actions that should be executed when selection succeeds.
+ *
+ * @param channel this is the channel to register for selection
+ * @param interest this is the interested operations bitmask
+ *
+ * @return this is the action set associated with the registration
+ */
+ public ActionSet register(SelectableChannel channel, int interest) throws IOException {
+ SelectionKey key = channel.register(selector, interest);
+ Object value = key.attachment();
+
+ if(value == null) {
+ value = new ActionSet(key);
+ key.attach(value);
+ }
+ return (ActionSet)value;
+ }
+
+ /**
+ * This is used to acquire all the action sets that are associated
+ * with this selector. Only action sets that have a valid selection
+ * key are returned. Modification of the list will not affect the
+ * associated selector instance.
+ *
+ * @return this returns all the associated action sets for this
+ */
+ public List<ActionSet> registeredSets() {
+ Set<SelectionKey> keys = selector.keys();
+ Iterator<SelectionKey> ready = keys.iterator();
+
+ return registeredSets(ready);
+ }
+
+ /**
+ * This is used to acquire all the action sets that are associated
+ * with this selector. Only action sets that have a valid selection
+ * key are returned. Modification of the list will not affect the
+ * associated selector instance.
+ *
+ * @param keys the selection keys to get the associated sets from
+ *
+ * @return this returns all the associated action sets for this
+ */
+ private List<ActionSet> registeredSets(Iterator<SelectionKey> keys) {
+ List<ActionSet> sets = new LinkedList<ActionSet>();
+
+ while(keys.hasNext()) {
+ SelectionKey key = keys.next();
+ ActionSet actions = (ActionSet)key.attachment();
+
+ if(!key.isValid()) {
+ key.cancel();
+ } else {
+ sets.add(actions);
+ }
+ }
+ return sets;
+ }
+
+ /**
+ * This is used to acquire all the action sets that are selected
+ * by this selector. All action sets returned are unregistered from
+ * the selector and must be registered again to hear about further
+ * I/O events that occur on the associated channel.
+ *
+ * @return this returns all the selected action sets for this
+ */
+ public List<ActionSet> selectedSets() throws IOException {
+ Set<SelectionKey> keys = selector.selectedKeys();
+ Iterator<SelectionKey> ready = keys.iterator();
+
+ return selectedSets(ready);
+ }
+
+ /**
+ * This is used to acquire all the action sets that are selected
+ * by this selector. All action sets returned are unregistered from
+ * the selector and must be registered again to hear about further
+ * I/O events that occur on the associated channel.
+ *
+ * @param keys the selection keys to get the associated sets from
+ *
+ * @return this returns all the selected action sets for this
+ */
+ private List<ActionSet> selectedSets(Iterator<SelectionKey> keys) {
+ List<ActionSet> ready = new LinkedList<ActionSet>();
+
+ while(keys.hasNext()) {
+ SelectionKey key = keys.next();
+ ActionSet actions = (ActionSet)key.attachment();
+
+ if(key != null) {
+ keys.remove();
+ }
+ if(key != null) {
+ ready.add(actions);
+ }
+ }
+ return ready;
+ }
+
+ /**
+ * This is used to wake the selector if it is in the middle of a
+ * select operation. Waking up the selector in this manner is
+ * useful if further actions are to be registered with it.
+ */
+ public void wake() throws IOException {
+ selector.wakeup();
+ }
+
+ /**
+ * This is used to close the associated selector. Further attempts
+ * to register a channel with the selector will fail. All actions
+ * should be cancelled before closing the selector in this way.
+ */
+ public void close() throws IOException {
+ selector.close();
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java
new file mode 100644
index 0000000..adc4ef7
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ActionSet.java
@@ -0,0 +1,269 @@
+/*
+ * ActionSet.java February 2013
+ *
+ * Copyright (C) 2013, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import static java.nio.channels.SelectionKey.OP_ACCEPT;
+import static java.nio.channels.SelectionKey.OP_CONNECT;
+import static java.nio.channels.SelectionKey.OP_READ;
+import static java.nio.channels.SelectionKey.OP_WRITE;
+
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+
+/**
+ * The <code>ActionSet</code> object represents a set of actions that
+ * are associated with a particular selection key. Here the set
+ * stores an <code>Action</code> for each of the interested operation
+ * types. In some situations a single action may be interested in
+ * several operations which must be remembered by the set.
+ *
+ * @author Niall Gallagher
+ */
+class ActionSet {
+
+ /**
+ * This is the selection key associated with the action set.
+ */
+ private final SelectionKey key;
+
+ /**
+ * This contains the the actions indexed by operation type.
+ */
+ private final Action[] set;
+
+ /**
+ * Constructor for the <code>ActionSet</code> object. This is
+ * used to create a set for storing actions keyed by operation
+ * type. Only one action is kept per operation type.
+ *
+ * @param key this is the associated selection key
+ */
+ public ActionSet(SelectionKey key) {
+ this.set = new Action[4];
+ this.key = key;
+ }
+
+ /**
+ * This provides the selection key associated with the action set.
+ * For each ready operation on the selection key the set contains
+ * an action that can be executed.
+ *
+ * @return this provides the selection key for this action set
+ */
+ public SelectionKey key() {
+ return key;
+ }
+
+ /**
+ * This provides the channel associated with the action set. This
+ * is the channel that is registered for selection using the
+ * interested operations for the set.
+ *
+ * @return this returns the selectable channel for the action set
+ */
+ public SelectableChannel channel() {
+ return key.channel();
+ }
+
+ /**
+ * This provides an iterator of the actions that exist within the
+ * action set. Regardless of whether a single action is interested
+ * is several operations this will return an iteration of unique
+ * actions. Modifications to the iterator do not affect the set.
+ *
+ * @return this returns an iterator of unique actions for the set
+ */
+ public Action[] list() {
+ Action[] actions = new Action[4];
+ int count = 0;
+
+ for(Action action : set) {
+ if(action != null) {
+ actions[count++] = action;
+ }
+ }
+ return copyOf(actions, count);
+ }
+
+ /**
+ * This is sued to acquire all actions that match the currently
+ * ready operations of the key. All actions returned by this will
+ * be executed and the interest will typically be removed.
+ *
+ * @return returns the array of ready operations for the set
+ */
+ public Action[] ready() {
+ int ready = key.readyOps();
+
+ if(ready != 0) {
+ return get(ready);
+ }
+ return new Action[]{};
+ }
+
+ /**
+ * This is used to attach an action to the set for a specific
+ * interest bitmask. If the bitmask contains several operations
+ * then the action is registered for each individual operation.
+ *
+ * @param action this is the action that is to be attached
+ * @param interest this is the interest for the action
+ */
+ public void attach(Action action) {
+ int interest = action.getInterest();
+
+ if((interest | OP_READ) == interest) {
+ set[0] = action;
+ }
+ if((interest | OP_WRITE) == interest) {
+ set[1] = action;
+ }
+ if((interest | OP_ACCEPT) == interest) {
+ set[2] = action;
+ }
+ if((interest | OP_CONNECT) == interest) {
+ set[3] = action;
+ }
+ }
+
+ /**
+ * This is used to remove interest from the set. Removal of
+ * interest from the set is performed by registering a null for
+ * the interest operation.
+ *
+ * @param interest this is the interest to be removed
+ */
+ public Action[] remove(int interest) {
+ Action[] actions = get(interest);
+
+ if((interest | OP_READ) == interest) {
+ set[0] = null;
+ }
+ if((interest | OP_WRITE) == interest) {
+ set[1] = null;
+ }
+ if((interest | OP_ACCEPT) == interest) {
+ set[2] = null;
+ }
+ if((interest | OP_CONNECT) == interest) {
+ set[3] = null;
+ }
+ return actions;
+ }
+
+ /**
+ * This is used to acquire the actions that match the bitmask of
+ * interest operations. If there are no actions representing the
+ * interest required an empty array will be returned.
+ *
+ * @param interest this is the interest to acquire actions for
+ *
+ * @return this will return an array of actions for the interest
+ */
+ public Action[] get(int interest) {
+ Action[] actions = new Action[4];
+ int count = 0;
+
+ if((interest | OP_READ) == interest) {
+ if(set[0] != null) {
+ actions[count++] = set[0];
+ }
+ }
+ if((interest | OP_WRITE) == interest) {
+ if(set[1] != null) {
+ actions[count++] = set[1];
+ }
+ }
+ if((interest | OP_ACCEPT) == interest) {
+ if(set[2] != null) {
+ actions[count++] = set[2];
+ }
+ }
+ if((interest | OP_CONNECT) == interest) {
+ if(set[3] != null) {
+ actions[count++] = set[3];
+ }
+ }
+ return copyOf(actions, count);
+ }
+
+ /**
+ * This is used to create a copy of the specified list with only
+ * the first few non null values. This ensures we can keep the
+ * internal array immutable and still use arrays.
+ *
+ * @param list this is the list that is to be copied to a new array
+ * @param count this is the number of entries to copy from the list
+ *
+ * @return a copy of the original list up to the specified count
+ */
+ private Action[] copyOf(Action[] list, int count) {
+ Action[] copy = new Action[count];
+
+ for(int i = 0; i < count; i++) {
+ copy[i] = list[i];
+ }
+ return copy;
+ }
+
+ /**
+ * This is used to acquire the operations that this is interested
+ * in. If there are currently no registered actions then this will
+ * return zero. Interest is represented by non-null actions only.
+ *
+ * @return this returns the interested operations for this
+ */
+ public int interest() {
+ int interest = 0;
+
+ if(set[0] != null) {
+ interest |= OP_READ;
+ }
+ if(set[1] != null) {
+ interest |= OP_WRITE;
+ }
+ if(set[2] != null) {
+ interest |= OP_ACCEPT;
+ }
+ if(set[3] != null) {
+ interest |= OP_CONNECT;
+ }
+ return interest;
+ }
+
+ /**
+ * This is used to clear all interest from the set. This will
+ * basically clear out any actions that have been registered with
+ * the set. After invocation the iterator will be empty.
+ */
+ public void clear() {
+ set[0] = set[1] =
+ set[2] = set[3] = null;
+ }
+
+ /**
+ * This is used to cancel the <code>SelectionKey</code> associated
+ * with the action set. Canceling the key in this manner ensures
+ * it is not returned in further selection operations.
+ */
+ public void cancel() {
+ key.cancel();
+ }
+}
+ \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java
new file mode 100644
index 0000000..c4ad894
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/CancelAction.java
@@ -0,0 +1,114 @@
+/*
+ * CancelAction.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * The <code>CancelAction</code> object is used to represent a task
+ * that can be executed to cancel an operation. This is used in the
+ * place of a normal <code>Operation</code> to pass for execution
+ * when the operation has expired before the I/O event is was
+ * interested in occurred. Before this is executed the operation is
+ * removed from selection.
+ *
+ * @author Niall Gallagher
+ */
+class CancelAction implements Action {
+
+ /**
+ * This is the operation that is to be canceled by this action.
+ */
+ private final Operation task;
+
+ /**
+ * This is the operation object that is to be canceled.
+ */
+ private final Action action;
+
+ /**
+ * Constructor for the <code>Cancellation</code> object. This is
+ * used to create a runnable task that delegates to the cancel
+ * method of the operation. This will be executed asynchronously
+ * by the executor after being removed from selection.
+ *
+ * @param action this is the task that is to be canceled by this
+ */
+ public CancelAction(Action action) {
+ this.task = action.getOperation();
+ this.action = action;
+ }
+
+ /**
+ * This method is executed by the <code>Executor</code> object
+ * if the operation expires before the required I/O event(s)
+ * have occurred. It is typically used to shutdown the socket
+ * and release any resources associated with the operation.
+ */
+ public void run() {
+ task.cancel();
+ }
+
+ /**
+ * This is used to get the expiry for the operation. The expiry
+ * represents some static time in the future when the action will
+ * expire if it does not become ready. This is used to cancel the
+ * operation so that it does not remain in the distributor.
+ *
+ * @return the remaining time this operation will wait for
+ */
+ public long getExpiry() {
+ return 0;
+ }
+
+ /**
+ * This returns the I/O operations that the action is interested
+ * in as an integer bit mask. When any of these operations are
+ * ready the distributor will execute the provided operation.
+ *
+ * @return the integer bit mask of interested I/O operations
+ */
+ public int getInterest() {
+ return action.getInterest();
+ }
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ public SelectableChannel getChannel() {
+ return action.getChannel();
+ }
+
+ /**
+ * This is used to acquire the <code>Operation</code> that is to
+ * be executed when the required operations are ready. It is the
+ * responsibility of the distributor to invoke the operation.
+ *
+ * @return the operation to be executed when it is ready
+ */
+ public Operation getOperation() {
+ return task;
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java
new file mode 100644
index 0000000..b92174c
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecuteAction.java
@@ -0,0 +1,121 @@
+/*
+ * ExecuteAction.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * The <code>ExecuteAction</code> object is represents an action
+ * that the distributor is to process. This contains the operation
+ * and the required I/O events as an integer bit mask as well as the
+ * selectable channel used to register for selection. In order to
+ * ensure that the action does not remain within the distributor for
+ * too long the action has an expiry time.
+ *
+ * @author Niall Gallagher
+ */
+class ExecuteAction implements Action {
+
+ /**
+ * The task to execute when the required operations is ready.
+ */
+ private final Operation task;
+
+ /**
+ * This is the bit mask of required operations to be executed.
+ */
+ private final int require;
+
+ /**
+ * This is the time in the future that the event will expire in.
+ */
+ private final long expiry;
+
+ /**
+ * Constructor for the <code>Event</code> object. The actions are
+ * used to encapsulate the task to execute and the operations
+ * to listen to when some action is to be performed.
+ *
+ * @param task this is the task to be executed when it is ready
+ * @param require this is the required operations to listen to
+ */
+ public ExecuteAction(Operation task, int require, long expiry) {
+ this.expiry = System.currentTimeMillis() + expiry;
+ this.require = require;
+ this.task = task;
+ }
+
+ /**
+ * This is used to execute the operation for the action. This will
+ * be executed when the interested I/O event is ready for the
+ * associated <code>SelectableChannel</code> object. If the action
+ * expires before the interested I/O operation is ready this will
+ * not be executed, instead the operation is canceled.
+ */
+ public void run() {
+ task.run();
+ }
+
+ /**
+ * This is used to get the expiry for the operation. The expiry
+ * represents some static time in the future when the action will
+ * expire if it does not become ready. This is used to cancel the
+ * operation so that it does not remain in the distributor.
+ *
+ * @return the remaining time this operation will wait for
+ */
+ public long getExpiry() {
+ return expiry;
+ }
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ public SelectableChannel getChannel() {
+ return task.getChannel();
+ }
+
+ /**
+ * This is used to acquire the <code>Operation</code> that is to
+ * be executed when the required operations are ready. It is the
+ * responsibility of the distributor to invoke the operation.
+ *
+ * @return the operation to be executed when it is ready
+ */
+ public Operation getOperation() {
+ return task;
+ }
+
+ /**
+ * This returns the I/O operations that the action is interested
+ * in as an integer bit mask. When any of these operations are
+ * ready the distributor will execute the provided operation.
+ *
+ * @return the integer bit mask of interested I/O operations
+ */
+ public int getInterest() {
+ return require;
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java
new file mode 100644
index 0000000..9d741d8
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ExecutorReactor.java
@@ -0,0 +1,132 @@
+/*
+ * ExecutorReactor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/**
+ * The <code>ExecutorReactor</code> is used to schedule operation for
+ * execution using an <code>Executor</code> implementation. This can be
+ * useful when the operations performed are time intensive. For example
+ * if the operations performed a read of the underlying channel and
+ * then had to parse the contents of the payload. Such operations would
+ * reduce the performance of the reactor if it could not delegate to
+ * some other form of executor, as it would delay their execution.
+ *
+ * @author Niall Gallagher
+ */
+public class ExecutorReactor implements Reactor {
+
+ /**
+ * This is used to distribute the ready operations for execution.
+ */
+ private final OperationDistributor exchange;
+
+ /**
+ * This is used to execute the operations that ready to run.
+ */
+ private final Executor executor;
+
+ /**
+ * Constructor for the <code>ExecutorReactor</code> object. This is
+ * used to create a reactor that can delegate to the executor. This
+ * also accepts the operations it is interested in, the value is
+ * taken from the <code>SelectionKey</code> object. A bit mask can
+ * be used to show interest in several operations at once.
+ *
+ * @param executor this is the executor used to run the operations
+ */
+ public ExecutorReactor(Executor executor) throws IOException {
+ this(executor, 1);
+ }
+
+ /**
+ * Constructor for the <code>ExecutorReactor</code> object. This is
+ * used to create a reactor that can delegate to the executor. This
+ * also accepts the operations it is interested in, the value is
+ * taken from the <code>SelectionKey</code> object. A bit mask can
+ * be used to show interest in several operations at once.
+ *
+ * @param executor this is the executor used to run the operations
+ * @param count this is the number of distributors to be used
+ */
+ public ExecutorReactor(Executor executor, int count) throws IOException {
+ this(executor, count, 120000);
+ }
+
+ /**
+ * Constructor for the <code>ExecutorReactor</code> object. This is
+ * used to create a reactor that can delegate to the executor. This
+ * also accepts the operations it is interested in, the value is
+ * taken from the <code>SelectionKey</code> object. A bit mask can
+ * be used to show interest in several operations at once.
+ *
+ * @param executor this is the executor used to run the operations
+ * @param count this is the number of distributors to be used
+ * @param expiry the length of time to maintain and idle operation
+ */
+ public ExecutorReactor(Executor executor, int count, long expiry) throws IOException {
+ this.exchange = new PartitionDistributor(executor, count, expiry);
+ this.executor = executor;
+ }
+
+ /**
+ * This method is used to execute the provided operation without
+ * the need to specifically check for I/O events. This is used if
+ * the operation knows that the <code>SelectableChannel</code> is
+ * ready, or if the I/O operation can be performed without knowing
+ * if the channel is ready. Typically this is an efficient means
+ * to perform a poll rather than a select on the channel.
+ *
+ * @param task this is the task to execute immediately
+ */
+ public void process(Operation task) throws IOException {
+ executor.execute(task);
+ }
+
+ /**
+ * This method is used to execute the provided operation when there
+ * is an I/O event that task is interested in. This will used the
+ * operations <code>SelectableChannel</code> object to determine
+ * the events that are ready on the channel. If this reactor is
+ * interested in any of the ready events then the task is executed.
+ *
+ * @param task this is the task to execute on interested events
+ * @param require this is the bit-mask value for interested events
+ */
+ public void process(Operation task, int require) throws IOException {
+ exchange.process(task, require);
+ }
+
+ /**
+ * This is used to stop the reactor so that further requests to
+ * execute operations does nothing. This will clean up all of
+ * the reactors resources and unregister any operations that are
+ * currently awaiting execution. This should be used to ensure
+ * any threads used by the reactor gracefully stop.
+ */
+ public void stop() throws IOException {
+ exchange.close();
+ }
+}
+
+
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java
new file mode 100644
index 0000000..deaee98
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Latch.java
@@ -0,0 +1,71 @@
+/*
+ * Latch.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * The <code>Latch</code> is used to provide a simple latch that will
+ * allow a thread to block until it is signaled that it is ready.
+ * The latch will block on the <code>close</code> method and when the
+ * latch is signaled the close method will release all threads.
+ *
+ * @author Niall Gallagher
+ */
+class Latch extends CountDownLatch {
+
+ /**
+ * Constructor for the <code>Latch</code> object. This will
+ * create a count down latch that will block when it is
+ * closed. Any blocked threads will be released when the
+ * latch is signaled that it is ready.
+ */
+ public Latch() {
+ super(1);
+ }
+
+ /**
+ * This is used to signal that the latch is ready. Invoking
+ * this method will release all threads that are blocking on
+ * the close method. This method is used when the distributor
+ * is closed and all operations have been purged.
+ */
+ public void signal() throws IOException {
+ try {
+ countDown();
+ } catch(Exception e) {
+ throw new IOException("Thread interrupted");
+ }
+ }
+
+ /**
+ * This will block all threads attempting to close the latch.
+ * All threads will be release when the latch is signaled. This
+ * is used to ensure the distributor blocks until it has fully
+ * purged all registered operations that are registered.
+ */
+ public void close() throws IOException {
+ try {
+ await();
+ } catch(Exception e){
+ throw new IOException("Thread interrupted");
+ }
+ }
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java
new file mode 100644
index 0000000..e97be61
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Operation.java
@@ -0,0 +1,69 @@
+/*
+ * Operation.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.nio.channels.SelectableChannel;
+
+import org.simpleframework.transport.trace.Trace;
+
+/**
+ * The <code>Operation</code> interface is used to describe a task
+ * that can be executed when the associated channel is ready for some
+ * operation. Typically the <code>SelectableChannel</code> is used to
+ * register with a selector with a set of given interested operations
+ * when those operations can be performed this is executed.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.reactor.Reactor
+ */
+public interface Operation extends Runnable {
+
+ /**
+ * This is used to acquire the trace object that is associated
+ * with the operation. A trace object is used to collection details
+ * on what operations are being performed. For instance it may
+ * contain information relating to I/O events or errors.
+ *
+ * @return this returns the trace associated with this operation
+ */
+ Trace getTrace();
+
+ /**
+ * This is the <code>SelectableChannel</code> which is used to
+ * determine if the operation should be executed. If the channel
+ * is ready for a given I/O event it can be run. For instance if
+ * the operation is used to perform some form of read operation
+ * it can be executed when ready to read data from the channel.
+ *
+ * @return this returns the channel used to govern execution
+ */
+ SelectableChannel getChannel();
+
+ /**
+ * This is used to cancel the operation if it has timed out. This
+ * is typically invoked when it has been waiting in a selector for
+ * an extended duration of time without any active operations on
+ * it. In such a case the reactor must purge the operation to free
+ * the memory and open channels associated with the operation.
+ */
+ void cancel();
+}
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java
new file mode 100644
index 0000000..3a83909
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/OperationDistributor.java
@@ -0,0 +1,62 @@
+/*
+ * Distributor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+
+/**
+ * The <code>Distributor</code> object is used to execute operations
+ * that have an interested I/O event ready. This acts much like a
+ * scheduler would in that it delays the execution of the operations
+ * until such time as the associated <code>SelectableChannel</code>
+ * has an interested I/O event ready.
+ * <p>
+ * This distributor has two modes, one mode is used to cancel the
+ * channel once an I/O event has occurred. This means that the channel
+ * is removed from the <code>Selector</code> so that the selector
+ * does not break when asked to select again. Canceling the channel
+ * is useful when the operation execution may not fully read the
+ * payload or when the operation takes a significant amount of time.
+ *
+ * @see org.simpleframework.transport.reactor.ActionDistributor
+ */
+interface OperationDistributor {
+
+ /**
+ * This is used to process the <code>Operation</code> object. This
+ * will wake up the selector if it is currently blocked selecting
+ * and register the operations associated channel. Once the
+ * selector is awake it will acquire the operation from the queue
+ * and register the associated <code>SelectableChannel</code> for
+ * selection. The operation will then be executed when the channel
+ * is ready for the interested I/O events.
+ *
+ * @param task this is the task that is scheduled for distribution
+ * @param require this is the bit-mask value for interested events
+ */
+ void process(Operation task, int require) throws IOException;
+
+ /**
+ * This is used to close the distributor such that it cancels all
+ * of the registered channels and closes down the selector. This
+ * is used when the distributor is no longer required, after the
+ * close further attempts to process operations will fail.
+ */
+ void close() throws IOException;
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java
new file mode 100644
index 0000000..b0d24ac
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/PartitionDistributor.java
@@ -0,0 +1,136 @@
+/*
+ * PartitionDistributor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.util.concurrent.Executor;
+
+/**
+ * The <code>PartitionDistributor</code> object is a distributor that
+ * partitions the selection process in to several threads. Each of
+ * the threads has a single selector, and operations are distributed
+ * amongst the threads using the hash code of the socket. Partitions
+ * ensure that several selector threads can share a higher load and
+ * respond to a more I/O events.
+ *
+ * @author Niall Gallagher
+ */
+class PartitionDistributor implements OperationDistributor {
+
+ /**
+ * This contains the distributors that represent a partition.
+ */
+ private final OperationDistributor[] list;
+
+ /**
+ * Constructor for the <code>PartitionDistributor</code> object.
+ * This will create a distributor that partitions the operations
+ * amongst a pool of selectors using the channels hash code.
+ *
+ * @param executor this is the executor used to run operations
+ * @param count this is the number of partitions to be used
+ */
+ public PartitionDistributor(Executor executor, int count) throws IOException {
+ this(executor, count, 120000);
+ }
+
+ /**
+ * Constructor for the <code>PartitionDistributor</code> object.
+ * This will create a distributor that partitions the operations
+ * amongst a pool of selectors using the channels hash code.
+ *
+ * @param executor this is the executor used to run operations
+ * @param count this is the number of partitions to be used
+ * @param expiry this is the expiry duration that is to be used
+ */
+ public PartitionDistributor(Executor executor, int count, long expiry) throws IOException {
+ this.list = new OperationDistributor[count];
+ this.start(executor, expiry);
+ }
+
+ /**
+ * This is used to create the partitions that represent a thread
+ * used for selection. Operations will index to a particular one
+ * using the hash code of the operations channel. If there is only
+ * one partition all operations will index to the partition.
+ *
+ * @param executor the executor used to run the operations
+ * @param expiry this is the expiry duration that is to be used
+ */
+ private void start(Executor executor, long expiry) throws IOException {
+ for(int i = 0; i < list.length; i++) {
+ list[i] = new ActionDistributor(executor, true, expiry);
+ }
+ }
+
+ /**
+ * This is used to process the <code>Operation</code> object. This
+ * will wake up the selector if it is currently blocked selecting
+ * and register the operations associated channel. Once the
+ * selector is awake it will acquire the operation from the queue
+ * and register the associated <code>SelectableChannel</code> for
+ * selection. The operation will then be executed when the channel
+ * is ready for the interested I/O events.
+ *
+ * @param task this is the task that is scheduled for distribution
+ * @param require this is the bit-mask value for interested events
+ */
+ public void process(Operation task, int require) throws IOException {
+ int length = list.length;
+
+ if(length == 1) {
+ list[0].process(task, require);
+ } else {
+ process(task, require, length);
+ }
+ }
+
+ /**
+ * This is used to process the <code>Operation</code> object. This
+ * will wake up the selector if it is currently blocked selecting
+ * and register the operations associated channel. Once the
+ * selector is awake it will acquire the operation from the queue
+ * and register the associated <code>SelectableChannel</code> for
+ * selection. The operation will then be executed when the channel
+ * is ready for the interested I/O events.
+ *
+ * @param task this is the task that is scheduled for distribution
+ * @param require this is the bit-mask value for interested events
+ * @param length this is the number of distributors to hash with
+ */
+ private void process(Operation task, int require, int length) throws IOException {
+ SelectableChannel channel = task.getChannel();
+ int hash = channel.hashCode();
+
+ list[hash % length].process(task, require);
+ }
+
+ /**
+ * This is used to close the distributor such that it cancels all
+ * of the registered channels and closes down the selector. This
+ * is used when the distributor is no longer required, after the
+ * close further attempts to process operations will fail.
+ */
+ public void close() throws IOException {
+ for(OperationDistributor entry : list) {
+ entry.close();
+ }
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java
new file mode 100644
index 0000000..a947cb5
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/Reactor.java
@@ -0,0 +1,79 @@
+/*
+ * Reactor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+
+/**
+ * The <code>Reactor</code> interface is used to describe an object
+ * that is used to schedule asynchronous I/O operations. An operation
+ * is performed by handing it to the reactor, which will determine
+ * if an interested event has occurred. This allows the operation to
+ * perform the task in a manner that does not block.
+ * <p>
+ * Implementing an <code>Operation</code> object requires that the
+ * operation itself is aware of the I/O task it is performing. For
+ * example, if the operation is concerned with reading data from the
+ * underlying channel then the operation should perform the read, if
+ * there is more data required then that operation to register with
+ * the reactor again to receive further notifications.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.reactor.Operation
+ */
+public interface Reactor {
+
+ /**
+ * This method is used to execute the provided operation without
+ * the need to specifically check for I/O events. This is used if
+ * the operation knows that the <code>SelectableChannel</code> is
+ * ready, or if the I/O operation can be performed without knowing
+ * if the channel is ready. Typically this is an efficient means
+ * to perform a poll rather than a select on the channel.
+ *
+ * @param task this is the task to execute immediately
+ */
+ void process(Operation task) throws IOException;
+
+ /**
+ * This method is used to execute the provided operation when there
+ * is an I/O event that task is interested in. This will used the
+ * operations <code>SelectableChannel</code> object to determine
+ * the events that are ready on the channel. If this reactor is
+ * interested in any of the ready events then the task is executed.
+ *
+ * @param task this is the task to execute on interested events
+ * @param require this is the bitmask value for interested events
+ */
+ void process(Operation task, int require) throws IOException;
+
+ /**
+ * This is used to stop the reactor so that further requests to
+ * execute operations does nothing. This will clean up all of
+ * the reactors resources and unregister any operations that are
+ * currently awaiting execution. This should be used to ensure
+ * any threads used by the reactor gracefully stop.
+ */
+ void stop() throws IOException;
+}
+
+
+
+
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java
new file mode 100644
index 0000000..529644e
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/ReactorEvent.java
@@ -0,0 +1,120 @@
+/*
+ * ReactorEvent.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+/**
+ * The <code>ReactorEvent</code> enumeration is used for tracing the
+ * operations that occur within the reactor. This is useful when the
+ * performance of the system needs to be monitored or when there is a
+ * resource or memory consumption issue that needs to be debugged.
+ *
+ * @author Niall Gallagher
+ */
+public enum ReactorEvent {
+
+ /**
+ * This event indicates the registration of an I/O interest.
+ */
+ SELECT,
+
+ /**
+ * This indicates that the selected I/O interest has not occurred.
+ */
+ SELECT_EXPIRED,
+
+ /**
+ * This occurs when a selection key is cancelled for all interests.
+ */
+ SELECT_CANCEL,
+
+ /**
+ * This is used to indicate the channel is already selecting.
+ */
+ ALREADY_SELECTING,
+
+ /**
+ * This occurs rarely however it indicates an invalid registration.
+ */
+ INVALID_KEY,
+
+ /**
+ * This occurs upon the initial registration of an I/O interest.
+ */
+ REGISTER_INTEREST,
+
+ /**
+ * This occurs upon the initial registration of a read I/O interest.
+ */
+ REGISTER_READ_INTEREST,
+
+ /**
+ * This occurs upon the initial registration of a write I/O interest.
+ */
+ REGISTER_WRITE_INTEREST,
+
+ /**
+ * This is used to indicate the operation interest changed.
+ */
+ UPDATE_INTEREST,
+
+ /**
+ * This occurs upon the initial registration of a read I/O interest.
+ */
+ UPDATE_READ_INTEREST,
+
+ /**
+ * This occurs upon the initial registration of a write I/O interest.
+ */
+ UPDATE_WRITE_INTEREST,
+
+ /**
+ * This indicates that the I/O interest has been satisfied.
+ */
+ INTEREST_READY,
+
+ /**
+ * This indicates that the I/O read interest has been satisfied.
+ */
+ READ_INTEREST_READY,
+
+ /**
+ * This indicates that the I/O write interest has been satisfied.
+ */
+ WRITE_INTEREST_READY,
+
+ /**
+ * This is the final action of executing the action.
+ */
+ EXECUTE_ACTION,
+
+ /**
+ * This occurs on an attempt to register an closed channel.
+ */
+ CHANNEL_CLOSED,
+
+ /**
+ * This occurs when the selector has been shutdown globally.
+ */
+ CLOSE_SELECTOR,
+
+ /**
+ * This occurs if there is an error with the selection.
+ */
+ ERROR,
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java
new file mode 100644
index 0000000..102829d
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/reactor/SynchronousReactor.java
@@ -0,0 +1,107 @@
+/*
+ * SynchronousReactor.java February 2007
+ *
+ * Copyright (C) 2007, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.reactor;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+import org.simpleframework.common.thread.SynchronousExecutor;
+
+/**
+ * The <code>SynchronousReactor</code> object is used to execute the
+ * ready operations of within a single synchronous thread. This is
+ * used when the I/O operations to be performed do not require much
+ * time to execute and so will not block the execution thread.
+ *
+ * @author Niall Gallagher
+ */
+public class SynchronousReactor implements Reactor {
+
+ /**
+ * This is used to distribute the ready operations for execution.
+ */
+ private final OperationDistributor exchange;
+
+ /**
+ * This is used to execute the operations that ready to run.
+ */
+ private final Executor executor;
+
+ /**
+ * Constructor for the <code>SynchronousReactor</code> object. This
+ * is used to create a reactor that does not require thread pooling
+ * to execute the ready operations. All I/O operations are run
+ * in the selection thread and should complete quickly.
+ */
+ public SynchronousReactor() throws IOException {
+ this(false);
+ }
+
+ /**
+ * Constructor for the <code>SynchronousReactor</code> object. This
+ * is used to create a reactor that does not require thread pooling
+ * to execute the ready operations. All I/O operations are run
+ * in the selection thread and should complete quickly.
+ *
+ * @param cancel determines the selection key should be cancelled
+ */
+ public SynchronousReactor(boolean cancel) throws IOException {
+ this.executor = new SynchronousExecutor();
+ this.exchange = new ActionDistributor(executor, cancel);
+ }
+
+ /**
+ * This method is used to execute the provided operation without
+ * the need to specifically check for I/O events. This is used if
+ * the operation knows that the <code>SelectableChannel</code> is
+ * ready, or if the I/O operation can be performed without knowing
+ * if the channel is ready. Typically this is an efficient means
+ * to perform a poll rather than a select on the channel.
+ *
+ * @param task this is the task to execute immediately
+ */
+ public void process(Operation task) throws IOException {
+ executor.execute(task);
+ }
+
+ /**
+ * This method is used to execute the provided operation when there
+ * is an I/O event that task is interested in. This will used the
+ * operations <code>SelectableChannel</code> object to determine
+ * the events that are ready on the channel. If this reactor is
+ * interested in any of the ready events then the task is executed.
+ *
+ * @param task this is the task to execute on interested events
+ * @param require this is the bit-mask value for interested events
+ */
+ public void process(Operation task, int require) throws IOException {
+ exchange.process(task, require);
+ }
+
+ /**
+ * This is used to stop the reactor so that further requests to
+ * execute operations does nothing. This will clean up all of
+ * the reactors resources and unregister any operations that are
+ * currently awaiting execution. This should be used to ensure
+ * any threads used by the reactor graceful stop.
+ */
+ public void stop() throws IOException {
+ exchange.close();
+ }
+}
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java
new file mode 100644
index 0000000..7e3711d
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/Trace.java
@@ -0,0 +1,57 @@
+/*
+ * Trace.java October 2012
+ *
+ * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.trace;
+
+/**
+ * The <code>Trace</code> interface represents an trace log for various
+ * connection events. A trace is not limited to low level I/O events
+ * it can also gather event data that relates to protocol specific
+ * events. Using a trace in this manner enables problems to be solved
+ * with connections as they arise.
+ * <p>
+ * When implementing a <code>Trace</code> there should be special
+ * attention paid to its affect on the performance of the server. The
+ * trace is used deep within the core and any delays experienced in
+ * the trace will be reflected in the performance of the server.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.trace.TraceAnalyzer
+ */
+public interface Trace {
+
+ /**
+ * This method is used to accept an event that occurred on the socket
+ * associated with this trace. Typically the event is a symbolic
+ * description of the event such as an enum or a string.
+ *
+ * @param event this is the event that occurred on the socket
+ */
+ void trace(Object event);
+
+ /**
+ * This method is used to accept an event that occurred on the socket
+ * associated with this trace. Typically the event is a symbolic
+ * description of the event such as an enum or a string.
+ *
+ * @param event this is the event that occurred on the socket
+ * @param value provides additional information such as an exception
+ */
+ void trace(Object event, Object value);
+} \ No newline at end of file
diff --git a/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java
new file mode 100644
index 0000000..af3550b
--- /dev/null
+++ b/simple/simple-transport/src/main/java/org/simpleframework/transport/trace/TraceAnalyzer.java
@@ -0,0 +1,59 @@
+/*
+ * TraceAnalyzer.java October 2012
+ *
+ * Copyright (C) 2002, Niall Gallagher <niallg@users.sf.net>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.simpleframework.transport.trace;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * The <code>TraceAnalyzer</code> object represents a tracing analyzer
+ * used to monitor events on a socket. Its primary responsibilities
+ * are to create <code>Trace</code> objects that are attached to a
+ * specific socket channel. When any event occurs on that channel the
+ * trace is notified and can forward the details on for analysis.
+ * <p>
+ * An analyzer implementation must make sure that it does not affect
+ * the performance of the server. If there are delays creating a trace
+ * or within the trace itself it will have an impact on performance.
+ *
+ * @author Niall Gallagher
+ *
+ * @see org.simpleframework.transport.trace.Trace
+ */
+public interface TraceAnalyzer {
+
+ /**
+ * This method is used to attach a trace to the specified channel.
+ * Attaching a trace basically means associating events from that
+ * trace with the specified socket. It ensures that the events
+ * from a specific channel can be observed in isolation.
+ *
+ * @param channel this is the channel to associate with the trace
+ *
+ * @return this returns a trace associated with the channel
+ */
+ Trace attach(SelectableChannel channel);
+
+ /**
+ * This is used to stop the agent and clear all trace information.
+ * Stopping the agent is typically done when the server is stopped
+ * and is used to free any resources associated with the agent. If
+ * an agent does not hold information this method can be ignored.
+ */
+ void stop();
+} \ No newline at end of file