summaryrefslogtreecommitdiffstats
path: root/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java')
-rw-r--r--simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java404
1 files changed, 404 insertions, 0 deletions
diff --git a/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java
new file mode 100644
index 0000000..1a14431
--- /dev/null
+++ b/simple/simple-transport/src/test/java/org/simpleframework/transport/TransportTest.java
@@ -0,0 +1,404 @@
+package org.simpleframework.transport;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.simpleframework.transport.reactor.ExecutorReactor;
+import org.simpleframework.transport.reactor.Reactor;
+
+/**
+ * Measure the performance of the transports to ensure that the perform
+ * well and that they send the correct sequence of bytes and that the
+ * blocks sent are in the correct order. This also performs a comparison
+ * with direct socket output streams to ensure there is a reasonable
+ * performance difference.
+ *
+ * @author Niall Gallagher
+ */
+public class TransportTest extends TestCase {
+
+ private static final int REPEAT = 1000;
+
+ public void testTransport() throws Exception {
+ testTransport(REPEAT);
+ }
+
+ public void testTransport(int repeat) throws Exception {
+ for(int i = 1; i < 7; i++) { // just do some random sizes
+ testTransport(i, 100);
+ }
+ for(int i = 4092; i < 4102; i++) {
+ testTransport(i, 100);
+ }
+ for(int i = 8190; i < 8200; i++) {
+ testTransport(i, 100);
+ }
+ for(int i = 11282; i < 11284; i++) {
+ testTransport(i, 1000);
+ }
+ for(int i = 204800; i < 204805; i++) {
+ testTransport(i, 1000);
+ }
+ testTransport(16, repeat);
+ testTransport(64, repeat);
+ testTransport(256, repeat);
+ testTransport(1024, repeat);
+ testTransport(2048, repeat);
+ testTransport(4096, repeat);
+ testTransport(4098, repeat);
+ testTransport(8192, repeat);
+ testTransport(8197, repeat);
+ }
+
+ // Test blocking transport
+ private void testTransport(int size, int repeat) throws Exception {
+ // ThreadDumper dumper = new ThreadDumper();
+ SocketConsumer consumer = new SocketConsumer(size, repeat);
+ SocketAddress address = new InetSocketAddress("localhost", consumer.getPort());
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false); // underlying socket must be non-blocking
+ channel.connect(address);
+
+ while(!channel.finishConnect()) { // wait to finish connection
+ Thread.sleep(10);
+ };
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ Reactor reactor = new ExecutorReactor(executor);
+ // Transport transport = new SocketTransport(channel, reactor, 2, 3);//XXX bug
+ MockSocket pipeline = new MockSocket(channel);
+ Transport transport = new SocketTransport(pipeline, reactor, 8192);
+ OutputStream out = new TransportOutputStream(transport);
+
+ // dumper.start();
+ testOutputStream(consumer, out, size, repeat);
+
+ out.close();
+ executor.shutdown();
+ channel.close();
+ reactor.stop();
+ // dumper.kill();
+ Thread.sleep(100);
+ }
+
+ public void s_testSocket() throws Exception {
+ s_testSocket(REPEAT);
+ }
+
+ public void s_testSocket(int repeat) throws Exception {
+ testSocket(16, repeat);
+ testSocket(64, repeat);
+ testSocket(256, repeat);
+ testSocket(1024, repeat);
+ testSocket(2048, repeat);
+ testSocket(4098, repeat);
+ testSocket(8192, repeat);
+ }
+
+ // Test blocking socket
+ private void testSocket(int size, int repeat) throws Exception {
+ // ThreadDumper dumper = new ThreadDumper();
+ SocketConsumer consumer = new SocketConsumer(size, repeat);
+ Socket socket = new Socket("localhost", consumer.getPort());
+ OutputStream out = socket.getOutputStream();
+
+ //dumper.start();
+ testOutputStream(consumer, out, size, repeat);
+
+ out.close();
+ socket.close();
+ //dumper.kill();
+ Thread.sleep(100);
+ }
+
+ private class AlpahbetIterator {
+
+ private byte[] alphabet = "abcdefghijklmnopqstuvwxyz".getBytes();
+
+ private int off;
+
+ public byte next() {
+ if(off == alphabet.length) {
+ off = 0;
+ }
+ return alphabet[off++];
+ }
+
+ public void reset() {
+ off = 0;
+ }
+ }
+
+ private void testOutputStream(SocketConsumer consumer, OutputStream out, int size, int repeat) throws Exception {
+ byte[] block = new byte[size]; // write size
+ AlpahbetIterator it = new AlpahbetIterator(); // write known data
+
+ for(int i = 1; i < block.length; i++) {
+ block[i] = it.next();
+ }
+ AtomicLong count = new AtomicLong();
+ PerformanceMonitor monitor = new PerformanceMonitor(consumer, count, out.getClass().getSimpleName(), size);
+
+ for(int i = 0; i < repeat; i++) {
+ block[0] = (byte) i; // mark the first byte in the block to be sure we get blocks in sequence
+ //System.err.println("["+i+"]"+new String(block,"ISO-8859-1"));
+ out.write(block); // manipulation of the underlying buffer is taking place when the compact is invoked, this is causing major problems as the next packet will be out of sequence
+ count.addAndGet(block.length);
+ }
+ Thread.sleep(2000); // wait for all bytes to flush through to consumer
+ monitor.kill();
+ }
+
+ private class PerformanceMonitor extends Thread {
+ private AtomicLong count;
+
+ private volatile boolean dead;
+
+ private SocketConsumer consumer;
+
+ private String name;
+
+ private int size;
+
+ public PerformanceMonitor(SocketConsumer consumer, AtomicLong count, String name, int size) {
+ this.consumer = consumer;
+ this.count = count;
+ this.name = name;
+ this.size = size;
+ this.start();
+ }
+
+ public void run() {
+ int second = 0;
+ while(!dead) {
+ try {
+ long octets = count.longValue();
+ System.out.printf("%s,%s,%s,%s,%s%n", name, size, second++, octets, consumer.getWindow());
+ Thread.sleep(1000);
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void kill() throws Exception {
+ dead = true;
+ }
+ }
+
+ private class SocketConsumer extends Thread {
+
+ private ServerSocket server;
+
+ private Window window;
+
+ private long repeat;
+
+ private long size;
+
+ public SocketConsumer(int size, int repeat) throws Exception {
+ this.window = new Window(20);
+ this.server = getSocket();
+ this.repeat = repeat;
+ this.size = size;
+ this.start();
+ }
+
+ public int getPort() {
+ return server.getLocalPort();
+ }
+
+ public String getWindow() {
+ return window.toString();
+ }
+
+ private ServerSocket getSocket() throws Exception {
+ // Scan the ephemeral port range
+ for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ System.out.println("port=["+socket.getLocalPort()+"]");
+ return socket;
+ } catch(Exception e) {
+ Thread.sleep(200);
+ }
+ }
+ // Scan a second time for good measure, maybe something got freed up
+ for(int i = 2000; i < 10000; i++) { // keep trying to grab the socket
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ System.out.println("port=["+socket.getLocalPort()+"]");
+ return socket;
+ } catch(Exception e) {
+ Thread.sleep(200);
+ }
+ }
+ throw new IOException("Could not create a client socket");
+ }
+
+ public void run() {
+ long count = 0;
+ int windowOctet = 0;
+ int expectWindowOctet = 0;
+
+ try {
+ Socket socket = server.accept();
+ InputStream in = socket.getInputStream();
+ InputStream source = new BufferedInputStream(in);
+ AlpahbetIterator it = new AlpahbetIterator();
+
+ scan: for(int i = 0; i < repeat; i++) {
+ int octet = source.read(); // check first byte in the block to make sure its correct in sequence
+
+ if(octet == -1) {
+ break scan;
+ }
+ count++; // we have read another byte
+ windowOctet = octet & 0x000000ff;
+ expectWindowOctet = i & 0x000000ff;
+
+ if((byte) octet != (byte) i) {
+ throw new Exception("Wrong sequence of blocks sent, was "
+ + (byte)octet + " should have been " + (byte)i + " count is "+count+" window is "+window+" compare "+explore(it, source, 5));
+ }
+ window.recieved(octet);
+
+ for(int j = 1, k = 0; j < size; j++, k++) {
+ octet = source.read();
+
+ if(octet == -1) {
+ break scan;
+ }
+ byte next = it.next();
+
+ if((byte) octet != next) {
+ throw new Exception("Invalid data received expected "+((byte)octet)+"("+((char)octet)+
+ ") but was "+next+"("+((char)next)+") total count is "+count+" block count is "+k+" window is expected "+
+ expectWindowOctet+"("+((char)expectWindowOctet)+")("+((byte)expectWindowOctet)+") got "+windowOctet+"("+
+ ((char)windowOctet)+")("+((byte)windowOctet)+") "+window+" compare "+explore(it, source, 5));
+ }
+ count++;
+ }
+ it.reset();
+ }
+ } catch(Throwable e) {
+ e.printStackTrace();
+ }
+ if(count != size * repeat) {
+ new Exception("Invalid number of bytes read, was " + count
+ + " should have been " + (size * repeat)).printStackTrace();
+ }
+ try {
+ // server.close();
+ }catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private String explore(AlpahbetIterator it, InputStream source, int count) throws IOException {
+ StringBuffer buf = new StringBuffer();
+ buf.append("expected (");
+ for(int i = 0; i < count; i++) {
+ buf.append( (char)it.next() );
+ }
+ buf.append(") is (");
+ for(int i = 0; i < count; i++) {
+ buf.append( (char)source.read() );
+ }
+ buf.append(")");
+ return buf.toString();
+ }
+ }
+
+
+ private static class TransportOutputStream extends OutputStream {
+
+ private Transport transport;
+
+ public TransportOutputStream(Transport transport) {
+ this.transport = transport;
+ }
+
+ public void write(int octet) throws IOException {
+ byte[] data = new byte[] { (byte) octet };
+ write(data);
+ }
+
+ public void write(byte[] data, int off, int len) throws IOException {
+ try {
+ ByteBuffer buffer = ByteBuffer.wrap(data, off, len);
+ ByteBuffer safe = buffer.asReadOnlyBuffer();
+
+ if(len > 0) {
+ transport.write(safe);
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new IOException("Write failed");
+ }
+ }
+
+ public void flush() throws IOException {
+ try {
+ transport.flush();
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new IOException("Flush failed");
+ }
+ }
+
+ public void close() throws IOException {
+ try {
+ transport.close();
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new IOException("Close failed");
+ }
+ }
+
+ }
+
+ private static class Window {
+
+ private final LinkedList<String> window;
+ private final int size;
+
+ public Window(int size) {
+ this.window = new LinkedList<String>();
+ this.size = size;
+ }
+
+ public synchronized void recieved(int sequence) {
+ window.addLast(String.valueOf(sequence));
+
+ if(window.size() > size) {
+ window.removeFirst();
+ }
+ }
+
+ public synchronized String toString() {
+ StringBuilder builder = new StringBuilder("[");
+ String delim = "";
+ for(String b : window) {
+ builder.append(delim).append(b);
+ delim=", ";
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+ }
+
+}