diff options
author | serya <serya@chromium.org> | 2014-11-18 22:34:17 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2014-11-19 06:34:36 +0000 |
commit | 648090c58236cc635b853992b53b67d38eb3ca6a (patch) | |
tree | 085d6789cf221c085071661add961ab46f022a6e /components/devtools_bridge | |
parent | 85fb6f102205ae33f4fde9d3afe66f626d5459b4 (diff) | |
download | chromium_src-648090c58236cc635b853992b53b67d38eb3ca6a.zip chromium_src-648090c58236cc635b853992b53b67d38eb3ca6a.tar.gz chromium_src-648090c58236cc635b853992b53b67d38eb3ca6a.tar.bz2 |
Native tunnel implementation.
Native API lets using single IO thread (in contrast with Java LocalSocket which provides blocking API).
Another benefit of using native implementation is sharing code with client side (see https://codereview.chromium.org/720133002/ - net::StreamSocket needed).
Significant tests modifications needed because:
1. net::StreamSocket doesn't allow to close stream in each direction separately (in request/response scenario server calculates end of stream position from content).
2. SocketTunnelServer becomes dependent on native implementation of AbstractDataChannel.
TBR=szym@chromium.org (for dependency on net)
BUG=383418
Review URL: https://codereview.chromium.org/735003004
Cr-Commit-Position: refs/heads/master@{#304771}
Diffstat (limited to 'components/devtools_bridge')
33 files changed, 1228 insertions, 863 deletions
diff --git a/components/devtools_bridge/DEPS b/components/devtools_bridge/DEPS index 655f7e5..699e22a 100644 --- a/components/devtools_bridge/DEPS +++ b/components/devtools_bridge/DEPS @@ -1,6 +1,7 @@ include_rules = [ "-chrome", "-content", + "+net", "+third_party/libjingle", "+third_party/webrtc", ] diff --git a/components/devtools_bridge/abstract_data_channel.h b/components/devtools_bridge/abstract_data_channel.h index 43acc45..473774e 100644 --- a/components/devtools_bridge/abstract_data_channel.h +++ b/components/devtools_bridge/abstract_data_channel.h @@ -8,6 +8,7 @@ #include <string> #include "base/callback.h" +#include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" namespace devtools_bridge { @@ -37,6 +38,26 @@ class AbstractDataChannel { DISALLOW_COPY_AND_ASSIGN(Observer); }; + /** + * Proxy for accessing data channel from a different thread. + * May outlive data channel (methods will have no effect if DataChannel + * destroyed). + */ + class Proxy : public base::RefCountedThreadSafe<Proxy> { + public: + virtual void SendBinaryMessage(const void* data, size_t length) = 0; + virtual void Close() = 0; + + protected: + Proxy() {} + virtual ~Proxy() {} + + private: + friend class base::RefCountedThreadSafe<Proxy>; + + DISALLOW_COPY_AND_ASSIGN(Proxy); + }; + virtual void RegisterObserver(scoped_ptr<Observer> observer) = 0; virtual void UnregisterObserver() = 0; @@ -45,6 +66,8 @@ class AbstractDataChannel { virtual void Close() = 0; + virtual scoped_refptr<Proxy> proxy() = 0; + private: DISALLOW_COPY_AND_ASSIGN(AbstractDataChannel); }; diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/ServerSession.java b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/ServerSession.java index 90c8b34..4dadcb9 100644 --- a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/ServerSession.java +++ b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/ServerSession.java @@ -20,7 +20,7 @@ public class ServerSession extends SessionBase implements SessionBase.ServerSess public ServerSession(SessionDependencyFactory factory, Executor executor, String defaultSocketName) { - super(factory, executor, new SocketTunnelServer(defaultSocketName)); + super(factory, executor, factory.newSocketTunnelServer(defaultSocketName)); } @Override @@ -154,8 +154,8 @@ public class ServerSession extends SessionBase implements SessionBase.ServerSess } } - protected SocketTunnelServer createSocketTunnelServer(String serverSocketName) { - return new SocketTunnelServer(serverSocketName); + protected SocketTunnel newSocketTunnelServer(String serverSocketName) { + return mFactory.newSocketTunnelServer(serverSocketName); } private final class ClientMessageHandler extends SessionControlMessages.ClientMessageHandler { diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionBase.java b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionBase.java index db793b0..11fd9b6 100644 --- a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionBase.java +++ b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionBase.java @@ -43,7 +43,7 @@ public abstract class SessionBase { private static final int DEFAULT_TUNNEL_CHANNEL_ID = 1; private final Executor mExecutor; - private final SessionDependencyFactory mFactory; + protected final SessionDependencyFactory mFactory; private AbstractPeerConnection mConnection; private AbstractDataChannel mControlChannel; private List<String> mCandidates = new ArrayList<String>(); @@ -51,8 +51,8 @@ public abstract class SessionBase { private boolean mConnected = false; private Cancellable mAutoCloseTask; private SessionControlMessages.MessageHandler mControlMessageHandler; - private final Map<Integer, SocketTunnelBase> mTunnels = - new HashMap<Integer, SocketTunnelBase>(); + private final Map<Integer, SocketTunnel> mTunnels = + new HashMap<Integer, SocketTunnel>(); private EventListener mEventListener; protected int mAutoCloseTimeoutMs = 30000; @@ -126,7 +126,7 @@ public abstract class SessionBase { protected SessionBase(SessionDependencyFactory factory, Executor executor, - SocketTunnelBase defaultTunnel) { + SocketTunnel defaultTunnel) { mExecutor = executor; mFactory = factory; addTunnel(DEFAULT_TUNNEL_CHANNEL_ID, defaultTunnel); @@ -136,6 +136,10 @@ public abstract class SessionBase { checkCalledOnSessionThread(); if (isStarted()) stop(); + + for (SocketTunnel tunnel : mTunnels.values()) { + tunnel.dispose(); + } } public void setEventListener(EventListener listener) { @@ -152,7 +156,7 @@ public abstract class SessionBase { return mTunnels.containsKey(channelId); } - private final void addTunnel(int channelId, SocketTunnelBase tunnel) { + private final void addTunnel(int channelId, SocketTunnel tunnel) { assert !mTunnels.containsKey(channelId); assert !tunnel.isBound(); // Tunnel renegotiation not implemented. @@ -203,9 +207,9 @@ public abstract class SessionBase { mControlMessageHandler = handler; mControlChannel.registerObserver(new ControlChannelObserver()); - for (Map.Entry<Integer, SocketTunnelBase> entry : mTunnels.entrySet()) { + for (Map.Entry<Integer, SocketTunnel> entry : mTunnels.entrySet()) { int channelId = entry.getKey(); - SocketTunnelBase tunnel = entry.getValue(); + SocketTunnel tunnel = entry.getValue(); tunnel.bind(connection().createDataChannel(channelId)); } } @@ -220,7 +224,7 @@ public abstract class SessionBase { stopAutoCloseTimer(); - for (SocketTunnelBase tunnel : mTunnels.values()) { + for (SocketTunnel tunnel : mTunnels.values()) { tunnel.unbind().dispose(); } diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactory.java b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactory.java index 94c99f7..e90e8ce 100644 --- a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactory.java +++ b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactory.java @@ -38,5 +38,7 @@ public abstract class SessionDependencyFactory { public abstract AbstractPeerConnection createPeerConnection( RTCConfiguration config, AbstractPeerConnection.Observer observer); + public abstract SocketTunnel newSocketTunnelServer(String socketName); + public abstract void dispose(); } diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryNative.java b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryNative.java index f98f126..cac78a5 100644 --- a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryNative.java +++ b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryNative.java @@ -37,6 +37,11 @@ public class SessionDependencyFactoryNative extends SessionDependencyFactory { } @Override + public SocketTunnel newSocketTunnelServer(String socketBase) { + return new SocketTunnelServerImpl(mFactoryPtr, socketBase); + } + + @Override public void dispose() { nativeDestroyFactory(mFactoryPtr); } @@ -104,6 +109,10 @@ public class SessionDependencyFactoryNative extends SessionDependencyFactory { mChannelPtr = ptr; } + long nativePtr() { + return mChannelPtr; + } + @Override public void registerObserver(Observer observer) { nativeRegisterDataChannelObserver(mChannelPtr, observer); @@ -142,6 +151,44 @@ public class SessionDependencyFactoryNative extends SessionDependencyFactory { } } + private static class SocketTunnelServerImpl implements SocketTunnel { + private final String mSocketName; + private final long mFactoryPtr; + private DataChannelImpl mDataChannel; + private long mTunnelPtr; + + public SocketTunnelServerImpl(long factoryPtr, String socketName) { + mFactoryPtr = factoryPtr; + mSocketName = socketName; + } + + @Override + public void bind(AbstractDataChannel dataChannel) { + mDataChannel = (DataChannelImpl) dataChannel; + mTunnelPtr = nativeCreateSocketTunnelServer( + mFactoryPtr, mDataChannel.nativePtr(), mSocketName); + } + + @Override + public AbstractDataChannel unbind() { + AbstractDataChannel result = mDataChannel; + nativeDestroySocketTunnelServer(mTunnelPtr); + mTunnelPtr = 0; + mDataChannel = null; + return result; + } + + @Override + public boolean isBound() { + return mDataChannel != null; + } + + @Override + public void dispose() { + assert !isBound(); + } + } + // Peer connection callbacks. @CalledByNative @@ -229,4 +276,8 @@ public class SessionDependencyFactoryNative extends SessionDependencyFactory { long channelPtr, ByteBuffer message, int size); private static native void nativeSendTextMessage(long channelPtr, ByteBuffer message, int size); private static native void nativeCloseDataChannel(long channelPtr); + + private static native long nativeCreateSocketTunnelServer( + long factoryPtr, long channelPtr, String socketName); + private static native void nativeDestroySocketTunnelServer(long tunnelPtr); } diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnel.java b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnel.java new file mode 100644 index 0000000..f7670e8 --- /dev/null +++ b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnel.java @@ -0,0 +1,27 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.components.devtools_bridge; + +/** + * Interface for client or server socket tunnel. Tunnels a socket over a data channel. + * Client tunnel should be bound to one side and server tunnel to another. + * + * Data flow schema looks like this: + * + * DevToolsServer + * <-unix socket-> + * SocketTunnelServer + * <-data channel-> + * SocketTunnelClient + * <- unix socket -> + * Client (DevTools frontend) + */ +interface SocketTunnel { + void bind(AbstractDataChannel dataChannel); + AbstractDataChannel unbind(); + boolean isBound(); + + void dispose(); +} diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnelServer.java b/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnelServer.java deleted file mode 100644 index 7ef0c95..0000000 --- a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnelServer.java +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2014 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package org.chromium.components.devtools_bridge; - -import android.net.LocalSocketAddress; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * Tunnels DevTools UNIX socket over AbstractDataChannel. - */ -public class SocketTunnelServer extends SocketTunnelBase { - private final LocalSocketAddress mAddress; - - private final ExecutorService mReadingThreadPool = Executors.newCachedThreadPool(); - - // Connections with opened client to server stream. If bound to data channel must be accessed - // on signaling thread. - private final Map<Integer, Connection> mClientConnections = - new HashMap<Integer, Connection>(); - - // Connections with opened server to client stream. Values are added - // on signaling thread and removed on reading thread. - private final ConcurrentMap<Integer, Connection> mServerConnections = - new ConcurrentHashMap<Integer, Connection>(); - - public SocketTunnelServer(String socketName) { - mAddress = new LocalSocketAddress(socketName); - } - - @Override - public AbstractDataChannel unbind() { - AbstractDataChannel dataChannel = super.unbind(); - - mReadingThreadPool.shutdownNow(); - - // Safe to access mClientConnections here once AbstractDataChannel.Observer detached. - for (Connection connection : mClientConnections.values()) { - connection.terminate(); - } - mClientConnections.clear(); - return dataChannel; - } - - public boolean hasConnections() { - return mClientConnections.size() + mServerConnections.size() > 0; - } - - @Override - protected void onReceivedDataPacket(int connectionId, byte[] data) throws ProtocolError { - checkCalledOnSignalingThread(); - - if (!mClientConnections.containsKey(connectionId)) { - throw new ProtocolError("Unknows conection id"); - } - - mClientConnections.get(connectionId).onReceivedDataPacket(data); - } - - @Override - protected void onReceivedControlPacket(int connectionId, byte opCode) throws ProtocolError { - checkCalledOnSignalingThread(); - - switch (opCode) { - case CLIENT_OPEN: - onClientOpen(connectionId); - break; - - case CLIENT_CLOSE: - onClientClose(connectionId); - break; - - default: - throw new ProtocolError("Invalid opCode"); - } - } - - private void onClientOpen(int connectionId) throws ProtocolError { - checkCalledOnSignalingThread(); - - if (mClientConnections.containsKey(connectionId) || - mServerConnections.containsKey(connectionId)) { - throw new ProtocolError("Conection id already used"); - } - - Connection connection = new Connection(connectionId); - mClientConnections.put(connectionId, connection); - mServerConnections.put(connectionId, connection); - - mReadingThreadPool.execute(connection); - } - - private void onClientClose(int connectionId) throws ProtocolError { - checkCalledOnSignalingThread(); - - if (!mClientConnections.containsKey(connectionId)) { - throw new ProtocolError("Unknows connection id"); - } - - Connection connection = mClientConnections.get(connectionId); - - connection.closedByClient(); - mClientConnections.remove(connectionId); - } - - private final class Connection extends ConnectionBase implements Runnable { - public Connection(int id) { - super(id); - } - - public void closedByClient() { - shutdownOutput(); - } - - @Override - public void run() { - assert mServerConnections.containsKey(mId); - - if (connect(mAddress)) { - sendToDataChannel(buildControlPacket(mId, SERVER_OPEN_ACK)); - runReadingLoop(); - } - mServerConnections.remove(mId); - shutdownInput(); - sendToDataChannel(buildControlPacket(mId, SERVER_CLOSE)); - } - } -} diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/DevToolsBridgeServerTest.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/DevToolsBridgeServerTest.java index 253e7b2..5710a2b 100644 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/DevToolsBridgeServerTest.java +++ b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/DevToolsBridgeServerTest.java @@ -74,9 +74,10 @@ public class DevToolsBridgeServerTest extends ServiceTestCase<LocalBindingTestSe Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); LocalSocket serverSocket = serverListeningSocket.accept(); - String request = TestUtils.readAll(serverSocket); + String request = TestUtils.read(serverSocket, REQUEST.length()); Assert.assertEquals(REQUEST, request); - TestUtils.writeAndShutdown(serverSocket, RESPONSE); + TestUtils.write(serverSocket, RESPONSE); + serverSocket.close(); Assert.assertEquals(RESPONSE, response.get()); clientSession.dispose(); diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridgeTest.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridgeTest.java index 222c285..358702c 100644 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridgeTest.java +++ b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridgeTest.java @@ -18,9 +18,9 @@ import java.util.concurrent.Future; */ public class LocalSessionBridgeTest extends InstrumentationTestCase { private static final String SERVER_SOCKET_NAME = - "org.chromium.components.devtools_bridge.LocalSessionBridgeTest.SERVER_SOCKET"; + LocalSessionBridgeTest.class.getName() + ".SERVER_SOCKET"; private static final String CLIENT_SOCKET_NAME = - "org.chromium.components.devtools_bridge.LocalSessionBridgeTest.CLIENT_SOCKET"; + LocalSessionBridgeTest.class.getName() + ".CLIENT_SOCKET"; private static final String REQUEST = "Request"; private static final String RESPONSE = "Response"; @@ -79,9 +79,36 @@ public class LocalSessionBridgeTest extends InstrumentationTestCase { LocalServerSocket serverListeningSocket = new LocalServerSocket(SERVER_SOCKET_NAME); Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); LocalSocket serverSocket = serverListeningSocket.accept(); - String request = TestUtils.readAll(serverSocket); + String request = TestUtils.read(serverSocket, REQUEST.length()); Assert.assertEquals(REQUEST, request); - TestUtils.writeAndShutdown(serverSocket, RESPONSE); + TestUtils.write(serverSocket, RESPONSE); + serverSocket.close(); Assert.assertEquals(RESPONSE, response.get()); } + + @MediumTest + public void testRequestFailure1() throws Exception { + mBridge.start(); + LocalServerSocket serverListeningSocket = new LocalServerSocket(SERVER_SOCKET_NAME); + Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); + LocalSocket socket = serverListeningSocket.accept(); + int firstByte = socket.getInputStream().read(); + + Assert.assertEquals((int) REQUEST.charAt(0), firstByte); + + socket.close(); + Assert.assertEquals("", response.get()); + } + + @MediumTest + public void testRequestFailure2() throws Exception { + mBridge.dispose(); + // Android system socket will reject connection. + mBridge = new LocalSessionBridge("jdwp-control", CLIENT_SOCKET_NAME); + mBridge.start(); + + Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); + + Assert.assertEquals("", response.get()); + } } diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalTunnelBridgeTest.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalTunnelBridgeTest.java deleted file mode 100644 index 6810c28..0000000 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/LocalTunnelBridgeTest.java +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2014 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package org.chromium.components.devtools_bridge; - -import android.net.LocalServerSocket; -import android.net.LocalSocket; -import android.test.InstrumentationTestCase; -import android.test.suitebuilder.annotation.SmallTest; - -import junit.framework.Assert; - -import java.io.IOException; -import java.util.concurrent.Future; - -/** - * Tests for {@link SocketTunnelBridge} - */ -public class LocalTunnelBridgeTest extends InstrumentationTestCase { - private static final String REQUEST = "Request"; - private static final String RESPONSE = "Response"; - - private static final String SERVER_SOCKET_NAME = - "org.chromium.components.devtools_bridge.LocalTunnelBridgeTest.SERVER_SOCKET"; - private static final String CLIENT_SOCKET_NAME = - "org.chromium.components.devtools_bridge.LocalTunnelBridgeTest.CLIENT_SOCKET"; - - private LocalTunnelBridge mBridge; - private LocalServerSocket mServerSocket; - - @Override - public void setUp() throws Exception { - super.setUp(); - mServerSocket = new LocalServerSocket(SERVER_SOCKET_NAME); - } - - private void startBridge() throws IOException { - startBridge(SERVER_SOCKET_NAME); - } - - private void startBridge(String serverSocketName) throws IOException { - Assert.assertNull(mBridge); - mBridge = new LocalTunnelBridge(serverSocketName, CLIENT_SOCKET_NAME); - mBridge.start(); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - if (mBridge != null) { - mBridge.dispose(); - mBridge = null; - } - mServerSocket.close(); - } - - @SmallTest - public void testStartStop() throws Exception { - startBridge(); - mBridge.stop(); - } - - @SmallTest - public void testRequestResponse() throws Exception { - startBridge(); - - Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); - - LocalSocket socket = mServerSocket.accept(); - String request = TestUtils.readAll(socket); - TestUtils.writeAndShutdown(socket, RESPONSE); - - Assert.assertEquals(REQUEST, request); - - Assert.assertEquals(RESPONSE, response.get()); - socket.close(); - - mBridge.stop(); - } - - @SmallTest - public void testRequestFailure1() throws Exception { - startBridge(); - - Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); - LocalSocket socket = mServerSocket.accept(); - int firstByte = socket.getInputStream().read(); - - Assert.assertEquals((int) REQUEST.charAt(0), firstByte); - - socket.close(); - - Assert.assertEquals("", response.get()); - - mBridge.waitAllConnectionsClosed(); - mBridge.stop(); - } - - @SmallTest - public void testRequestFailure2() throws Exception { - startBridge("jdwp-control"); // Android system socket will reject connection. - - Future<String> response = TestUtils.asyncRequest(CLIENT_SOCKET_NAME, REQUEST); - - Assert.assertEquals("", response.get()); - - mBridge.waitAllConnectionsClosed(); - mBridge.stop(); - } -} diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryTest.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryTest.java index a7dc15b..90a3a9c 100644 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryTest.java +++ b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SessionDependencyFactoryTest.java @@ -11,8 +11,6 @@ import android.test.suitebuilder.annotation.SmallTest; import junit.framework.Assert; import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; /** * Tests for {@link SessionDependencyFactory} @@ -22,12 +20,12 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { private SessionDependencyFactory mInstance; private AbstractPeerConnection mConnection; - private ObserverMock mObserver; + private PeerConnectionObserverMock mObserver; @Override protected void setUp() throws Exception { super.setUp(); - mObserver = new ObserverMock(); + mObserver = new PeerConnectionObserverMock(); } @SmallTest @@ -111,7 +109,7 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { @SmallTest public void testNegotiation() throws Exception { mInstance = newFactory(); - Pipe pipe = new Pipe(mInstance); + DataPipe pipe = new DataPipe(mInstance); pipe.negotiate(); pipe.dispose(); mInstance.dispose(); @@ -120,7 +118,7 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { @SmallTest public void testConnection() throws Exception { mInstance = newFactory(); - Pipe pipe = new Pipe(mInstance); + DataPipe pipe = new DataPipe(mInstance); pipe.negotiate(); pipe.awaitConnected(); pipe.dispose(); @@ -133,7 +131,7 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { mConnection = newConnection(); AbstractDataChannel channel = mConnection.createDataChannel(DATA_CHANNEL_ID); - channel.registerObserver(new DataChannelObserver()); + channel.registerObserver(new DataChannelObserverMock()); channel.send(ByteBuffer.allocateDirect(1), AbstractDataChannel.MessageType.TEXT); channel.send(ByteBuffer.allocateDirect(1), AbstractDataChannel.MessageType.BINARY); channel.unregisterObserver(); @@ -147,7 +145,7 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { @SmallTest public void testDataChannelOpens() throws Exception { mInstance = newFactory(); - Pipe pipe = new Pipe(mInstance); + DataPipe pipe = new DataPipe(mInstance); pipe.registerDatatChannelObservers(); @@ -165,7 +163,7 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { @MediumTest public void testPumpData() throws Exception { mInstance = newFactory(); - Pipe pipe = new Pipe(mInstance); + DataPipe pipe = new DataPipe(mInstance); pipe.registerDatatChannelObservers(); pipe.negotiate(); pipe.dataChannelObserver(0).opened.await(); @@ -196,180 +194,7 @@ public class SessionDependencyFactoryTest extends InstrumentationTestCase { return newConnection(mObserver); } - private AbstractPeerConnection newConnection(ObserverMock observer) { + private AbstractPeerConnection newConnection(PeerConnectionObserverMock observer) { return mInstance.createPeerConnection(new RTCConfiguration(), observer); } - - static class Pipe { - final ObserverMock mObserver1 = new ObserverMock(); - final ObserverMock mObserver2 = new ObserverMock(); - - DataChannelObserver mDataChannelObserver1 = new DataChannelObserver(); - DataChannelObserver mDataChannelObserver2 = new DataChannelObserver(); - - final AbstractPeerConnection mConnection1; - final AbstractPeerConnection mConnection2; - - final AbstractDataChannel mDataChannel1; - final AbstractDataChannel mDataChannel2; - - Pipe(SessionDependencyFactory factory) { - RTCConfiguration config = new RTCConfiguration(); - mConnection1 = factory.createPeerConnection(config, mObserver1); - mConnection2 = factory.createPeerConnection(config, mObserver2); - - mObserver1.iceCandidatesSink = mConnection2; - mObserver2.iceCandidatesSink = mConnection1; - - mDataChannel1 = mConnection1.createDataChannel(DATA_CHANNEL_ID); - mDataChannel2 = mConnection2.createDataChannel(DATA_CHANNEL_ID); - } - - void dispose() { - mDataChannel1.dispose(); - mDataChannel2.dispose(); - mConnection1.dispose(); - mConnection2.dispose(); - } - - void negotiate() throws Exception { - mConnection1.createAndSetLocalDescription( - AbstractPeerConnection.SessionDescriptionType.OFFER); - mObserver1.localDescriptionAvailable.await(); - - mConnection2.setRemoteDescription( - AbstractPeerConnection.SessionDescriptionType.OFFER, - mObserver1.localDescription); - mObserver2.remoteDescriptionSet.await(); - - mConnection2.createAndSetLocalDescription( - AbstractPeerConnection.SessionDescriptionType.ANSWER); - mObserver2.localDescriptionAvailable.await(); - - mConnection1.setRemoteDescription( - AbstractPeerConnection.SessionDescriptionType.ANSWER, - mObserver2.localDescription); - mObserver1.remoteDescriptionSet.await(); - } - - void awaitConnected() throws Exception { - mObserver1.connected.await(); - mObserver2.connected.await(); - } - - void send(int channelIndex, String data) { - send(channelIndex, data.getBytes(), AbstractDataChannel.MessageType.TEXT); - } - - void send(int channelIndex, byte[] bytes, AbstractDataChannel.MessageType type) { - ByteBuffer rawMessage = ByteBuffer.allocateDirect(bytes.length); - rawMessage.put(bytes); - rawMessage.limit(rawMessage.position()); - rawMessage.position(0); - dataChannel(channelIndex).send(rawMessage, type); - } - - AbstractDataChannel dataChannel(int channelIndex) { - switch (channelIndex) { - case 0: - return mDataChannel1; - - case 1: - return mDataChannel2; - - default: - throw new ArrayIndexOutOfBoundsException(); - } - } - - DataChannelObserver dataChannelObserver(int channelIndex) { - switch (channelIndex) { - case 0: - return mDataChannelObserver1; - - case 1: - return mDataChannelObserver2; - - default: - throw new ArrayIndexOutOfBoundsException(); - } - } - - void registerDatatChannelObservers() { - mDataChannel1.registerObserver(mDataChannelObserver1); - mDataChannel2.registerObserver(mDataChannelObserver2); - } - - void unregisterDatatChannelObservers() { - mDataChannel1.unregisterObserver(); - mDataChannel2.unregisterObserver(); - } - } - - private static class ObserverMock implements AbstractPeerConnection.Observer { - public AbstractPeerConnection.SessionDescriptionType localDescriptionType; - public String localDescription; - public String failureDescription; - - public final CountDownLatch localDescriptionAvailable = new CountDownLatch(1); - public final CountDownLatch failureAvailable = new CountDownLatch(1); - public final CountDownLatch remoteDescriptionSet = new CountDownLatch(1); - public final CountDownLatch connected = new CountDownLatch(1); - - public AbstractPeerConnection iceCandidatesSink; - - @Override - public void onFailure(String description) { - failureDescription = description; - failureAvailable.countDown(); - } - - @Override - public void onLocalDescriptionCreatedAndSet( - AbstractPeerConnection.SessionDescriptionType type, String description) { - localDescriptionType = type; - localDescription = description; - localDescriptionAvailable.countDown(); - } - - @Override - public void onRemoteDescriptionSet() { - remoteDescriptionSet.countDown(); - } - - @Override - public void onIceCandidate(String iceCandidate) { - if (iceCandidatesSink != null) - iceCandidatesSink.addIceCandidate(iceCandidate); - } - - @Override - public void onIceConnectionChange(boolean connected) { - this.connected.countDown(); - } - } - - private static class DataChannelObserver implements AbstractDataChannel.Observer { - public final CountDownLatch opened = new CountDownLatch(1); - public final CountDownLatch closed = new CountDownLatch(1); - public final LinkedBlockingDeque<byte[]> received = new LinkedBlockingDeque<byte[]>(); - - public void onStateChange(AbstractDataChannel.State state) { - switch (state) { - case OPEN: - opened.countDown(); - break; - - case CLOSED: - closed.countDown(); - break; - } - } - - public void onMessage(ByteBuffer message) { - byte[] bytes = new byte[message.remaining()]; - message.get(bytes); - received.add(bytes); - } - } } diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelServerTest.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelServerTest.java index a250009..56fb383 100644 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelServerTest.java +++ b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelServerTest.java @@ -8,7 +8,6 @@ import android.net.LocalServerSocket; import android.net.LocalSocket; import android.test.InstrumentationTestCase; import android.test.suitebuilder.annotation.MediumTest; -import android.test.suitebuilder.annotation.SmallTest; import junit.framework.Assert; @@ -23,93 +22,62 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { private static final int CONNECTION_ID = 30; private static final String SOCKET_NAME = "ksdjhflksjhdflk"; - private DataChannelMock mDataChannelMock; - private SocketTunnelServer mServer; + private static final int SERVER_CHANNEL = 1; + private static final int CLIENT_CHANNEL = 0; + + private SessionDependencyFactory mFactory; + private DataPipe mPipe; + private SocketTunnel mServer; private LocalServerSocket mSocket; + private DataChannelObserverMock mObserverMock; @Override public void setUp() throws Exception { super.setUp(); + mFactory = SessionDependencyFactory.newInstance(); + mPipe = new DataPipe(mFactory); + mServer = mFactory.newSocketTunnelServer(SOCKET_NAME); + mServer.bind(mPipe.dataChannel(SERVER_CHANNEL)); mSocket = new LocalServerSocket(SOCKET_NAME); + mObserverMock = new DataChannelObserverMock(); + mPipe.dataChannel(CLIENT_CHANNEL).registerObserver(mObserverMock); + mPipe.negotiate(); + mPipe.awaitConnected(); } @Override public void tearDown() throws Exception { - mSocket.close(); - if (mServer != null) destroyServer(); + mPipe.dataChannel(CLIENT_CHANNEL).unregisterObserver(); + mServer.unbind(); + mPipe.dispose(); + mFactory.dispose(); super.tearDown(); } - private void createServer() { - createServer(new DataChannelMock()); - } - - private void createServer(DataChannelMock dataChannel) { - mDataChannelMock = dataChannel; - mServer = new SocketTunnelServer(SOCKET_NAME); - mServer.bind(mDataChannelMock); - } - - private void destroyServer() { - mServer.unbind().dispose(); - mServer = null; - } - - @SmallTest - public void testOpenDataChannel() { - createServer(); - mDataChannelMock.open(); - } - - @SmallTest - public void testDecodeControlPacket() { - createServer(); - ByteBuffer packet = buildControlPacket(CONNECTION_ID, SocketTunnelBase.SERVER_OPEN_ACK); - - PacketDecoder decoder = PacketDecoder.decode(packet); - Assert.assertTrue(decoder.isControlPacket()); - Assert.assertEquals(CONNECTION_ID, decoder.connectionId()); - Assert.assertEquals(SocketTunnelBase.SERVER_OPEN_ACK, decoder.opCode()); + private void sendPacket(ByteBuffer packet) { + packet.position(0); + mPipe.send(CLIENT_CHANNEL, packet); } - @SmallTest + @MediumTest public void testConnectToSocket() throws IOException { - createServer(); LocalSocket socket = connectToSocket(1); - Assert.assertTrue(mServer.hasConnections()); socket.close(); } private LocalSocket connectToSocket(int connectionId) throws IOException { - mDataChannelMock.notifyMessage( - buildControlPacket(connectionId, SocketTunnelBase.CLIENT_OPEN)); + sendPacket(SocketTunnelBase.buildControlPacket( + connectionId, SocketTunnelBase.CLIENT_OPEN)); return mSocket.accept(); } private void sendClose(int connectionId) { - mDataChannelMock.notifyMessage( - buildControlPacket(connectionId, SocketTunnelBase.CLIENT_CLOSE)); - } - - private ByteBuffer buildControlPacket(int connectionId, byte opCode) { - ByteBuffer packet = SocketTunnelBase.buildControlPacket(connectionId, opCode); - packet.limit(packet.position()); - packet.position(0); - Assert.assertTrue(packet.remaining() > 0); - return packet; - } - - private ByteBuffer buildDataPacket(int connectionId, byte[] data) { - ByteBuffer packet = SocketTunnelBase.buildDataPacket(connectionId, data, data.length); - packet.limit(packet.position()); - packet.position(0); - Assert.assertTrue(packet.remaining() > 0); - return packet; + sendPacket(SocketTunnelBase.buildControlPacket( + connectionId, SocketTunnelBase.CLIENT_CLOSE)); } - @SmallTest + @MediumTest public void testReceiveOpenAcknowledgement() throws IOException, InterruptedException { - createServer(); LocalSocket socket = connectToSocket(CONNECTION_ID); receiveOpenAck(CONNECTION_ID); @@ -117,8 +85,16 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { socket.close(); } + private PacketDecoder receivePacket() throws InterruptedException { + byte[] bytes = mObserverMock.received.take(); + ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.put(bytes); + buffer.position(0); + return PacketDecoder.decode(buffer); + } + private PacketDecoder receiveControlPacket(int connectionId) throws InterruptedException { - PacketDecoder decoder = PacketDecoder.decode(mDataChannelMock.receive()); + PacketDecoder decoder = receivePacket(); Assert.assertTrue(decoder.isControlPacket()); Assert.assertEquals(connectionId, decoder.connectionId()); return decoder; @@ -134,26 +110,20 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { Assert.assertEquals(SocketTunnelBase.SERVER_CLOSE, decoder.opCode()); } - @SmallTest + @MediumTest public void testClosingSocket() throws IOException, InterruptedException { - createServer(); LocalSocket socket = connectToSocket(CONNECTION_ID); receiveOpenAck(CONNECTION_ID); - socket.shutdownOutput(); + socket.close(); - PacketDecoder decoder = PacketDecoder.decode(mDataChannelMock.receive()); + PacketDecoder decoder = receiveControlPacket(CONNECTION_ID); - Assert.assertTrue(decoder.isControlPacket()); Assert.assertEquals(SocketTunnelBase.SERVER_CLOSE, decoder.opCode()); - Assert.assertEquals(CONNECTION_ID, decoder.connectionId()); - - socket.close(); } - @SmallTest + @MediumTest public void testReadData() throws IOException, InterruptedException { - createServer(); LocalSocket socket = connectToSocket(CONNECTION_ID); receiveOpenAck(CONNECTION_ID); @@ -171,7 +141,7 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { ByteBuffer result = ByteBuffer.allocate(length); while (true) { - PacketDecoder decoder = PacketDecoder.decode(mDataChannelMock.receive()); + PacketDecoder decoder = receivePacket(); if (decoder.isDataPacket()) { Assert.assertEquals(connectionId, decoder.connectionId()); result.put(decoder.data()); @@ -196,9 +166,8 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { private static final int[] CHUNK_SIZES = new int[] { 0, 1, 5, 100, 1000, SocketTunnelBase.READING_BUFFER_SIZE * 2 }; - @SmallTest + @MediumTest public void testReadLongDataChunk() throws IOException, InterruptedException { - createServer(); LocalSocket socket = connectToSocket(CONNECTION_ID); receiveOpenAck(CONNECTION_ID); @@ -224,13 +193,11 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { Assert.assertEquals(sentData, readData); } - @SmallTest + @MediumTest public void testReuseConnectionId() throws IOException, InterruptedException { - createServer(); LocalSocket socket = connectToSocket(CONNECTION_ID); receiveOpenAck(CONNECTION_ID); - socket.shutdownOutput(); socket.close(); receiveClose(CONNECTION_ID); sendClose(CONNECTION_ID); @@ -242,13 +209,12 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { private static final byte[] SAMPLE = "Sample".getBytes(); - @SmallTest + @MediumTest public void testWriteData() throws IOException, InterruptedException { - createServer(); LocalSocket socket = connectToSocket(CONNECTION_ID); receiveOpenAck(CONNECTION_ID); - mDataChannelMock.notifyMessage(buildDataPacket(CONNECTION_ID, SAMPLE)); + sendPacket(SocketTunnelBase.buildDataPacket(CONNECTION_ID, SAMPLE, SAMPLE.length)); byte[] result = new byte[SAMPLE.length]; int read = 0; @@ -262,44 +228,4 @@ public class SocketTunnelServerTest extends InstrumentationTestCase { socket.close(); } - - private enum TestStates { - INITIAL, SENDING, CLOSING, MAY_FINISH_SENDING, SENT, DONE - } - - @MediumTest - public void testStopWhileSendingData() throws IOException { - - final TestUtils.StateBarrier<TestStates> barrier = - new TestUtils.StateBarrier<TestStates>(TestStates.INITIAL); - - createServer(new DataChannelMock() { - @Override - public void send(ByteBuffer message, AbstractDataChannel.MessageType type) { - barrier.advance(TestStates.INITIAL, TestStates.SENDING); - barrier.advance(TestStates.MAY_FINISH_SENDING, TestStates.SENT); - } - }); - - LocalSocket socket = connectToSocket(CONNECTION_ID); - barrier.advance(TestStates.SENDING, TestStates.CLOSING); - socket.close(); - - new Thread() { - @Override - public void run() { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - barrier.advance(TestStates.CLOSING, TestStates.MAY_FINISH_SENDING); - } - }.start(); - - destroyServer(); - - barrier.advance(TestStates.SENT, TestStates.DONE); - } } diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugActivity.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugActivity.java index e5e0797..97e4f88 100644 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugActivity.java +++ b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugActivity.java @@ -48,7 +48,6 @@ public class DebugActivity extends Activity { textView.setMovementMethod(LinkMovementMethod.getInstance()); mLayout.addView(textView); - addActionButton("Start LocalTunnelBridge", DebugService.START_TUNNEL_BRIDGE_ACTION); addActionButton("Start LocalSessionBridge", DebugService.START_SESSION_BRIDGE_ACTION); addActionButton("Start hosted DevToolsBridgeServer", DebugService.START_SERVER_ACTION); addActionButton("Stop", DebugService.STOP_ACTION); diff --git a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugService.java b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugService.java index 3974876..b009555 100644 --- a/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugService.java +++ b/components/devtools_bridge/android/javatests/src/org/chromium/components/devtools_bridge/tests/DebugService.java @@ -16,7 +16,6 @@ import android.widget.Toast; import org.chromium.components.devtools_bridge.DevToolsBridgeServerSandbox; import org.chromium.components.devtools_bridge.GCDClientSessionTestingHost; import org.chromium.components.devtools_bridge.LocalSessionBridge; -import org.chromium.components.devtools_bridge.LocalTunnelBridge; import java.io.IOException; @@ -24,7 +23,6 @@ import java.io.IOException; * Service for testing devtools bridge. */ public class DebugService extends Service { - public static final String START_TUNNEL_BRIDGE_ACTION = "action.START_TUNNEL_BRIDGE"; public static final String START_SESSION_BRIDGE_ACTION = "action.START_SESSION_BRIDGE"; public static final String START_SERVER_ACTION = "action.START_SERVER"; public static final String START_GCD_CLIENT_ACTION = "action.START_GCD_CLIENT"; @@ -52,35 +50,6 @@ public class DebugService extends Service { return "webview_devtools_remote_" + Integer.valueOf(Process.myPid()); } - private class LocalTunnelBridgeController implements Controller { - private LocalTunnelBridge mBridge; - - @Override - public void create() throws IOException { - mBridge = new LocalTunnelBridge(replicatingSocketName(), exposingSocketName()); - } - - @Override - public void start() throws Exception { - mBridge.start(); - } - - @Override - public void stop() { - mBridge.stop(); - } - - @Override - public void dispose() { - mBridge.dispose(); - } - - @Override - public String toString() { - return "LocalTunnelBridge"; - } - } - private class LocalSessionBridgeController implements Controller { private LocalSessionBridge mBridge; @@ -191,9 +160,7 @@ public class DebugService extends Service { if (intent == null) return START_NOT_STICKY; String action = intent.getAction(); - if (START_TUNNEL_BRIDGE_ACTION.equals(action)) { - return start(new LocalTunnelBridgeController()); - } else if (START_SESSION_BRIDGE_ACTION.equals(action)) { + if (START_SESSION_BRIDGE_ACTION.equals(action)) { return start(new LocalSessionBridgeController()); } else if (START_SERVER_ACTION.equals(action)) { return start(new DevToolsBridgeServerSandboxController()); diff --git a/components/devtools_bridge/android/session_dependency_factory_android.cc b/components/devtools_bridge/android/session_dependency_factory_android.cc index 7b72ac9..168f2fc 100644 --- a/components/devtools_bridge/android/session_dependency_factory_android.cc +++ b/components/devtools_bridge/android/session_dependency_factory_android.cc @@ -10,6 +10,7 @@ #include "components/devtools_bridge/abstract_data_channel.h" #include "components/devtools_bridge/abstract_peer_connection.h" #include "components/devtools_bridge/rtc_configuration.h" +#include "components/devtools_bridge/socket_tunnel_server.h" #include "jni/SessionDependencyFactoryNative_jni.h" using base::android::AttachCurrentThread; @@ -156,6 +157,16 @@ SessionDependencyFactoryAndroid::CreatePeerConnection( return impl_->CreatePeerConnection(config.Pass(), delegate.Pass()); } +scoped_refptr<base::TaskRunner> +SessionDependencyFactoryAndroid::signaling_thread_task_runner() { + return impl_->signaling_thread_task_runner(); +} + +scoped_refptr<base::TaskRunner> +SessionDependencyFactoryAndroid::io_thread_task_runner() { + return impl_->io_thread_task_runner(); +} + // JNI generated methods static jlong CreateFactory(JNIEnv* env, jclass jcaller) { @@ -274,5 +285,20 @@ static void CloseDataChannel(JNIEnv* env, jclass jcaller, jlong channel_ptr) { reinterpret_cast<AbstractDataChannel*>(channel_ptr)->Close(); } +static jlong CreateSocketTunnelServer( + JNIEnv* env, jclass jcaller, jlong factory_ptr, jlong channel_ptr, + jstring socket_name) { + return reinterpret_cast<jlong>( + new SocketTunnelServer( + reinterpret_cast<SessionDependencyFactory*>(factory_ptr), + reinterpret_cast<AbstractDataChannel*>(channel_ptr), + ConvertJavaStringToUTF8(env, socket_name))); +} + +static void DestroySocketTunnelServer( + JNIEnv* env, jclass jcaller, jlong tunnel_ptr) { + delete reinterpret_cast<SocketTunnelServer*>(tunnel_ptr); +} + } // namespace android } // namespace devtools_bridge diff --git a/components/devtools_bridge/android/session_dependency_factory_android.h b/components/devtools_bridge/android/session_dependency_factory_android.h index 0961556..62ecae3 100644 --- a/components/devtools_bridge/android/session_dependency_factory_android.h +++ b/components/devtools_bridge/android/session_dependency_factory_android.h @@ -22,6 +22,10 @@ class SessionDependencyFactoryAndroid : public SessionDependencyFactory { scoped_ptr<RTCConfiguration> config, scoped_ptr<AbstractPeerConnection::Delegate> delegate) override; + virtual scoped_refptr<base::TaskRunner> signaling_thread_task_runner() + override; + virtual scoped_refptr<base::TaskRunner> io_thread_task_runner() override; + private: const scoped_ptr<SessionDependencyFactory> impl_; diff --git a/components/devtools_bridge/session_dependency_factory.cc b/components/devtools_bridge/session_dependency_factory.cc index 27a893d..c7d4a98 100644 --- a/components/devtools_bridge/session_dependency_factory.cc +++ b/components/devtools_bridge/session_dependency_factory.cc @@ -4,12 +4,18 @@ #include "components/devtools_bridge/session_dependency_factory.h" +#include "base/bind.h" +#include "base/location.h" +#include "base/task_runner.h" +#include "base/threading/thread.h" #include "components/devtools_bridge/abstract_data_channel.h" #include "components/devtools_bridge/abstract_peer_connection.h" #include "components/devtools_bridge/rtc_configuration.h" #include "third_party/libjingle/source/talk/app/webrtc/mediaconstraintsinterface.h" #include "third_party/libjingle/source/talk/app/webrtc/peerconnectioninterface.h" #include "third_party/webrtc/base/bind.h" +#include "third_party/webrtc/base/messagehandler.h" +#include "third_party/webrtc/base/messagequeue.h" #include "third_party/webrtc/base/ssladapter.h" #include "third_party/webrtc/base/thread.h" @@ -68,6 +74,52 @@ class MediaConstraints Constraints optional_; }; +/** + * Posts tasks on signaling thread. If stopped (when SesseionDependencyFactry + * is destroying) ignores posted tasks. + */ +class SignalingThreadTaskRunner : public base::TaskRunner, + private rtc::MessageHandler { + public: + explicit SignalingThreadTaskRunner(rtc::Thread* thread) : thread_(thread) {} + + bool PostDelayedTask(const tracked_objects::Location& from_here, + const base::Closure& task, + base::TimeDelta delay) override { + DCHECK(delay.ToInternalValue() == 0); + + rtc::CritScope scope(&critical_section_); + + if (thread_) + thread_->Send(this, 0, new Task(task)); + + return true; + } + + bool RunsTasksOnCurrentThread() const override { + rtc::CritScope scope(&critical_section_); + + return thread_ != NULL && thread_->IsCurrent(); + } + + void Stop() { + rtc::CritScope scope(&critical_section_); + thread_ = NULL; + } + + private: + typedef rtc::TypedMessageData<base::Closure> Task; + + ~SignalingThreadTaskRunner() override {} + + void OnMessage(rtc::Message* msg) override { + static_cast<Task*>(msg->pdata)->data().Run(); + } + + mutable rtc::CriticalSection critical_section_; + rtc::Thread* thread_; // Guarded by |critical_section_|. +}; + class DataChannelObserverImpl : public webrtc::DataChannelObserver { public: DataChannelObserverImpl( @@ -104,15 +156,78 @@ class DataChannelObserverImpl : public webrtc::DataChannelObserver { bool open_; }; +/** + * Thread-safe view on AbstractDataChannel. + */ +class DataChannelProxyImpl : public AbstractDataChannel::Proxy { + public: + DataChannelProxyImpl( + SessionDependencyFactory* factory, + rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel) + : data_channel_(data_channel), + signaling_thread_task_runner_( + factory->signaling_thread_task_runner()) { + } + + void StopOnSignalingThread() { + data_channel_ = NULL; + } + + virtual void SendBinaryMessage(const void* data, size_t length) override { + auto buffer = make_scoped_ptr(new webrtc::DataBuffer(rtc::Buffer(), true)); + buffer->data.SetData(data, length); + + signaling_thread_task_runner_->PostTask( + FROM_HERE, base::Bind( + &DataChannelProxyImpl::SendMessageOnSignalingThread, + this, + base::Passed(&buffer))); + } + + virtual void Close() override { + signaling_thread_task_runner_->PostTask( + FROM_HERE, base::Bind(&DataChannelProxyImpl::CloseOnSignalingThread, + this)); + } + + private: + + ~DataChannelProxyImpl() override {} + + void SendMessageOnSignalingThread(scoped_ptr<webrtc::DataBuffer> message) { + if (data_channel_ != NULL) + data_channel_->Send(*message); + } + + void CloseOnSignalingThread() { + if (data_channel_ != NULL) + data_channel_->Close(); + } + + // Accessed on signaling thread. + rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_; + + const scoped_refptr<base::TaskRunner> signaling_thread_task_runner_; +}; + class DataChannelImpl : public AbstractDataChannel { public: - explicit DataChannelImpl( + DataChannelImpl( + SessionDependencyFactory* factory, rtc::Thread* const signaling_thread, rtc::scoped_refptr<webrtc::DataChannelInterface> impl) - : signaling_thread_(signaling_thread), + : factory_(factory), + signaling_thread_(signaling_thread), impl_(impl) { } + ~DataChannelImpl() { + if (proxy_.get()) { + signaling_thread_->Invoke<void>(rtc::Bind( + &DataChannelProxyImpl::StopOnSignalingThread, proxy_.get())); + } + } + virtual void RegisterObserver(scoped_ptr<Observer> observer) override { observer_.reset(new DataChannelObserverImpl(impl_.get(), observer.Pass())); signaling_thread_->Invoke<void>(rtc::Bind( @@ -141,6 +256,12 @@ class DataChannelImpl : public AbstractDataChannel { impl_->Close(); } + scoped_refptr<Proxy> proxy() override { + if (!proxy_.get()) + proxy_ = new DataChannelProxyImpl(factory_, impl_); + return proxy_; + } + private: void RegisterObserverOnSignalingThread() { // State initialization and observer registration happen atomically @@ -149,6 +270,8 @@ class DataChannelImpl : public AbstractDataChannel { impl_->RegisterObserver(observer_.get()); } + SessionDependencyFactory* const factory_; + scoped_refptr<DataChannelProxyImpl> proxy_; rtc::Thread* const signaling_thread_; scoped_ptr<DataChannelObserverImpl> observer_; const rtc::scoped_refptr<webrtc::DataChannelInterface> impl_; @@ -320,11 +443,13 @@ class SetRemoteDescriptionHandler class PeerConnectionImpl : public AbstractPeerConnection { public: PeerConnectionImpl( + SessionDependencyFactory* const factory, rtc::Thread* signaling_thread, rtc::scoped_refptr<webrtc::PeerConnectionInterface> connection, scoped_ptr<PeerConnectionObserverImpl> observer, scoped_ptr<AbstractPeerConnection::Delegate> delegate) - : holder_(new rtc::RefCountedObject<PeerConnectionHolder>( + : factory_(factory), + holder_(new rtc::RefCountedObject<PeerConnectionHolder>( signaling_thread, connection.get(), delegate.get())), signaling_thread_(signaling_thread), connection_(connection), @@ -394,6 +519,7 @@ class PeerConnectionImpl : public AbstractPeerConnection { init.id = channelId; return make_scoped_ptr(new DataChannelImpl( + factory_, signaling_thread_, connection_->CreateDataChannel("", &init))); } @@ -414,6 +540,7 @@ class PeerConnectionImpl : public AbstractPeerConnection { // TODO(serya): Send on signaling thread. } + SessionDependencyFactory* const factory_; const rtc::scoped_refptr<PeerConnectionHolder> holder_; rtc::Thread* const signaling_thread_; rtc::scoped_refptr<webrtc::PeerConnectionInterface> connection_; @@ -436,6 +563,9 @@ class SessionDependencyFactoryImpl : public SessionDependencyFactory { } virtual ~SessionDependencyFactoryImpl() { + if (signaling_thread_task_runner_.get()) + signaling_thread_task_runner_->Stop(); + signaling_thread_.Invoke<void>(rtc::Bind( &SessionDependencyFactoryImpl::DisposeOnSignalingThread, this)); } @@ -454,18 +584,38 @@ class SessionDependencyFactoryImpl : public SessionDependencyFactory { config->impl(), &constraints, NULL, NULL, observer.get()); return make_scoped_ptr(new PeerConnectionImpl( - &signaling_thread_, connection, observer.Pass(), delegate.Pass())); + this, &signaling_thread_, connection, observer.Pass(), + delegate.Pass())); + } + + scoped_refptr<base::TaskRunner> signaling_thread_task_runner() override { + if (!signaling_thread_task_runner_.get()) { + signaling_thread_task_runner_ = + new SignalingThreadTaskRunner(&signaling_thread_); + } + return signaling_thread_task_runner_; + } + + scoped_refptr<base::TaskRunner> io_thread_task_runner() override { + if (!io_thread_.get()) { + io_thread_.reset(new base::Thread("devtools bridge IO thread")); + base::Thread::Options options; + options.message_loop_type = base::MessageLoop::TYPE_IO; + CHECK(io_thread_->StartWithOptions(options)); + } + return io_thread_->task_runner(); } private: void DisposeOnSignalingThread() { DCHECK(signaling_thread_.IsCurrent()); CheckedRelease(&factory_); - if (!cleanup_on_signaling_thread_.is_null()) { + if (!cleanup_on_signaling_thread_.is_null()) cleanup_on_signaling_thread_.Run(); - } } + scoped_ptr<base::Thread> io_thread_; + scoped_refptr<SignalingThreadTaskRunner> signaling_thread_task_runner_; base::Closure cleanup_on_signaling_thread_; rtc::Thread signaling_thread_; rtc::Thread worker_thread_; diff --git a/components/devtools_bridge/session_dependency_factory.h b/components/devtools_bridge/session_dependency_factory.h index 96f49bb..23d7b03 100644 --- a/components/devtools_bridge/session_dependency_factory.h +++ b/components/devtools_bridge/session_dependency_factory.h @@ -12,6 +12,10 @@ #include "components/devtools_bridge/abstract_peer_connection.h" #include "components/devtools_bridge/rtc_configuration.h" +namespace base { +class TaskRunner; +} + namespace devtools_bridge { /** @@ -33,6 +37,9 @@ class SessionDependencyFactory { scoped_ptr<RTCConfiguration> config, scoped_ptr<AbstractPeerConnection::Delegate> delegate) = 0; + virtual scoped_refptr<base::TaskRunner> signaling_thread_task_runner() = 0; + virtual scoped_refptr<base::TaskRunner> io_thread_task_runner() = 0; + private: DISALLOW_COPY_AND_ASSIGN(SessionDependencyFactory); }; diff --git a/components/devtools_bridge/socket_tunnel_connection.cc b/components/devtools_bridge/socket_tunnel_connection.cc new file mode 100644 index 0000000..da9def2 --- /dev/null +++ b/components/devtools_bridge/socket_tunnel_connection.cc @@ -0,0 +1,96 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/devtools_bridge/socket_tunnel_connection.h" + +#include <stdlib.h> + +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/stream_socket.h" + +namespace devtools_bridge { + +SocketTunnelConnection::SocketTunnelConnection(int index) : index_(index) { +} + +SocketTunnelConnection::~SocketTunnelConnection() { +} + +void SocketTunnelConnection::Write(scoped_refptr<net::IOBufferWithSize> chunk) { + // TODO(serya): While it is unlikely (socket normally much faster than + // data channel) we should disconnect if too much data buffered. + buffer_.push_back(); + if (buffer_.size() == 1) { + current_ = new net::DrainableIOBuffer(chunk.get(), chunk->size()); + WriteCurrent(); + } +} + +void SocketTunnelConnection::BuildControlPacket(char* buffer, + int op_code) { + COMPILE_ASSERT(kControlPacketSizeBytes == 3, + unexpected_control_packet_size); + buffer[0] = kControlConnectionId; + buffer[1] = op_code; + buffer[2] = index_ + kMinConnectionId; +} + +void SocketTunnelConnection::WriteCurrent() { + while (true) { + while(current_->BytesRemaining() > 0) { + int result = socket()->Write(current_.get(), current_->BytesRemaining(), + base::Bind(&SocketTunnelConnection::OnWriteComplete, + base::Unretained(this))); + if (result > 0) + current_->DidConsume(result); + } + current_ = NULL; + + buffer_.pop_front(); + if (buffer_.empty()) + return; // Stop writing. + + net::IOBufferWithSize* chunk = buffer_.front().get(); + current_ = new net::DrainableIOBuffer(chunk, chunk->size()); + } +} + +void SocketTunnelConnection::OnWriteComplete(int result) { + if (result > 0) { + current_->DidConsume(result); + WriteCurrent(); + } +} + +void SocketTunnelConnection::ReadNextChunk() { + if (!read_buffer_.get()) { + read_buffer_ = new net::GrowableIOBuffer(); + read_buffer_->SetCapacity(kMaxPacketSizeBytes); + } + // Header of the data packet. + *read_buffer_->StartOfBuffer() = index_ + kMinConnectionId; + read_buffer_->set_offset(1); + + int result = socket()->Read( + read_buffer_.get(), + read_buffer_->RemainingCapacity(), + base::Bind(&SocketTunnelConnection::OnReadComplete, + base::Unretained(this))); + if (result == net::ERR_IO_PENDING) + return; + else + OnReadComplete(result); +} + +void SocketTunnelConnection::OnReadComplete(int result) { + if (result > 0) { + OnDataPacketRead(read_buffer_->StartOfBuffer(), + read_buffer_->offset() + result); + } else { + OnReadError(result); + } +} + +} // namespace devtools_bridge diff --git a/components/devtools_bridge/socket_tunnel_connection.h b/components/devtools_bridge/socket_tunnel_connection.h new file mode 100644 index 0000000..c663892 --- /dev/null +++ b/components/devtools_bridge/socket_tunnel_connection.h @@ -0,0 +1,99 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_CONNECTION_H_ +#define COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_CONNECTION_H_ + +#include <deque> +#include <string> + +#include "base/memory/ref_counted.h" +#include "base/memory/scoped_ptr.h" + +namespace net { +class DrainableIOBuffer; +class GrowableIOBuffer; +class IOBufferWithSize; +class StreamSocket; +} + +namespace devtools_bridge { + +/** + * Abstract base class for SocketTunnelServer/Client connection. + * + * Connection binds a pair of net::StreamSocket (or alike) through + * a data channel. SocketTunnel may handle up to kMaxConnectionCount + * simultaneous connection (DevTools can keep ~10 connection; + * other connections hang in unopened state; additional connections + * could help to deal with data channel latency). + * + * Client should create net::StreamListenSocket (or logical equivalent) + * and listen for incoming connection. When one comes it sends CLIENT_OPEN + * packet to the server. + * + * Server transforms client's packet to calls of net::StreamSocket. On + * CLIENT_OPEN it creates a socket and connects. If connection succeeds + * it sends back SERVER_OPEN_ACK. If it fails it sends SERVER_CLOSE. + * + * After SERVER_OPEN_ACK server may send SERVER_CLOSE any time (if the socket + * it connects to has closed on another side). If client closes the connection + * sending CLIENT_CLOSE server acknowledges it by sending SERVER_CLOSE. + * Client may reuse connection ID once it received SERVER_CLOSE (because + * data channel is ordered and reliable). + */ +class SocketTunnelConnection { + public: + enum ClientOpCode { + CLIENT_OPEN = 0, + CLIENT_CLOSE = 1 + }; + + enum ServerOpCode { + SERVER_OPEN_ACK = 0, + SERVER_CLOSE = 1 + }; + + static const int kMaxConnectionCount = 64; + + static const int kMaxPacketSizeBytes = 1024 * 4; + static const int kControlPacketSizeBytes = 3; + + static const int kControlConnectionId = 0; + + static const int kMinConnectionId = 1; + static const int kMaxConnectionId = + kMinConnectionId + kMaxConnectionCount - 1; + + void Write(scoped_refptr<net::IOBufferWithSize> chunk); + void ReadNextChunk(); + + protected: + SocketTunnelConnection(int index); + ~SocketTunnelConnection(); + + const int index_; + + // |buffer| length must be kControlPacketSizeBytes. + void BuildControlPacket(char* buffer, int op_code); + + virtual net::StreamSocket* socket() = 0; + virtual void OnDataPacketRead(const void* data, size_t length) = 0; + virtual void OnReadError(int error) = 0; + + private: + void WriteCurrent(); + void OnWriteComplete(int result); + void OnReadComplete(int result); + + std::deque<scoped_refptr<net::IOBufferWithSize> > buffer_; + scoped_refptr<net::DrainableIOBuffer> current_; + scoped_refptr<net::GrowableIOBuffer> read_buffer_; + + DISALLOW_COPY_AND_ASSIGN(SocketTunnelConnection); +}; + +} // namespace devtools_bridge + +#endif // COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_CONNECTION_H_ diff --git a/components/devtools_bridge/socket_tunnel_packet_handler.cc b/components/devtools_bridge/socket_tunnel_packet_handler.cc new file mode 100644 index 0000000..f12e583 --- /dev/null +++ b/components/devtools_bridge/socket_tunnel_packet_handler.cc @@ -0,0 +1,64 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/devtools_bridge/socket_tunnel_packet_handler.h" + +#include <stdlib.h> + +#include "components/devtools_bridge/socket_tunnel_connection.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" + +namespace devtools_bridge { + +static const int kControlConnectionId = + SocketTunnelConnection::kControlConnectionId; +static const int kMinConnectionId = SocketTunnelConnection::kMinConnectionId; +static const int kMaxConnectionId = SocketTunnelConnection::kMaxConnectionId; +static const int kControlPacketSizeBytes = + SocketTunnelConnection::kControlPacketSizeBytes; + +void SocketTunnelPacketHandler::DecodePacket(const void* data, size_t length) { + const unsigned char* bytes = static_cast<const unsigned char*>(data); + if (length == 0) { + DLOG(ERROR) << "Empty packet"; + HandleProtocolError(); + return; + } + int connection_id = bytes[0]; + if (connection_id != kControlConnectionId) { + if (connection_id < kMinConnectionId || + connection_id > kMaxConnectionId) { + DLOG(ERROR) << "Invalid connection ID: " << connection_id; + HandleProtocolError(); + return; + } + + int connection_index = connection_id - kMinConnectionId; + scoped_refptr<net::IOBufferWithSize> packet( + new net::IOBufferWithSize(length - 1)); + memcpy(packet->data(), bytes + 1, length - 1); + HandleDataPacket(connection_index, packet); + } else if (length >= kControlPacketSizeBytes) { + COMPILE_ASSERT(kControlPacketSizeBytes == 3, + unexpected_control_packet_size); + + int op_code = bytes[1]; + connection_id = bytes[2]; + if (connection_id < kMinConnectionId || + connection_id > kMaxConnectionId) { + DLOG(ERROR) << "Invalid connection ID: " << connection_id; + HandleProtocolError(); + return; + } + int connection_index = connection_id - kMinConnectionId; + HandleControlPacket(connection_index, op_code); + } else { + DLOG(ERROR) << "Invalid packet"; + HandleProtocolError(); + return; + } +} + +} // namespace devtools_bridge diff --git a/components/devtools_bridge/socket_tunnel_packet_handler.h b/components/devtools_bridge/socket_tunnel_packet_handler.h new file mode 100644 index 0000000..dc64f30 --- /dev/null +++ b/components/devtools_bridge/socket_tunnel_packet_handler.h @@ -0,0 +1,37 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_PACKET_HANDLER_H_ +#define COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_PACKET_HANDLER_H_ + +#include "base/memory/ref_counted.h" + +namespace net { +class IOBufferWithSize; +} + +namespace devtools_bridge { + +/** + * Abstract base class for handling SocketTunnelServer/Client messages. + */ +class SocketTunnelPacketHandler { + public: + virtual void HandleControlPacket(int connection_index, int op_code) = 0; + virtual void HandleDataPacket( + int connection_index, scoped_refptr<net::IOBufferWithSize> data) = 0; + virtual void HandleProtocolError() = 0; + + void DecodePacket(const void* data, size_t length); + + protected: + SocketTunnelPacketHandler() {} + + private: + DISALLOW_COPY_AND_ASSIGN(SocketTunnelPacketHandler); +}; + +} // namespace devtools_bridge + +#endif // COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_PACKET_HANDLER_H_ diff --git a/components/devtools_bridge/socket_tunnel_server.cc b/components/devtools_bridge/socket_tunnel_server.cc new file mode 100644 index 0000000..1297916 --- /dev/null +++ b/components/devtools_bridge/socket_tunnel_server.cc @@ -0,0 +1,266 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "components/devtools_bridge/socket_tunnel_server.h" + +#include "base/bind.h" +#include "base/location.h" +#include "components/devtools_bridge/abstract_data_channel.h" +#include "components/devtools_bridge/session_dependency_factory.h" +#include "components/devtools_bridge/socket_tunnel_connection.h" +#include "components/devtools_bridge/socket_tunnel_packet_handler.h" +#include "net/base/io_buffer.h" +#include "net/base/net_errors.h" +#include "net/socket/unix_domain_client_socket_posix.h" + +namespace devtools_bridge { + +class SocketTunnelServer::Connection : public SocketTunnelConnection { + public: + class Delegate { + public: + virtual void RemoveConnection(int index) = 0; + virtual void SendPacket( + const void* data, size_t length) = 0; + }; + + Connection(Delegate* delegate, int index, const std::string& socket_name) + : SocketTunnelConnection(index), + delegate_(delegate), + socket_(socket_name, true) { + } + + void Connect() { + int result = socket()->Connect(base::Bind( + &Connection::OnConnectionComplete, base::Unretained(this))); + if (result != net::ERR_IO_PENDING) + OnConnectionComplete(result); + } + + void ClosedByClient() { + if (socket()->IsConnected()) { + socket()->Disconnect(); + SendControlPacket(SERVER_CLOSE); + } + delegate_->RemoveConnection(index_); + } + + protected: + net::StreamSocket* socket() override { + return &socket_; + } + + void OnDataPacketRead(const void* data, size_t length) override { + delegate_->SendPacket(data, length); + ReadNextChunk(); + } + + virtual void OnReadError(int error) override { + socket()->Disconnect(); + SendControlPacket(SERVER_CLOSE); + delegate_->RemoveConnection(index_); + delegate_ = NULL; + } + + private: + void OnConnectionComplete(int result) { + if (result == net::OK) { + SendControlPacket(SERVER_OPEN_ACK); + ReadNextChunk(); + } else { + SendControlPacket(SERVER_CLOSE); + delegate_->RemoveConnection(index_); + delegate_ = NULL; + } + } + + void SendControlPacket(ServerOpCode op_code) { + char buffer[kControlPacketSizeBytes]; + BuildControlPacket(buffer, op_code); + delegate_->SendPacket(buffer, kControlPacketSizeBytes); + } + + Delegate* delegate_; + net::UnixDomainClientSocket socket_; +}; + +/** + * Lives on the IO thread. + */ +class SocketTunnelServer::ConnectionController + : private Connection::Delegate { + public: + ConnectionController( + scoped_refptr<base::TaskRunner> io_task_runner, + scoped_refptr<AbstractDataChannel::Proxy> data_channel, + const std::string& socket_name) + : io_task_runner_(io_task_runner), + data_channel_(data_channel), + socket_name_(socket_name) { + DCHECK(data_channel_.get()); + } + + void HandleControlPacket(int connection_index, int op_code) { + DCHECK(connection_index < kMaxConnectionCount); + switch (op_code) { + case SocketTunnelConnection::CLIENT_OPEN: + if (connections_[connection_index].get() != NULL) { + DLOG(ERROR) << "Opening connection which already open: " + << connection_index; + HandleProtocolError(); + return; + } + connections_[connection_index].reset( + new Connection(this, connection_index, socket_name_)); + connections_[connection_index]->Connect(); + break; + + case SocketTunnelConnection::CLIENT_CLOSE: + if (connections_[connection_index].get() == NULL) { + // Ignore. Client may close the connection before received + // notification from the server. + return; + } + connections_[connection_index]->ClosedByClient(); + break; + + default: + DLOG(ERROR) << "Invalid op_code: " << op_code; + HandleProtocolError(); + return; + } + } + + void HandleDataPacket(int connection_index, + scoped_refptr<net::IOBufferWithSize> packet) { + Connection* connection = connections_[connection_index].get(); + if (connection != NULL) + connection->Write(packet); + } + + void HandleProtocolError() { + data_channel_->Close(); + } + + void CloseAllConnections() { + for (int i = 0; i < kMaxConnectionCount; i++) { + connections_[i].reset(); + } + } + + private: + static void DeleteConnectionImpl(Connection*) {} + + // Connection::Delegate implementation + void RemoveConnection(int connection_index) override { + // Remove immediately, delete later to preserve this of the caller. + Connection* connection = connections_[connection_index].release(); + io_task_runner_->PostTask( + FROM_HERE, base::Bind(&ConnectionController::DeleteConnectionImpl, + base::Owned(connection))); + } + + void SendPacket(const void* data, size_t length) override { + data_channel_->SendBinaryMessage(data, length); + } + + static const int kMaxConnectionCount = + SocketTunnelConnection::kMaxConnectionCount; + + scoped_refptr<base::TaskRunner> io_task_runner_; + scoped_refptr<AbstractDataChannel::Proxy> data_channel_; + scoped_ptr<Connection> connections_[kMaxConnectionCount]; + const std::string socket_name_; +}; + +class SocketTunnelServer::DataChannelObserver + : public AbstractDataChannel::Observer, + private SocketTunnelPacketHandler { + public: + DataChannelObserver(scoped_refptr<base::TaskRunner> io_task_runner, + scoped_ptr<ConnectionController> controller) + : io_task_runner_(io_task_runner), + controller_(controller.Pass()) { + } + + ~DataChannelObserver() override { + // Deleting on IO thread allows post tasks with base::Unretained + // because all of them will be processed before deletion. + io_task_runner_->PostTask( + FROM_HERE, base::Bind(&DataChannelObserver::DeleteControllerOnIOThread, + base::Passed(&controller_))); + } + + void OnOpen() override { + // Nothing to do. Activity could only be initiated by a control packet. + } + + void OnClose() override { + io_task_runner_->PostTask( + FROM_HERE, base::Bind( + &ConnectionController::CloseAllConnections, + base::Unretained(controller_.get()))); + } + + void OnMessage(const void* data, size_t length) override { + DecodePacket(data, length); + } + + private: + static void DeleteControllerOnIOThread( + scoped_ptr<ConnectionController> controller) {} + + // SocketTunnelPacketHandler implementation. + + void HandleControlPacket(int connection_index, int op_code) override { + io_task_runner_->PostTask( + FROM_HERE, base::Bind( + &ConnectionController::HandleControlPacket, + base::Unretained(controller_.get()), + connection_index, + op_code)); + } + + void HandleDataPacket(int connection_index, + scoped_refptr<net::IOBufferWithSize> data) override { + io_task_runner_->PostTask( + FROM_HERE, base::Bind( + &ConnectionController::HandleDataPacket, + base::Unretained(controller_.get()), + connection_index, + data)); + } + + void HandleProtocolError() override { + io_task_runner_->PostTask( + FROM_HERE, base::Bind( + &ConnectionController::HandleProtocolError, + base::Unretained(controller_.get()))); + } + + const scoped_refptr<base::TaskRunner> io_task_runner_; + scoped_ptr<ConnectionController> controller_; +}; + +SocketTunnelServer::SocketTunnelServer(SessionDependencyFactory* factory, + AbstractDataChannel* data_channel, + const std::string& socket_name) + : data_channel_(data_channel) { + scoped_ptr<ConnectionController> controller( + new ConnectionController(factory->io_thread_task_runner(), + data_channel->proxy(), + socket_name)); + + scoped_ptr<DataChannelObserver> data_channel_observer( + new DataChannelObserver(factory->io_thread_task_runner(), + controller.Pass())); + + data_channel_->RegisterObserver(data_channel_observer.Pass()); +} + +SocketTunnelServer::~SocketTunnelServer() { + data_channel_->UnregisterObserver(); +} + +} // namespace devtools_bridge diff --git a/components/devtools_bridge/socket_tunnel_server.h b/components/devtools_bridge/socket_tunnel_server.h new file mode 100644 index 0000000..1dd3f4c --- /dev/null +++ b/components/devtools_bridge/socket_tunnel_server.h @@ -0,0 +1,41 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_SERVER_H_ +#define COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_SERVER_H_ + +#include <map> +#include <string> + +#include "base/memory/scoped_ptr.h" + +namespace net { +class StreamSocket; +} + +namespace devtools_bridge { + +class AbstractDataChannel; +class SessionDependencyFactory; + +class SocketTunnelServer { + public: + SocketTunnelServer(SessionDependencyFactory* factory, + AbstractDataChannel* data_channel, + const std::string& socket_name); + ~SocketTunnelServer(); + + private: + class Connection; + class ConnectionController; + class DataChannelObserver; + + AbstractDataChannel* const data_channel_; + + DISALLOW_COPY_AND_ASSIGN(SocketTunnelServer); +}; + +} // namespace devtools_bridge + +#endif // COMPONENTS_DEVTOOLS_BRIDGE_SOCKET_TUNNEL_SERVER_H_ diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataChannelObserverMock.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataChannelObserverMock.java new file mode 100644 index 0000000..68972da --- /dev/null +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataChannelObserverMock.java @@ -0,0 +1,36 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.components.devtools_bridge; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Mock for AbstractDataChannel.Observer. + */ +public class DataChannelObserverMock implements AbstractDataChannel.Observer { + public final CountDownLatch opened = new CountDownLatch(1); + public final CountDownLatch closed = new CountDownLatch(1); + public final LinkedBlockingDeque<byte[]> received = new LinkedBlockingDeque<byte[]>(); + + public void onStateChange(AbstractDataChannel.State state) { + switch (state) { + case OPEN: + opened.countDown(); + break; + + case CLOSED: + closed.countDown(); + break; + } + } + + public void onMessage(ByteBuffer message) { + byte[] bytes = new byte[message.remaining()]; + message.get(bytes); + received.add(bytes); + } +} diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataPipe.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataPipe.java index c36319a..7283047 100644 --- a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataPipe.java +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/DataPipe.java @@ -11,69 +11,110 @@ import java.nio.ByteBuffer; * come to another and vice versa. */ public class DataPipe { - private final SignalingThreadMock mSignalingThread = new SignalingThreadMock(); + private static final int DATA_CHANNEL_ID = 0; - private final PairedDataChannel mDC0 = new PairedDataChannel(); - private final PairedDataChannel mDC1 = new PairedDataChannel(); + final PeerConnectionObserverMock mObserver1 = new PeerConnectionObserverMock(); + final PeerConnectionObserverMock mObserver2 = new PeerConnectionObserverMock(); - public void connect() { - mDC0.setPair(mDC1); - mDC1.setPair(mDC0); - mDC0.open(); - mDC1.open(); + DataChannelObserverMock mDataChannelObserverMock1 = new DataChannelObserverMock(); + DataChannelObserverMock mDataChannelObserverMock2 = new DataChannelObserverMock(); + + final AbstractPeerConnection mConnection1; + final AbstractPeerConnection mConnection2; + + final AbstractDataChannel mDataChannel1; + final AbstractDataChannel mDataChannel2; + + DataPipe(SessionDependencyFactory factory) { + RTCConfiguration config = new RTCConfiguration(); + mConnection1 = factory.createPeerConnection(config, mObserver1); + mConnection2 = factory.createPeerConnection(config, mObserver2); + + mObserver1.iceCandidatesSink = mConnection2; + mObserver2.iceCandidatesSink = mConnection1; + + mDataChannel1 = mConnection1.createDataChannel(DATA_CHANNEL_ID); + mDataChannel2 = mConnection2.createDataChannel(DATA_CHANNEL_ID); } - public void disconnect() { - mDC0.setPair(null); - mDC1.setPair(null); - mDC0.close(); - mDC1.close(); + void dispose() { + mDataChannel1.dispose(); + mDataChannel2.dispose(); + mConnection1.dispose(); + mConnection2.dispose(); } - public AbstractDataChannel dataChannel(int index) { - switch (index) { - case 0: - return mDC0; - case 1: - return mDC1; - default: - throw new IllegalArgumentException("index"); - } + void negotiate() throws Exception { + mConnection1.createAndSetLocalDescription( + AbstractPeerConnection.SessionDescriptionType.OFFER); + mObserver1.localDescriptionAvailable.await(); + + mConnection2.setRemoteDescription( + AbstractPeerConnection.SessionDescriptionType.OFFER, + mObserver1.localDescription); + mObserver2.remoteDescriptionSet.await(); + + mConnection2.createAndSetLocalDescription( + AbstractPeerConnection.SessionDescriptionType.ANSWER); + mObserver2.localDescriptionAvailable.await(); + + mConnection1.setRemoteDescription( + AbstractPeerConnection.SessionDescriptionType.ANSWER, + mObserver2.localDescription); + mObserver1.remoteDescriptionSet.await(); } - public void dispose() { - mDC0.dispose(); - mDC1.dispose(); - mSignalingThread.dispose(); + void awaitConnected() throws Exception { + mObserver1.connected.await(); + mObserver2.connected.await(); } - private class PairedDataChannel extends DataChannelMock { - private PairedDataChannel mPair; + void send(int channelIndex, String data) { + byte[] bytes = data.getBytes(); + ByteBuffer rawMessage = ByteBuffer.allocateDirect(bytes.length); + rawMessage.put(bytes); + rawMessage.limit(rawMessage.position()); + rawMessage.position(0); + dataChannel(channelIndex).send(rawMessage, AbstractDataChannel.MessageType.TEXT); + } - public PairedDataChannel() { - super(mSignalingThread); - } + void send(int channelIndex, ByteBuffer rawMessage) { + dataChannel(channelIndex).send(rawMessage, AbstractDataChannel.MessageType.BINARY); + } - public void setPair(final PairedDataChannel pair) { - mSignalingThread.invoke(new Runnable() { - @Override - public void run() { - mPair = pair; - } - }); - } + AbstractDataChannel dataChannel(int channelIndex) { + switch (channelIndex) { + case 0: + return mDataChannel1; - @Override - protected void sendOnSignalingThread(ByteBuffer message) { - assert message.remaining() > 0; + case 1: + return mDataChannel2; - if (mPair == null) return; - mPair.notifyMessageOnSignalingThread(message); + default: + throw new ArrayIndexOutOfBoundsException(); } + } + + DataChannelObserverMock dataChannelObserver(int channelIndex) { + switch (channelIndex) { + case 0: + return mDataChannelObserverMock1; - @Override - protected void disposeSignalingThread() { - // Ignore. Will dispose in DataPipe.dispose. + case 1: + return mDataChannelObserverMock2; + + default: + throw new ArrayIndexOutOfBoundsException(); } } + + void registerDatatChannelObservers() { + mDataChannel1.registerObserver(mDataChannelObserverMock1); + mDataChannel2.registerObserver(mDataChannelObserverMock2); + } + + void unregisterDatatChannelObservers() { + mDataChannel1.unregisterObserver(); + mDataChannel2.unregisterObserver(); + } } diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridge.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridge.java index 85c81a7..13f2418 100644 --- a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridge.java +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalSessionBridge.java @@ -66,7 +66,18 @@ public class LocalSessionBridge { } public void dispose() { - if (isStarted()) stop(); + mServerExecutor.runSynchronously(new Runnable() { + @Override + public void run() { + mServerSession.dispose(); + } + }); + mClientExecutor.runSynchronously(new Runnable() { + @Override + public void run() { + mClientSession.dispose(); + } + }); mServerExecutor.dispose(); mClientExecutor.dispose(); @@ -96,16 +107,18 @@ public class LocalSessionBridge { } public void stop() { + assert mStarted; + mServerExecutor.runSynchronously(new Runnable() { @Override public void run() { - mServerSession.dispose(); + mServerSession.stop(); } }); mClientExecutor.runSynchronously(new Runnable() { @Override public void run() { - mClientSession.dispose(); + mClientSession.stop(); } }); mStarted = false; @@ -129,7 +142,7 @@ public class LocalSessionBridge { private class ServerSessionMock extends ServerSession { public ServerSessionMock(String serverSocketName) { - super(mFactory, mServerExecutor, serverSocketName); + super(LocalSessionBridge.this.mFactory, mServerExecutor, serverSocketName); } public void setAutoCloseTimeoutMs(int value) { @@ -164,8 +177,8 @@ public class LocalSessionBridge { } @Override - protected SocketTunnelServer createSocketTunnelServer(String serverSocketName) { - SocketTunnelServer tunnel = super.createSocketTunnelServer(serverSocketName); + protected SocketTunnel newSocketTunnelServer(String serverSocketName) { + SocketTunnel tunnel = super.newSocketTunnelServer(serverSocketName); Log.d(TAG, "Server tunnel created on " + serverSocketName); return tunnel; } @@ -174,7 +187,7 @@ public class LocalSessionBridge { private class ClientSessionMock extends ClientSession { public ClientSessionMock(ServerSession serverSession, String clientSocketName) throws IOException { - super(mFactory, + super(LocalSessionBridge.this.mFactory, mClientExecutor, createServerSessionProxy(serverSession), clientSocketName); diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalTunnelBridge.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalTunnelBridge.java deleted file mode 100644 index 1aa7823..0000000 --- a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/LocalTunnelBridge.java +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2014 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package org.chromium.components.devtools_bridge; - -import android.util.Log; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; - -/** - * It allows testing DevTools socket tunneling on a single device. - * - * SocketTunnelClient opens LocalServerSocket named |socketToExpose| and - * tunnels all incoming connections to |socketToReplicate| using - * SocketTunnelServer and DataPipe between them. All data passes through - * WebRTC data channel but doens't leave the device. - */ -public class LocalTunnelBridge { - private static final String TAG = "LocalTunnelBridge"; - - private final DataPipe mPipe; - private final SocketTunnelServer mServer; - private final SocketTunnelClient mClient; - private boolean mLogPackets = false; - - private final CountDownLatch mServerDataChannelOpenedFlag = new CountDownLatch(1); - private final CountDownLatch mServerDataChannelClosedFlag = new CountDownLatch(1); - - public LocalTunnelBridge(String socketToReplicate, String socketToExpose) throws IOException { - mPipe = new DataPipe(); - - mServer = new SocketTunnelServer(socketToReplicate) { - @Override - protected void onProtocolError(ProtocolError e) { - throw new RuntimeException("Protocol error on server", e); - } - - @Override - protected void sendToDataChannel(ByteBuffer packet) { - if (mLogPackets) - Log.d(TAG, "Sending " + stringifyServerPacket(packet)); - super.sendToDataChannel(packet); - } - - @Override - protected void onReceivedDataPacket(int connectionId, byte[] data) - throws ProtocolError { - if (mLogPackets) { - Log.d(TAG, "Received client data packet with " + - Integer.toString(data.length) + " bytes"); - } - super.onReceivedDataPacket(connectionId, data); - } - - @Override - protected void onReceivedControlPacket(int connectionId, byte opCode) - throws ProtocolError { - if (mLogPackets) { - Log.d(TAG, "Received client control packet"); - } - super.onReceivedControlPacket(connectionId, opCode); - } - - @Override - protected void onSocketException(IOException e, int connectionId) { - Log.d(TAG, "Server socket exception on " + e + - " (connection " + Integer.toString(connectionId) + ")"); - super.onSocketException(e, connectionId); - } - - protected void onDataChannelOpened() { - Log.d(TAG, "Server data channel opened"); - super.onDataChannelOpened(); - mServerDataChannelOpenedFlag.countDown(); - } - - protected void onDataChannelClosed() { - Log.d(TAG, "Client data channel opened"); - super.onDataChannelClosed(); - mServerDataChannelClosedFlag.countDown(); - } - }; - - mServer.bind(mPipe.dataChannel(0)); - - mClient = new SocketTunnelClient(socketToExpose) { - @Override - protected void onProtocolError(ProtocolError e) { - throw new RuntimeException("Protocol error on client" + e); - } - - @Override - protected void onReceivedDataPacket(int connectionId, byte[] data) - throws ProtocolError { - if (mLogPackets) { - Log.d(TAG, "Received server data packet with " - + Integer.toString(data.length) + " bytes"); - } - super.onReceivedDataPacket(connectionId, data); - } - - @Override - protected void onReceivedControlPacket(int connectionId, byte opCode) - throws ProtocolError { - if (mLogPackets) { - Log.d(TAG, "Received server control packet"); - } - super.onReceivedControlPacket(connectionId, opCode); - } - - @Override - protected void sendToDataChannel(ByteBuffer packet) { - if (mLogPackets) { - Log.d(TAG, "Sending " + stringifyClientPacket(packet)); - } - super.sendToDataChannel(packet); - } - }; - mClient.bind(mPipe.dataChannel(1)); - } - - public void start() { - mPipe.connect(); - } - - public void stop() { - mPipe.disconnect(); - } - - public void dispose() { - mClient.unbind(); - mServer.unbind(); - mPipe.dispose(); - } - - public void waitAllConnectionsClosed() throws InterruptedException { - while (mServer.hasConnections() || mClient.hasConnections()) { - Thread.sleep(50); - } - } - - private String stringifyDataPacket(String type, PacketDecoder decoder) { - if (!decoder.isDataPacket()) { - throw new RuntimeException("Invalid packet"); - } - return type + "_DATA:" + Integer.toString(decoder.data().length); - } - - private String stringifyClientPacket(ByteBuffer packet) { - PacketDecoder decoder = decode(packet); - if (!decoder.isControlPacket()) - return stringifyDataPacket("CLIENT", decoder); - switch (decoder.opCode()) { - case SocketTunnelBase.CLIENT_OPEN: - return "CLIENT_OPEN " + Integer.valueOf(decoder.connectionId()); - case SocketTunnelBase.CLIENT_CLOSE: - return "CLIENT_CLOSE " + Integer.valueOf(decoder.connectionId()); - default: - throw new RuntimeException("Invalid packet"); - } - } - - private String stringifyServerPacket(ByteBuffer packet) { - PacketDecoder decoder = decode(packet); - if (!decoder.isControlPacket()) - return stringifyDataPacket("SERVER", decoder); - switch (decoder.opCode()) { - case SocketTunnelBase.SERVER_OPEN_ACK: - return "SERVER_OPEN_ACK " + Integer.valueOf(decoder.connectionId()); - case SocketTunnelBase.SERVER_CLOSE: - return "SERVER_CLOSE " + Integer.valueOf(decoder.connectionId()); - default: - throw new RuntimeException("Invalid packet"); - } - } - - private PacketDecoder decode(ByteBuffer packet) { - int position = packet.position(); - packet.position(0); - if (position == 0) { - throw new RuntimeException("Empty packet"); - } - PacketDecoder decoder = PacketDecoder.decode(packet); - packet.position(position); - return decoder; - } -} diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/PeerConnectionObserverMock.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/PeerConnectionObserverMock.java new file mode 100644 index 0000000..6dda0bd --- /dev/null +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/PeerConnectionObserverMock.java @@ -0,0 +1,51 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package org.chromium.components.devtools_bridge; + +import java.util.concurrent.CountDownLatch; + +/** + * Mock for AbstractPeerConnection.Observer. + */ +public class PeerConnectionObserverMock implements AbstractPeerConnection.Observer { + public AbstractPeerConnection.SessionDescriptionType localDescriptionType; + public String localDescription; + + public final CountDownLatch localDescriptionAvailable = new CountDownLatch(1); + public final CountDownLatch failureAvailable = new CountDownLatch(1); + public final CountDownLatch remoteDescriptionSet = new CountDownLatch(1); + public final CountDownLatch connected = new CountDownLatch(1); + + public AbstractPeerConnection iceCandidatesSink; + + @Override + public void onFailure(String description) { + failureAvailable.countDown(); + } + + @Override + public void onLocalDescriptionCreatedAndSet( + AbstractPeerConnection.SessionDescriptionType type, String description) { + localDescriptionType = type; + localDescription = description; + localDescriptionAvailable.countDown(); + } + + @Override + public void onRemoteDescriptionSet() { + remoteDescriptionSet.countDown(); + } + + @Override + public void onIceCandidate(String iceCandidate) { + if (iceCandidatesSink != null) + iceCandidatesSink.addIceCandidate(iceCandidate); + } + + @Override + public void onIceConnectionChange(boolean connected) { + this.connected.countDown(); + } +} diff --git a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnelBase.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelBase.java index e0ca732..bc15d76 100644 --- a/components/devtools_bridge/android/java/src/org/chromium/components/devtools_bridge/SocketTunnelBase.java +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelBase.java @@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * if CLIENT_CLOSE has sent and SERVER_CLOSE has received with the same connection ID this * ID is safe to be reused. */ -public abstract class SocketTunnelBase { +public abstract class SocketTunnelBase implements SocketTunnel { // Data channel is threadsafe but access to the reference needs synchronization. private final ReadWriteLock mDataChanneliReferenceLock = new ReentrantReadWriteLock(); private volatile AbstractDataChannel mDataChannel; @@ -82,6 +82,7 @@ public abstract class SocketTunnelBase { // For writing in socket without blocking signaling thread. private final ExecutorService mWritingThread = Executors.newSingleThreadExecutor(); + @Override public boolean isBound() { final Lock lock = mDataChanneliReferenceLock.readLock(); lock.lock(); @@ -96,6 +97,7 @@ public abstract class SocketTunnelBase { * Binds the tunnel to the data channel. Tunnel starts its activity when data channel * open. */ + @Override public void bind(AbstractDataChannel dataChannel) { // Observer registrution must not be done in constructor. final Lock lock = mDataChanneliReferenceLock.writeLock(); @@ -112,6 +114,7 @@ public abstract class SocketTunnelBase { * Stops all tunnel activity and returns the prevously bound data channel. * It's safe to dispose the data channel after it. */ + @Override public AbstractDataChannel unbind() { final Lock lock = mDataChanneliReferenceLock.writeLock(); lock.lock(); diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelClient.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelClient.java index 4a83386..97e7c327 100644 --- a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelClient.java +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/SocketTunnelClient.java @@ -126,12 +126,20 @@ public class SocketTunnelClient extends SocketTunnelBase { @Override public AbstractDataChannel unbind() { AbstractDataChannel dataChannel = super.unbind(); - close(); + if (mState.compareAndSet(State.RUNNING, State.STOPPED)) { + terminateAllConnections(); + closeSocket(); + } return dataChannel; } - public void close() { - if (mState.get() != State.STOPPED) closeSocket(); + @Override + public void dispose() { + if (mState.compareAndSet(State.INITIAL, State.STOPPED)) { + closeSocket(); + } + assert mState.get() == State.STOPPED; + mThreadPool.shutdown(); } @Override @@ -222,6 +230,11 @@ public class SocketTunnelClient extends SocketTunnelBase { throw new InvalidStateException(); } + closeSocket(); + } + + private void terminateAllConnections() { + for (Connection connection : mServerConnections.values()) { connection.terminate(); } @@ -231,8 +244,6 @@ public class SocketTunnelClient extends SocketTunnelBase { } closeSocket(); - - mThreadPool.shutdown(); } private void closeSocket() { diff --git a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/TestUtils.java b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/TestUtils.java index d89beec..5a280e7 100644 --- a/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/TestUtils.java +++ b/components/devtools_bridge/test/android/javatests/src/org/chromium/components/devtools_bridge/TestUtils.java @@ -31,7 +31,7 @@ public class TestUtils { public String call() throws Exception { LocalSocket socket = new LocalSocket(); socket.connect(new LocalSocketAddress(socketName)); - writeAndShutdown(socket, request); + write(socket, request); String response = readAll(socket); socket.close(); @@ -42,17 +42,16 @@ public class TestUtils { }); } - public static void writeAndShutdown(LocalSocket socket, String data) throws IOException { + public static void write(LocalSocket socket, String data) throws IOException { socket.getOutputStream().write(data.getBytes(CHARSET)); socket.getOutputStream().flush(); - socket.shutdownOutput(); } // Reads all bytes from socket input stream until EOF and converts it to UTF-8 string. - public static String readAll(LocalSocket socket) throws IOException { - byte[] buffer = new byte[1000]; + public static String read(LocalSocket socket, int length) throws IOException { + byte[] buffer = new byte[length]; int position = 0; - while (true) { + while (position < buffer.length) { int count = socket.getInputStream().read(buffer, position, buffer.length - position); if (count == -1) break; @@ -61,6 +60,10 @@ public class TestUtils { return new String(buffer, 0, position, CHARSET); } + public static String readAll(LocalSocket socket) throws IOException { + return read(socket, 1000); + } + /** * Utility class for thread synchronization. Allow to track moving through series of steps. */ |