summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java')
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java92
1 files changed, 92 insertions, 0 deletions
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java
new file mode 100644
index 0000000..6654f31
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/SocketTransportPipeTest.java
@@ -0,0 +1,92 @@
+package org.simpleframework.transport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.common.thread.ConcurrentExecutor;
+import org.simpleframework.transport.reactor.ExecutorReactor;
+import org.simpleframework.transport.reactor.Reactor;
+import org.simpleframework.transport.trace.MockTrace;
+import org.simpleframework.transport.trace.Trace;
+
+public class SocketTransportPipeTest extends TestCase {
+
+ private static final int ITERATIONS = 100000;
+
+ public void testPipe() throws Exception {
+ ServerSocket server = new ServerSocket(0);
+ SocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ Trace trace = new MockTrace();
+ SocketWrapper wrapper = new SocketWrapper(channel, trace);
+ Executor executor = new ConcurrentExecutor(Runnable.class);
+ Reactor reactor = new ExecutorReactor(executor);
+ SocketTransport transport = new SocketTransport(wrapper,reactor);
+ java.net.Socket socket = server.accept();
+ final InputStream read = socket.getInputStream();
+ final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ final LinkedBlockingQueue<String> sent = new LinkedBlockingQueue<String>();
+ final LinkedBlockingQueue<String> received = new LinkedBlockingQueue<String>();
+ Thread thread = new Thread(new Runnable() {
+ public void run(){
+ try {
+ byte[] token = new byte[]{'\r','\n'};
+ int pos = 0;
+ int count = 0;
+ while((count = read.read()) != -1){
+ if(count != token[pos++]){
+ pos = 0;
+ }
+ if(pos == token.length) {
+ String value = buffer.toString().trim();
+ String expect = sent.take();
+
+ if(!value.equals(expect)) {
+ throw new Exception("Out of sequence expected " + expect + " but got " + value);
+ }
+ received.offer(value);
+ buffer.reset();
+ pos = 0;
+ } else {
+ buffer.write(count);
+ System.err.write(count);
+ System.err.flush();
+ }
+
+ }
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ }
+ });
+ thread.start();
+ for(int i = 0; i < ITERATIONS; i++) {
+ String message = "message-"+i;
+ transport.write(ByteBuffer.wrap((message+"\r\n").getBytes()));
+ sent.offer(message);
+ }
+ transport.flush();
+ transport.close();
+
+ for(int i = 0; i < ITERATIONS; i++) {
+ assertEquals(received.take(), "message-"+i);
+ }
+ assertTrue(sent.isEmpty());
+ assertTrue(received.isEmpty());
+
+ }
+}