diff options
author | maruel@google.com <maruel@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-21 23:00:58 +0000 |
---|---|---|
committer | maruel@google.com <maruel@google.com@0039d316-1c4b-4281-b951-d872f2087c98> | 2008-10-21 23:00:58 +0000 |
commit | fb1234375c142c49d28ddbf1fdac5ee9a0f444bc (patch) | |
tree | c52c0dabf0320ffa513f115b3dc3a3537b87a5f3 /tools/buildbot/pylibs/twisted/test/test_tcp.py | |
parent | 090b98d8ea1c8c5099a6ff010d39ccf6088e9ed1 (diff) | |
download | chromium_src-fb1234375c142c49d28ddbf1fdac5ee9a0f444bc.zip chromium_src-fb1234375c142c49d28ddbf1fdac5ee9a0f444bc.tar.gz chromium_src-fb1234375c142c49d28ddbf1fdac5ee9a0f444bc.tar.bz2 |
Move the buildbot code to trunk/tools/buildbot so it is not included in a default source checkout.
git-svn-id: svn://svn.chromium.org/chrome/trunk/src@3698 0039d316-1c4b-4281-b951-d872f2087c98
Diffstat (limited to 'tools/buildbot/pylibs/twisted/test/test_tcp.py')
-rw-r--r-- | tools/buildbot/pylibs/twisted/test/test_tcp.py | 1776 |
1 files changed, 0 insertions, 1776 deletions
diff --git a/tools/buildbot/pylibs/twisted/test/test_tcp.py b/tools/buildbot/pylibs/twisted/test/test_tcp.py deleted file mode 100644 index 96d4c02..0000000 --- a/tools/buildbot/pylibs/twisted/test/test_tcp.py +++ /dev/null @@ -1,1776 +0,0 @@ -# Copyright (c) 2001-2008 Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for implementations of L{IReactorTCP}. -""" - -import socket, random, errno - -from zope.interface import implements - -from twisted.trial import unittest - -from twisted.python.log import msg -from twisted.internet import protocol, reactor, defer, interfaces -from twisted.internet import error -from twisted.internet.address import IPv4Address -from twisted.internet.interfaces import IHalfCloseableProtocol, IPullProducer -from twisted.protocols import policies - -def loopUntil(predicate, interval=0): - """ - Poor excuse for an event notification helper. This polls a condition and - calls back a Deferred when it is seen to be true. - - Do not use this function. - """ - from twisted.internet import task - d = defer.Deferred() - def check(): - res = predicate() - if res: - d.callback(res) - call = task.LoopingCall(check) - def stop(result): - call.stop() - return result - d.addCallback(stop) - d2 = call.start(interval) - d2.addErrback(d.errback) - return d - - -class ClosingProtocol(protocol.Protocol): - - def connectionMade(self): - self.transport.loseConnection() - - def connectionLost(self, reason): - reason.trap(error.ConnectionDone) - -class ClosingFactory(protocol.ServerFactory): - """Factory that closes port immediatley.""" - - def buildProtocol(self, conn): - self.port.stopListening() - return ClosingProtocol() - - -class MyProtocol(protocol.Protocol): - made = closed = failed = 0 - - closedDeferred = None - - data = "" - - factory = None - - def connectionMade(self): - self.made = 1 - if (self.factory is not None and - self.factory.protocolConnectionMade is not None): - d = self.factory.protocolConnectionMade - self.factory.protocolConnectionMade = None - d.callback(self) - - def dataReceived(self, data): - self.data += data - - def connectionLost(self, reason): - self.closed = 1 - if self.closedDeferred is not None: - d, self.closedDeferred = self.closedDeferred, None - d.callback(None) - - -class MyProtocolFactoryMixin(object): - """ - Mixin for factories which create L{MyProtocol} instances. - - @type protocolFactory: no-argument callable - @ivar protocolFactory: Factory for protocols - takes the place of the - typical C{protocol} attribute of factories (but that name is used by - this class for something else). - - @type protocolConnectionMade: L{NoneType} or L{defer.Deferred} - @ivar protocolConnectionMade: When an instance of L{MyProtocol} is - connected, if this is not C{None}, the L{Deferred} will be called - back with the protocol instance and the attribute set to C{None}. - - @type protocolConnectionLost: L{NoneType} or L{defer.Deferred} - @ivar protocolConnectionLost: When an instance of L{MyProtocol} is - created, this will be set as its C{closedDeferred} attribute and - then this attribute will be set to C{None} so the L{defer.Deferred} - is not used by more than one protocol. - - @ivar protocol: The most recently created L{MyProtocol} instance which - was returned from C{buildProtocol}. - - @type called: C{int} - @ivar called: A counter which is incremented each time C{buildProtocol} - is called. - - @ivar peerAddresses: A C{list} of the addresses passed to C{buildProtocol}. - """ - protocolFactory = MyProtocol - - protocolConnectionMade = None - protocolConnectionLost = None - protocol = None - called = 0 - - def __init__(self): - self.peerAddresses = [] - - - def buildProtocol(self, addr): - """ - Create a L{MyProtocol} and set it up to be able to perform - callbacks. - """ - self.peerAddresses.append(addr) - self.called += 1 - p = self.protocolFactory() - p.factory = self - p.closedDeferred = self.protocolConnectionLost - self.protocolConnectionLost = None - self.protocol = p - return p - - - -class MyServerFactory(MyProtocolFactoryMixin, protocol.ServerFactory): - """ - Server factory which creates L{MyProtocol} instances. - """ - - - -class MyClientFactory(MyProtocolFactoryMixin, protocol.ClientFactory): - """ - Client factory which creates L{MyProtocol} instances. - """ - failed = 0 - stopped = 0 - - def __init__(self): - MyProtocolFactoryMixin.__init__(self) - self.deferred = defer.Deferred() - self.failDeferred = defer.Deferred() - - def clientConnectionFailed(self, connector, reason): - self.failed = 1 - self.reason = reason - self.failDeferred.callback(None) - - def clientConnectionLost(self, connector, reason): - self.lostReason = reason - self.deferred.callback(None) - - def stopFactory(self): - self.stopped = 1 - - - -class ListeningTestCase(unittest.TestCase): - - def test_listen(self): - """ - L{IReactorTCP.listenTCP} returns an object which provides - L{IListeningPort}. - """ - f = MyServerFactory() - p1 = reactor.listenTCP(0, f, interface="127.0.0.1") - self.addCleanup(p1.stopListening) - self.failUnless(interfaces.IListeningPort.providedBy(p1)) - - - def testStopListening(self): - """ - The L{IListeningPort} returned by L{IReactorTCP.listenTCP} can be - stopped with its C{stopListening} method. After the L{Deferred} it - (optionally) returns has been called back, the port number can be bound - to a new server. - """ - f = MyServerFactory() - port = reactor.listenTCP(0, f, interface="127.0.0.1") - n = port.getHost().port - - def cbStopListening(ignored): - # Make sure we can rebind the port right away - port = reactor.listenTCP(n, f, interface="127.0.0.1") - return port.stopListening() - - d = defer.maybeDeferred(port.stopListening) - d.addCallback(cbStopListening) - return d - - - def testNumberedInterface(self): - f = MyServerFactory() - # listen only on the loopback interface - p1 = reactor.listenTCP(0, f, interface='127.0.0.1') - return p1.stopListening() - - def testPortRepr(self): - f = MyServerFactory() - p = reactor.listenTCP(0, f) - portNo = str(p.getHost().port) - self.failIf(repr(p).find(portNo) == -1) - def stoppedListening(ign): - self.failIf(repr(p).find(portNo) != -1) - d = defer.maybeDeferred(p.stopListening) - return d.addCallback(stoppedListening) - - - def test_serverRepr(self): - """ - Check that the repr string of the server transport get the good port - number if the server listens on 0. - """ - server = MyServerFactory() - serverConnMade = server.protocolConnectionMade = defer.Deferred() - port = reactor.listenTCP(0, server) - self.addCleanup(port.stopListening) - - client = MyClientFactory() - clientConnMade = client.protocolConnectionMade = defer.Deferred() - connector = reactor.connectTCP("127.0.0.1", - port.getHost().port, client) - self.addCleanup(connector.disconnect) - def check((serverProto, clientProto)): - portNumber = port.getHost().port - self.assertEquals(repr(serverProto.transport), - "<MyProtocol #0 on %s>" % (portNumber,)) - serverProto.transport.loseConnection() - clientProto.transport.loseConnection() - return defer.gatherResults([serverConnMade, clientConnMade] - ).addCallback(check) - - - -def callWithSpew(f): - from twisted.python.util import spewerWithLinenums as spewer - import sys - sys.settrace(spewer) - try: - f() - finally: - sys.settrace(None) - -class LoopbackTestCase(unittest.TestCase): - """ - Test loopback connections. - """ - def test_closePortInProtocolFactory(self): - """ - A port created with L{IReactorTCP.listenTCP} can be connected to with - L{IReactorTCP.connectTCP}. - """ - f = ClosingFactory() - port = reactor.listenTCP(0, f, interface="127.0.0.1") - self.addCleanup(port.stopListening) - portNumber = port.getHost().port - f.port = port - clientF = MyClientFactory() - reactor.connectTCP("127.0.0.1", portNumber, clientF) - def check(x): - self.assertTrue(clientF.protocol.made) - self.assertTrue(port.disconnected) - clientF.lostReason.trap(error.ConnectionDone) - return clientF.deferred.addCallback(check) - - def _trapCnxDone(self, obj): - getattr(obj, 'trap', lambda x: None)(error.ConnectionDone) - - - def _connectedClientAndServerTest(self, callback): - """ - Invoke the given callback with a client protocol and a server protocol - which have been connected to each other. - """ - serverFactory = MyServerFactory() - serverConnMade = defer.Deferred() - serverFactory.protocolConnectionMade = serverConnMade - port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") - self.addCleanup(port.stopListening) - - portNumber = port.getHost().port - clientF = MyClientFactory() - clientConnMade = defer.Deferred() - clientF.protocolConnectionMade = clientConnMade - reactor.connectTCP("127.0.0.1", portNumber, clientF) - - connsMade = defer.gatherResults([serverConnMade, clientConnMade]) - def connected((serverProtocol, clientProtocol)): - callback(serverProtocol, clientProtocol) - serverProtocol.transport.loseConnection() - clientProtocol.transport.loseConnection() - connsMade.addCallback(connected) - return connsMade - - - def test_tcpNoDelay(self): - """ - The transport of a protocol connected with L{IReactorTCP.connectTCP} or - L{IReactor.TCP.listenTCP} can have its I{TCP_NODELAY} state inspected - and manipulated with L{ITCPTransport.getTcpNoDelay} and - L{ITCPTransport.setTcpNoDelay}. - """ - def check(serverProtocol, clientProtocol): - for p in [serverProtocol, clientProtocol]: - transport = p.transport - self.assertEquals(transport.getTcpNoDelay(), 0) - transport.setTcpNoDelay(1) - self.assertEquals(transport.getTcpNoDelay(), 1) - transport.setTcpNoDelay(0) - self.assertEquals(transport.getTcpNoDelay(), 0) - return self._connectedClientAndServerTest(check) - - - def test_tcpKeepAlive(self): - """ - The transport of a protocol connected with L{IReactorTCP.connectTCP} or - L{IReactor.TCP.listenTCP} can have its I{SO_KEEPALIVE} state inspected - and manipulated with L{ITCPTransport.getTcpKeepAlive} and - L{ITCPTransport.setTcpKeepAlive}. - """ - def check(serverProtocol, clientProtocol): - for p in [serverProtocol, clientProtocol]: - transport = p.transport - self.assertEquals(transport.getTcpKeepAlive(), 0) - transport.setTcpKeepAlive(1) - self.assertEquals(transport.getTcpKeepAlive(), 1) - transport.setTcpKeepAlive(0) - self.assertEquals(transport.getTcpKeepAlive(), 0) - return self._connectedClientAndServerTest(check) - - - def testFailing(self): - clientF = MyClientFactory() - # XXX we assume no one is listening on TCP port 69 - reactor.connectTCP("127.0.0.1", 69, clientF, timeout=5) - def check(ignored): - clientF.reason.trap(error.ConnectionRefusedError) - return clientF.failDeferred.addCallback(check) - - - def test_connectionRefusedErrorNumber(self): - """ - Assert that the error number of the ConnectionRefusedError is - ECONNREFUSED, and not some other socket related error. - """ - - # Bind a number of ports in the operating system. We will attempt - # to connect to these in turn immediately after closing them, in the - # hopes that no one else has bound them in the mean time. Any - # connection which succeeds is ignored and causes us to move on to - # the next port. As soon as a connection attempt fails, we move on - # to making an assertion about how it failed. If they all succeed, - # the test will fail. - - # It would be nice to have a simpler, reliable way to cause a - # connection failure from the platform. - # - # On Linux (2.6.15), connecting to port 0 always fails. FreeBSD - # (5.4) rejects the connection attempt with EADDRNOTAVAIL. - # - # On FreeBSD (5.4), listening on a port and then repeatedly - # connecting to it without ever accepting any connections eventually - # leads to an ECONNREFUSED. On Linux (2.6.15), a seemingly - # unbounded number of connections succeed. - - serverSockets = [] - for i in xrange(10): - serverSocket = socket.socket() - serverSocket.bind(('127.0.0.1', 0)) - serverSocket.listen(1) - serverSockets.append(serverSocket) - random.shuffle(serverSockets) - - clientCreator = protocol.ClientCreator(reactor, protocol.Protocol) - - def tryConnectFailure(): - def connected(proto): - """ - Darn. Kill it and try again, if there are any tries left. - """ - proto.transport.loseConnection() - if serverSockets: - return tryConnectFailure() - self.fail("Could not fail to connect - could not test errno for that case.") - - serverSocket = serverSockets.pop() - serverHost, serverPort = serverSocket.getsockname() - serverSocket.close() - - connectDeferred = clientCreator.connectTCP(serverHost, serverPort) - connectDeferred.addCallback(connected) - return connectDeferred - - refusedDeferred = tryConnectFailure() - self.assertFailure(refusedDeferred, error.ConnectionRefusedError) - def connRefused(exc): - self.assertEqual(exc.osError, errno.ECONNREFUSED) - refusedDeferred.addCallback(connRefused) - def cleanup(passthrough): - while serverSockets: - serverSockets.pop().close() - return passthrough - refusedDeferred.addBoth(cleanup) - return refusedDeferred - - - def test_connectByServiceFail(self): - """ - Connecting to a named service which does not exist raises - L{error.ServiceNameUnknownError}. - """ - self.assertRaises( - error.ServiceNameUnknownError, - reactor.connectTCP, - "127.0.0.1", "thisbetternotexist", MyClientFactory()) - - - def test_connectByService(self): - """ - L{IReactorTCP.connectTCP} accepts the name of a service instead of a - port number and connects to the port number associated with that - service, as defined by L{socket.getservbyname}. - """ - serverFactory = MyServerFactory() - serverConnMade = defer.Deferred() - serverFactory.protocolConnectionMade = serverConnMade - port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") - self.addCleanup(port.stopListening) - portNumber = port.getHost().port - clientFactory = MyClientFactory() - clientConnMade = defer.Deferred() - clientFactory.protocolConnectionMade = clientConnMade - - def fakeGetServicePortByName(serviceName, protocolName): - if serviceName == 'http' and protocolName == 'tcp': - return portNumber - return 10 - self.patch(socket, 'getservbyname', fakeGetServicePortByName) - - c = reactor.connectTCP('127.0.0.1', 'http', clientFactory) - - connMade = defer.gatherResults([serverConnMade, clientConnMade]) - def connected((serverProtocol, clientProtocol)): - self.assertTrue( - serverFactory.called, - "Server factory was not called upon to build a protocol.") - serverProtocol.transport.loseConnection() - clientProtocol.transport.loseConnection() - connMade.addCallback(connected) - return connMade - - -class StartStopFactory(protocol.Factory): - - started = 0 - stopped = 0 - - def startFactory(self): - if self.started or self.stopped: - raise RuntimeError - self.started = 1 - - def stopFactory(self): - if not self.started or self.stopped: - raise RuntimeError - self.stopped = 1 - - -class ClientStartStopFactory(MyClientFactory): - - started = 0 - stopped = 0 - - def startFactory(self): - if self.started or self.stopped: - raise RuntimeError - self.started = 1 - - def stopFactory(self): - if not self.started or self.stopped: - raise RuntimeError - self.stopped = 1 - - -class FactoryTestCase(unittest.TestCase): - """Tests for factories.""" - - def test_serverStartStop(self): - """ - The factory passed to L{IReactorTCP.listenTCP} should be started only - when it transitions from being used on no ports to being used on one - port and should be stopped only when it transitions from being used on - one port to being used on no ports. - """ - # Note - this test doesn't need to use listenTCP. It is exercising - # logic implemented in Factory.doStart and Factory.doStop, so it could - # just call that directly. Some other test can make sure that - # listenTCP and stopListening correctly call doStart and - # doStop. -exarkun - - f = StartStopFactory() - - # listen on port - p1 = reactor.listenTCP(0, f, interface='127.0.0.1') - self.addCleanup(p1.stopListening) - - self.assertEqual((f.started, f.stopped), (1, 0)) - - # listen on two more ports - p2 = reactor.listenTCP(0, f, interface='127.0.0.1') - p3 = reactor.listenTCP(0, f, interface='127.0.0.1') - - self.assertEqual((f.started, f.stopped), (1, 0)) - - # close two ports - d1 = defer.maybeDeferred(p1.stopListening) - d2 = defer.maybeDeferred(p2.stopListening) - closedDeferred = defer.gatherResults([d1, d2]) - def cbClosed(ignored): - self.assertEqual((f.started, f.stopped), (1, 0)) - # Close the last port - return p3.stopListening() - closedDeferred.addCallback(cbClosed) - - def cbClosedAll(ignored): - self.assertEquals((f.started, f.stopped), (1, 1)) - closedDeferred.addCallback(cbClosedAll) - return closedDeferred - - - def test_clientStartStop(self): - """ - The factory passed to L{IReactorTCP.connectTCP} should be started when - the connection attempt starts and stopped when it is over. - """ - f = ClosingFactory() - p = reactor.listenTCP(0, f, interface="127.0.0.1") - self.addCleanup(p.stopListening) - portNumber = p.getHost().port - f.port = p - - factory = ClientStartStopFactory() - reactor.connectTCP("127.0.0.1", portNumber, factory) - self.assertTrue(factory.started) - return loopUntil(lambda: factory.stopped) - - - -class ConnectorTestCase(unittest.TestCase): - - def test_connectorIdentity(self): - """ - L{IReactorTCP.connectTCP} returns an object which provides - L{IConnector}. The destination of the connector is the address which - was passed to C{connectTCP}. The same connector object is passed to - the factory's C{startedConnecting} method as to the factory's - C{clientConnectionLost} method. - """ - serverFactory = ClosingFactory() - tcpPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") - self.addCleanup(tcpPort.stopListening) - portNumber = tcpPort.getHost().port - serverFactory.port = tcpPort - - seenConnectors = [] - seenFailures = [] - - clientFactory = ClientStartStopFactory() - clientFactory.clientConnectionLost = ( - lambda connector, reason: (seenConnectors.append(connector), - seenFailures.append(reason))) - clientFactory.startedConnecting = seenConnectors.append - - connector = reactor.connectTCP("127.0.0.1", portNumber, clientFactory) - self.assertTrue(interfaces.IConnector.providedBy(connector)) - dest = connector.getDestination() - self.assertEquals(dest.type, "TCP") - self.assertEquals(dest.host, "127.0.0.1") - self.assertEquals(dest.port, portNumber) - - d = loopUntil(lambda: clientFactory.stopped) - def clientFactoryStopped(ignored): - seenFailures[0].trap(error.ConnectionDone) - self.assertEqual(seenConnectors, [connector, connector]) - d.addCallback(clientFactoryStopped) - return d - - - def test_userFail(self): - """ - Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting} - results in C{Factory.clientConnectionFailed} being called with - L{error.UserError} as the reason. - """ - serverFactory = MyServerFactory() - tcpPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") - self.addCleanup(tcpPort.stopListening) - portNumber = tcpPort.getHost().port - - def startedConnecting(connector): - connector.stopConnecting() - - clientFactory = ClientStartStopFactory() - clientFactory.startedConnecting = startedConnecting - reactor.connectTCP("127.0.0.1", portNumber, clientFactory) - - d = loopUntil(lambda: clientFactory.stopped) - def check(ignored): - self.assertEquals(clientFactory.failed, 1) - clientFactory.reason.trap(error.UserError) - return d.addCallback(check) - - - def test_reconnect(self): - """ - Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes - a new connection attempt to be made. - """ - serverFactory = ClosingFactory() - tcpPort = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") - self.addCleanup(tcpPort.stopListening) - portNumber = tcpPort.getHost().port - serverFactory.port = tcpPort - - clientFactory = MyClientFactory() - - def clientConnectionLost(connector, reason): - connector.connect() - clientFactory.clientConnectionLost = clientConnectionLost - reactor.connectTCP("127.0.0.1", portNumber, clientFactory) - - d = loopUntil(lambda: clientFactory.failed) - def reconnectFailed(ignored): - p = clientFactory.protocol - self.assertEqual((p.made, p.closed), (1, 1)) - clientFactory.reason.trap(error.ConnectionRefusedError) - self.assertEqual(clientFactory.stopped, 1) - return d.addCallback(reconnectFailed) - - - -class CannotBindTestCase(unittest.TestCase): - """ - Tests for correct behavior when a reactor cannot bind to the required TCP - port. - """ - - def test_cannotBind(self): - """ - L{IReactorTCP.listenTCP} raises L{error.CannotListenError} if the - address to listen on is already in use. - """ - f = MyServerFactory() - - p1 = reactor.listenTCP(0, f, interface='127.0.0.1') - self.addCleanup(p1.stopListening) - n = p1.getHost().port - dest = p1.getHost() - self.assertEquals(dest.type, "TCP") - self.assertEquals(dest.host, "127.0.0.1") - self.assertEquals(dest.port, n) - - # make sure new listen raises error - self.assertRaises(error.CannotListenError, - reactor.listenTCP, n, f, interface='127.0.0.1') - - - - def _fireWhenDoneFunc(self, d, f): - """Returns closure that when called calls f and then callbacks d. - """ - from twisted.python import util as tputil - def newf(*args, **kw): - rtn = f(*args, **kw) - d.callback('') - return rtn - return tputil.mergeFunctionMetadata(f, newf) - - - def test_clientBind(self): - """ - L{IReactorTCP.connectTCP} calls C{Factory.clientConnectionFailed} with - L{error.ConnectBindError} if the bind address specified is already in - use. - """ - theDeferred = defer.Deferred() - sf = MyServerFactory() - sf.startFactory = self._fireWhenDoneFunc(theDeferred, sf.startFactory) - p = reactor.listenTCP(0, sf, interface="127.0.0.1") - self.addCleanup(p.stopListening) - - def _connect1(results): - d = defer.Deferred() - cf1 = MyClientFactory() - cf1.buildProtocol = self._fireWhenDoneFunc(d, cf1.buildProtocol) - reactor.connectTCP("127.0.0.1", p.getHost().port, cf1, - bindAddress=("127.0.0.1", 0)) - d.addCallback(_conmade, cf1) - return d - - def _conmade(results, cf1): - d = defer.Deferred() - cf1.protocol.connectionMade = self._fireWhenDoneFunc( - d, cf1.protocol.connectionMade) - d.addCallback(_check1connect2, cf1) - return d - - def _check1connect2(results, cf1): - self.assertEquals(cf1.protocol.made, 1) - - d1 = defer.Deferred() - d2 = defer.Deferred() - port = cf1.protocol.transport.getHost().port - cf2 = MyClientFactory() - cf2.clientConnectionFailed = self._fireWhenDoneFunc( - d1, cf2.clientConnectionFailed) - cf2.stopFactory = self._fireWhenDoneFunc(d2, cf2.stopFactory) - reactor.connectTCP("127.0.0.1", p.getHost().port, cf2, - bindAddress=("127.0.0.1", port)) - d1.addCallback(_check2failed, cf1, cf2) - d2.addCallback(_check2stopped, cf1, cf2) - dl = defer.DeferredList([d1, d2]) - dl.addCallback(_stop, cf1, cf2) - return dl - - def _check2failed(results, cf1, cf2): - self.assertEquals(cf2.failed, 1) - cf2.reason.trap(error.ConnectBindError) - self.assertTrue(cf2.reason.check(error.ConnectBindError)) - return results - - def _check2stopped(results, cf1, cf2): - self.assertEquals(cf2.stopped, 1) - return results - - def _stop(results, cf1, cf2): - d = defer.Deferred() - d.addCallback(_check1cleanup, cf1) - cf1.stopFactory = self._fireWhenDoneFunc(d, cf1.stopFactory) - cf1.protocol.transport.loseConnection() - return d - - def _check1cleanup(results, cf1): - self.assertEquals(cf1.stopped, 1) - - theDeferred.addCallback(_connect1) - return theDeferred - - - -class MyOtherClientFactory(protocol.ClientFactory): - def buildProtocol(self, address): - self.address = address - self.protocol = MyProtocol() - return self.protocol - - - -class LocalRemoteAddressTestCase(unittest.TestCase): - """ - Tests for correct getHost/getPeer values and that the correct address is - passed to buildProtocol. - """ - def test_hostAddress(self): - """ - L{IListeningPort.getHost} returns the same address as a client - connection's L{ITCPTransport.getPeer}. - """ - f1 = MyServerFactory() - p1 = reactor.listenTCP(0, f1, interface='127.0.0.1') - self.addCleanup(p1.stopListening) - n = p1.getHost().port - - f2 = MyOtherClientFactory() - p2 = reactor.connectTCP('127.0.0.1', n, f2) - - d = loopUntil(lambda :p2.state == "connected") - def check(ignored): - self.assertEquals(p1.getHost(), f2.address) - self.assertEquals(p1.getHost(), f2.protocol.transport.getPeer()) - return p1.stopListening() - def cleanup(ignored): - p2.transport.loseConnection() - return d.addCallback(check).addCallback(cleanup) - - -class WriterProtocol(protocol.Protocol): - def connectionMade(self): - # use everything ITransport claims to provide. If something here - # fails, the exception will be written to the log, but it will not - # directly flunk the test. The test will fail when maximum number of - # iterations have passed and the writer's factory.done has not yet - # been set. - self.transport.write("Hello Cleveland!\n") - seq = ["Goodbye", " cruel", " world", "\n"] - self.transport.writeSequence(seq) - peer = self.transport.getPeer() - if peer.type != "TCP": - print "getPeer returned non-TCP socket:", peer - self.factory.problem = 1 - us = self.transport.getHost() - if us.type != "TCP": - print "getHost returned non-TCP socket:", us - self.factory.problem = 1 - self.factory.done = 1 - - self.transport.loseConnection() - -class ReaderProtocol(protocol.Protocol): - def dataReceived(self, data): - self.factory.data += data - def connectionLost(self, reason): - self.factory.done = 1 - -class WriterClientFactory(protocol.ClientFactory): - def __init__(self): - self.done = 0 - self.data = "" - def buildProtocol(self, addr): - p = ReaderProtocol() - p.factory = self - self.protocol = p - return p - -class WriteDataTestCase(unittest.TestCase): - """ - Test that connected TCP sockets can actually write data. Try to exercise - the entire ITransport interface. - """ - - def test_writer(self): - """ - L{ITCPTransport.write} and L{ITCPTransport.writeSequence} send bytes to - the other end of the connection. - """ - f = protocol.Factory() - f.protocol = WriterProtocol - f.done = 0 - f.problem = 0 - wrappedF = WiredFactory(f) - p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1") - self.addCleanup(p.stopListening) - n = p.getHost().port - clientF = WriterClientFactory() - wrappedClientF = WiredFactory(clientF) - reactor.connectTCP("127.0.0.1", n, wrappedClientF) - - def check(ignored): - self.failUnless(f.done, "writer didn't finish, it probably died") - self.failUnless(f.problem == 0, "writer indicated an error") - self.failUnless(clientF.done, - "client didn't see connection dropped") - expected = "".join(["Hello Cleveland!\n", - "Goodbye", " cruel", " world", "\n"]) - self.failUnless(clientF.data == expected, - "client didn't receive all the data it expected") - d = defer.gatherResults([wrappedF.onDisconnect, - wrappedClientF.onDisconnect]) - return d.addCallback(check) - - - def test_writeAfterShutdownWithoutReading(self): - """ - A TCP transport which is written to after the connection has been shut - down should notify its protocol that the connection has been lost, even - if the TCP transport is not actively being monitored for read events - (ie, pauseProducing was called on it). - """ - # This is an unpleasant thing. Generally tests shouldn't skip or - # run based on the name of the reactor being used (most tests - # shouldn't care _at all_ what reactor is being used, in fact). The - # Gtk reactor cannot pass this test, though, because it fails to - # implement IReactorTCP entirely correctly. Gtk is quite old at - # this point, so it's more likely that gtkreactor will be deprecated - # and removed rather than fixed to handle this case correctly. - # Since this is a pre-existing (and very long-standing) issue with - # the Gtk reactor, there's no reason for it to prevent this test - # being added to exercise the other reactors, for which the behavior - # was also untested but at least works correctly (now). See #2833 - # for information on the status of gtkreactor. - if reactor.__class__.__name__ == 'IOCPReactor': - raise unittest.SkipTest( - "iocpreactor does not, in fact, stop reading immediately after " - "pauseProducing is called. This results in a bonus disconnection " - "notification. Under some circumstances, it might be possible to " - "not receive this notifications (specifically, pauseProducing, " - "deliver some data, proceed with this test).") - if reactor.__class__.__name__ == 'GtkReactor': - raise unittest.SkipTest( - "gtkreactor does not implement unclean disconnection " - "notification correctly. This might more properly be " - "a todo, but due to technical limitations it cannot be.") - - # Called back after the protocol for the client side of the connection - # has paused its transport, preventing it from reading, therefore - # preventing it from noticing the disconnection before the rest of the - # actions which are necessary to trigger the case this test is for have - # been taken. - clientPaused = defer.Deferred() - - # Called back when the protocol for the server side of the connection - # has received connection lost notification. - serverLost = defer.Deferred() - - class Disconnecter(protocol.Protocol): - """ - Protocol for the server side of the connection which disconnects - itself in a callback on clientPaused and publishes notification - when its connection is actually lost. - """ - def connectionMade(self): - """ - Set up a callback on clientPaused to lose the connection. - """ - msg('Disconnector.connectionMade') - def disconnect(ignored): - msg('Disconnector.connectionMade disconnect') - self.transport.loseConnection() - msg('loseConnection called') - clientPaused.addCallback(disconnect) - - def connectionLost(self, reason): - """ - Notify observers that the server side of the connection has - ended. - """ - msg('Disconnecter.connectionLost') - serverLost.callback(None) - msg('serverLost called back') - - # Create the server port to which a connection will be made. - server = protocol.ServerFactory() - server.protocol = Disconnecter - port = reactor.listenTCP(0, server, interface='127.0.0.1') - self.addCleanup(port.stopListening) - addr = port.getHost() - - class Infinite(object): - """ - A producer which will write to its consumer as long as - resumeProducing is called. - - @ivar consumer: The L{IConsumer} which will be written to. - """ - implements(IPullProducer) - - def __init__(self, consumer): - self.consumer = consumer - - def resumeProducing(self): - msg('Infinite.resumeProducing') - self.consumer.write('x') - msg('Infinite.resumeProducing wrote to consumer') - - def stopProducing(self): - msg('Infinite.stopProducing') - - - class UnreadingWriter(protocol.Protocol): - """ - Trivial protocol which pauses its transport immediately and then - writes some bytes to it. - """ - def connectionMade(self): - msg('UnreadingWriter.connectionMade') - self.transport.pauseProducing() - clientPaused.callback(None) - msg('clientPaused called back') - def write(ignored): - msg('UnreadingWriter.connectionMade write') - # This needs to be enough bytes to spill over into the - # userspace Twisted send buffer - if it all fits into - # the kernel, Twisted won't even poll for OUT events, - # which means it won't poll for any events at all, so - # the disconnection is never noticed. This is due to - # #1662. When #1662 is fixed, this test will likely - # need to be adjusted, otherwise connection lost - # notification will happen too soon and the test will - # probably begin to fail with ConnectionDone instead of - # ConnectionLost (in any case, it will no longer be - # entirely correct). - producer = Infinite(self.transport) - msg('UnreadingWriter.connectionMade write created producer') - self.transport.registerProducer(producer, False) - msg('UnreadingWriter.connectionMade write registered producer') - serverLost.addCallback(write) - - # Create the client and initiate the connection - client = MyClientFactory() - client.protocolFactory = UnreadingWriter - clientConnectionLost = client.deferred - def cbClientLost(ignored): - msg('cbClientLost') - return client.lostReason - clientConnectionLost.addCallback(cbClientLost) - msg('Connecting to %s:%s' % (addr.host, addr.port)) - connector = reactor.connectTCP(addr.host, addr.port, client) - - # By the end of the test, the client should have received notification - # of unclean disconnection. - msg('Returning Deferred') - return self.assertFailure(clientConnectionLost, error.ConnectionLost) - - - -class ConnectionLosingProtocol(protocol.Protocol): - def connectionMade(self): - self.transport.write("1") - self.transport.loseConnection() - self.master._connectionMade() - self.master.ports.append(self.transport) - - - -class NoopProtocol(protocol.Protocol): - def connectionMade(self): - self.d = defer.Deferred() - self.master.serverConns.append(self.d) - - def connectionLost(self, reason): - self.d.callback(True) - - - -class ConnectionLostNotifyingProtocol(protocol.Protocol): - """ - Protocol which fires a Deferred which was previously passed to - its initializer when the connection is lost. - """ - def __init__(self, onConnectionLost): - self.onConnectionLost = onConnectionLost - - - def connectionLost(self, reason): - self.onConnectionLost.callback(self) - - - -class HandleSavingProtocol(ConnectionLostNotifyingProtocol): - """ - Protocol which grabs the platform-specific socket handle and - saves it as an attribute on itself when the connection is - established. - """ - def makeConnection(self, transport): - """ - Save the platform-specific socket handle for future - introspection. - """ - self.handle = transport.getHandle() - return protocol.Protocol.makeConnection(self, transport) - - - -class ProperlyCloseFilesMixin: - """ - Tests for platform resources properly being cleaned up. - """ - def createServer(self, address, portNumber, factory): - """ - Bind a server port to which connections will be made. The server - should use the given protocol factory. - - @return: The L{IListeningPort} for the server created. - """ - raise NotImplementedError() - - - def connectClient(self, address, portNumber, clientCreator): - """ - Establish a connection to the given address using the given - L{ClientCreator} instance. - - @return: A Deferred which will fire with the connected protocol instance. - """ - raise NotImplementedError() - - - def getHandleExceptionType(self): - """ - Return the exception class which will be raised when an operation is - attempted on a closed platform handle. - """ - raise NotImplementedError() - - - def getHandleErrorCode(self): - """ - Return the errno expected to result from writing to a closed - platform socket handle. - """ - # These platforms have been seen to give EBADF: - # - # Linux 2.4.26, Linux 2.6.15, OS X 10.4, FreeBSD 5.4 - # Windows 2000 SP 4, Windows XP SP 2 - return errno.EBADF - - - def test_properlyCloseFiles(self): - """ - Test that lost connections properly have their underlying socket - resources cleaned up. - """ - onServerConnectionLost = defer.Deferred() - serverFactory = protocol.ServerFactory() - serverFactory.protocol = lambda: ConnectionLostNotifyingProtocol( - onServerConnectionLost) - serverPort = self.createServer('127.0.0.1', 0, serverFactory) - - onClientConnectionLost = defer.Deferred() - serverAddr = serverPort.getHost() - clientCreator = protocol.ClientCreator( - reactor, lambda: HandleSavingProtocol(onClientConnectionLost)) - clientDeferred = self.connectClient( - serverAddr.host, serverAddr.port, clientCreator) - - def clientConnected(client): - """ - Disconnect the client. Return a Deferred which fires when both - the client and the server have received disconnect notification. - """ - client.transport.loseConnection() - return defer.gatherResults([ - onClientConnectionLost, onServerConnectionLost]) - clientDeferred.addCallback(clientConnected) - - def clientDisconnected((client, server)): - """ - Verify that the underlying platform socket handle has been - cleaned up. - """ - expectedErrorCode = self.getHandleErrorCode() - err = self.assertRaises( - self.getHandleExceptionType(), client.handle.send, 'bytes') - self.assertEqual(err.args[0], expectedErrorCode) - clientDeferred.addCallback(clientDisconnected) - - def cleanup(passthrough): - """ - Shut down the server port. Return a Deferred which fires when - this has completed. - """ - result = defer.maybeDeferred(serverPort.stopListening) - result.addCallback(lambda ign: passthrough) - return result - clientDeferred.addBoth(cleanup) - - return clientDeferred - - - -class ProperlyCloseFilesTestCase(unittest.TestCase, ProperlyCloseFilesMixin): - def createServer(self, address, portNumber, factory): - return reactor.listenTCP(portNumber, factory, interface=address) - - - def connectClient(self, address, portNumber, clientCreator): - return clientCreator.connectTCP(address, portNumber) - - - def getHandleExceptionType(self): - return socket.error - - - -class WiredForDeferreds(policies.ProtocolWrapper): - def __init__(self, factory, wrappedProtocol): - policies.ProtocolWrapper.__init__(self, factory, wrappedProtocol) - - def connectionMade(self): - policies.ProtocolWrapper.connectionMade(self) - self.factory.onConnect.callback(None) - - def connectionLost(self, reason): - policies.ProtocolWrapper.connectionLost(self, reason) - self.factory.onDisconnect.callback(None) - - - -class WiredFactory(policies.WrappingFactory): - protocol = WiredForDeferreds - - def __init__(self, wrappedFactory): - policies.WrappingFactory.__init__(self, wrappedFactory) - self.onConnect = defer.Deferred() - self.onDisconnect = defer.Deferred() - - - -class AddressTestCase(unittest.TestCase): - """ - Tests for address-related interactions with client and server protocols. - """ - def setUp(self): - """ - Create a port and connected client/server pair which can be used - to test factory behavior related to addresses. - - @return: A L{defer.Deferred} which will be called back when both the - client and server protocols have received their connection made - callback. - """ - class RememberingWrapper(protocol.ClientFactory): - """ - Simple wrapper factory which records the addresses which are - passed to its L{buildProtocol} method and delegates actual - protocol creation to another factory. - - @ivar addresses: A list of the objects passed to buildProtocol. - @ivar factory: The wrapped factory to which protocol creation is - delegated. - """ - def __init__(self, factory): - self.addresses = [] - self.factory = factory - - # Only bother to pass on buildProtocol calls to the wrapped - # factory - doStart, doStop, etc aren't necessary for this test - # to pass. - def buildProtocol(self, addr): - """ - Append the given address to C{self.addresses} and forward - the call to C{self.factory}. - """ - self.addresses.append(addr) - return self.factory.buildProtocol(addr) - - # Make a server which we can receive connection and disconnection - # notification for, and which will record the address passed to its - # buildProtocol. - self.server = MyServerFactory() - self.serverConnMade = self.server.protocolConnectionMade = defer.Deferred() - self.serverConnLost = self.server.protocolConnectionLost = defer.Deferred() - # RememberingWrapper is a ClientFactory, but ClientFactory is-a - # ServerFactory, so this is okay. - self.serverWrapper = RememberingWrapper(self.server) - - # Do something similar for a client. - self.client = MyClientFactory() - self.clientConnMade = self.client.protocolConnectionMade = defer.Deferred() - self.clientConnLost = self.client.protocolConnectionLost = defer.Deferred() - self.clientWrapper = RememberingWrapper(self.client) - - self.port = reactor.listenTCP(0, self.serverWrapper, interface='127.0.0.1') - self.connector = reactor.connectTCP( - self.port.getHost().host, self.port.getHost().port, self.clientWrapper) - - return defer.gatherResults([self.serverConnMade, self.clientConnMade]) - - - def tearDown(self): - """ - Disconnect the client/server pair and shutdown the port created in - L{setUp}. - """ - self.connector.disconnect() - return defer.gatherResults([ - self.serverConnLost, self.clientConnLost, - defer.maybeDeferred(self.port.stopListening)]) - - - def test_buildProtocolClient(self): - """ - L{ClientFactory.buildProtocol} should be invoked with the address of - the server to which a connection has been established, which should - be the same as the address reported by the C{getHost} method of the - transport of the server protocol and as the C{getPeer} method of the - transport of the client protocol. - """ - serverHost = self.server.protocol.transport.getHost() - clientPeer = self.client.protocol.transport.getPeer() - - self.assertEqual( - self.clientWrapper.addresses, - [IPv4Address('TCP', serverHost.host, serverHost.port)]) - self.assertEqual( - self.clientWrapper.addresses, - [IPv4Address('TCP', clientPeer.host, clientPeer.port)]) - - - def test_buildProtocolServer(self): - """ - L{ServerFactory.buildProtocol} should be invoked with the address of - the client which has connected to the port the factory is listening on, - which should be the same as the address reported by the C{getPeer} - method of the transport of the server protocol and as the C{getHost} - method of the transport of the client protocol. - """ - clientHost = self.client.protocol.transport.getHost() - serverPeer = self.server.protocol.transport.getPeer() - - self.assertEqual( - self.serverWrapper.addresses, - [IPv4Address('TCP', serverPeer.host, serverPeer.port)]) - self.assertEqual( - self.serverWrapper.addresses, - [IPv4Address('TCP', clientHost.host, clientHost.port)]) - - - -class LargeBufferWriterProtocol(protocol.Protocol): - - # Win32 sockets cannot handle single huge chunks of bytes. Write one - # massive string to make sure Twisted deals with this fact. - - def connectionMade(self): - # write 60MB - self.transport.write('X'*self.factory.len) - self.factory.done = 1 - self.transport.loseConnection() - -class LargeBufferReaderProtocol(protocol.Protocol): - def dataReceived(self, data): - self.factory.len += len(data) - def connectionLost(self, reason): - self.factory.done = 1 - -class LargeBufferReaderClientFactory(protocol.ClientFactory): - def __init__(self): - self.done = 0 - self.len = 0 - def buildProtocol(self, addr): - p = LargeBufferReaderProtocol() - p.factory = self - self.protocol = p - return p - - -class FireOnClose(policies.ProtocolWrapper): - """A wrapper around a protocol that makes it fire a deferred when - connectionLost is called. - """ - def connectionLost(self, reason): - policies.ProtocolWrapper.connectionLost(self, reason) - self.factory.deferred.callback(None) - - -class FireOnCloseFactory(policies.WrappingFactory): - protocol = FireOnClose - - def __init__(self, wrappedFactory): - policies.WrappingFactory.__init__(self, wrappedFactory) - self.deferred = defer.Deferred() - - -class LargeBufferTestCase(unittest.TestCase): - """Test that buffering large amounts of data works. - """ - - datalen = 60*1024*1024 - def testWriter(self): - f = protocol.Factory() - f.protocol = LargeBufferWriterProtocol - f.done = 0 - f.problem = 0 - f.len = self.datalen - wrappedF = FireOnCloseFactory(f) - p = reactor.listenTCP(0, wrappedF, interface="127.0.0.1") - self.addCleanup(p.stopListening) - n = p.getHost().port - clientF = LargeBufferReaderClientFactory() - wrappedClientF = FireOnCloseFactory(clientF) - reactor.connectTCP("127.0.0.1", n, wrappedClientF) - - d = defer.gatherResults([wrappedF.deferred, wrappedClientF.deferred]) - def check(ignored): - self.failUnless(f.done, "writer didn't finish, it probably died") - self.failUnless(clientF.len == self.datalen, - "client didn't receive all the data it expected " - "(%d != %d)" % (clientF.len, self.datalen)) - self.failUnless(clientF.done, - "client didn't see connection dropped") - return d.addCallback(check) - - -class MyHCProtocol(MyProtocol): - - implements(IHalfCloseableProtocol) - - readHalfClosed = False - writeHalfClosed = False - - def readConnectionLost(self): - self.readHalfClosed = True - # Invoke notification logic from the base class to simplify testing. - if self.writeHalfClosed: - self.connectionLost(None) - - def writeConnectionLost(self): - self.writeHalfClosed = True - # Invoke notification logic from the base class to simplify testing. - if self.readHalfClosed: - self.connectionLost(None) - - -class MyHCFactory(protocol.ServerFactory): - - called = 0 - protocolConnectionMade = None - - def buildProtocol(self, addr): - self.called += 1 - p = MyHCProtocol() - p.factory = self - self.protocol = p - return p - - -class HalfCloseTestCase(unittest.TestCase): - """Test half-closing connections.""" - - def setUp(self): - self.f = f = MyHCFactory() - self.p = p = reactor.listenTCP(0, f, interface="127.0.0.1") - self.addCleanup(p.stopListening) - d = loopUntil(lambda :p.connected) - - self.cf = protocol.ClientCreator(reactor, MyHCProtocol) - - d.addCallback(lambda _: self.cf.connectTCP(p.getHost().host, - p.getHost().port)) - d.addCallback(self._setUp) - return d - - def _setUp(self, client): - self.client = client - self.clientProtoConnectionLost = self.client.closedDeferred = defer.Deferred() - self.assertEquals(self.client.transport.connected, 1) - # Wait for the server to notice there is a connection, too. - return loopUntil(lambda: getattr(self.f, 'protocol', None) is not None) - - def tearDown(self): - self.assertEquals(self.client.closed, 0) - self.client.transport.loseConnection() - d = defer.maybeDeferred(self.p.stopListening) - d.addCallback(lambda ign: self.clientProtoConnectionLost) - d.addCallback(self._tearDown) - return d - - def _tearDown(self, ignored): - self.assertEquals(self.client.closed, 1) - # because we did half-close, the server also needs to - # closed explicitly. - self.assertEquals(self.f.protocol.closed, 0) - d = defer.Deferred() - def _connectionLost(reason): - self.f.protocol.closed = 1 - d.callback(None) - self.f.protocol.connectionLost = _connectionLost - self.f.protocol.transport.loseConnection() - d.addCallback(lambda x:self.assertEquals(self.f.protocol.closed, 1)) - return d - - def testCloseWriteCloser(self): - client = self.client - f = self.f - t = client.transport - - t.write("hello") - d = loopUntil(lambda :len(t._tempDataBuffer) == 0) - def loseWrite(ignored): - t.loseWriteConnection() - return loopUntil(lambda :t._writeDisconnected) - def check(ignored): - self.assertEquals(client.closed, False) - self.assertEquals(client.writeHalfClosed, True) - self.assertEquals(client.readHalfClosed, False) - return loopUntil(lambda :f.protocol.readHalfClosed) - def write(ignored): - w = client.transport.write - w(" world") - w("lalala fooled you") - self.assertEquals(0, len(client.transport._tempDataBuffer)) - self.assertEquals(f.protocol.data, "hello") - self.assertEquals(f.protocol.closed, False) - self.assertEquals(f.protocol.readHalfClosed, True) - return d.addCallback(loseWrite).addCallback(check).addCallback(write) - - def testWriteCloseNotification(self): - f = self.f - f.protocol.transport.loseWriteConnection() - - d = defer.gatherResults([ - loopUntil(lambda :f.protocol.writeHalfClosed), - loopUntil(lambda :self.client.readHalfClosed)]) - d.addCallback(lambda _: self.assertEquals( - f.protocol.readHalfClosed, False)) - return d - - -class HalfClose2TestCase(unittest.TestCase): - - def setUp(self): - self.f = f = MyServerFactory() - self.f.protocolConnectionMade = defer.Deferred() - self.p = p = reactor.listenTCP(0, f, interface="127.0.0.1") - - # XXX we don't test server side yet since we don't do it yet - d = protocol.ClientCreator(reactor, MyProtocol).connectTCP( - p.getHost().host, p.getHost().port) - d.addCallback(self._gotClient) - return d - - def _gotClient(self, client): - self.client = client - # Now wait for the server to catch up - it doesn't matter if this - # Deferred has already fired and gone away, in that case we'll - # return None and not wait at all, which is precisely correct. - return self.f.protocolConnectionMade - - def tearDown(self): - self.client.transport.loseConnection() - return self.p.stopListening() - - def testNoNotification(self): - """ - TCP protocols support half-close connections, but not all of them - support being notified of write closes. In this case, test that - half-closing the connection causes the peer's connection to be - closed. - """ - self.client.transport.write("hello") - self.client.transport.loseWriteConnection() - self.f.protocol.closedDeferred = d = defer.Deferred() - self.client.closedDeferred = d2 = defer.Deferred() - d.addCallback(lambda x: - self.assertEqual(self.f.protocol.data, 'hello')) - d.addCallback(lambda x: self.assertEqual(self.f.protocol.closed, True)) - return defer.gatherResults([d, d2]) - - def testShutdownException(self): - """ - If the other side has already closed its connection, - loseWriteConnection should pass silently. - """ - self.f.protocol.transport.loseConnection() - self.client.transport.write("X") - self.client.transport.loseWriteConnection() - self.f.protocol.closedDeferred = d = defer.Deferred() - self.client.closedDeferred = d2 = defer.Deferred() - d.addCallback(lambda x: - self.failUnlessEqual(self.f.protocol.closed, True)) - return defer.gatherResults([d, d2]) - - -class HalfCloseBuggyApplicationTests(unittest.TestCase): - """ - Test half-closing connections where notification code has bugs. - """ - - def setUp(self): - """ - Set up a server and connect a client to it. Return a Deferred which - only fires once this is done. - """ - self.serverFactory = MyHCFactory() - self.serverFactory.protocolConnectionMade = defer.Deferred() - self.port = reactor.listenTCP( - 0, self.serverFactory, interface="127.0.0.1") - self.addCleanup(self.port.stopListening) - addr = self.port.getHost() - creator = protocol.ClientCreator(reactor, MyHCProtocol) - clientDeferred = creator.connectTCP(addr.host, addr.port) - def setClient(clientProtocol): - self.clientProtocol = clientProtocol - clientDeferred.addCallback(setClient) - return defer.gatherResults([ - self.serverFactory.protocolConnectionMade, - clientDeferred]) - - - def aBug(self, *args): - """ - Fake implementation of a callback which illegally raises an - exception. - """ - raise RuntimeError("ONO I AM BUGGY CODE") - - - def _notificationRaisesTest(self): - """ - Helper for testing that an exception is logged by the time the - client protocol loses its connection. - """ - closed = self.clientProtocol.closedDeferred = defer.Deferred() - self.clientProtocol.transport.loseWriteConnection() - def check(ignored): - errors = self.flushLoggedErrors(RuntimeError) - self.assertEqual(len(errors), 1) - closed.addCallback(check) - return closed - - - def test_readNotificationRaises(self): - """ - If C{readConnectionLost} raises an exception when the transport - calls it to notify the protocol of that event, the exception should - be logged and the protocol should be disconnected completely. - """ - self.serverFactory.protocol.readConnectionLost = self.aBug - return self._notificationRaisesTest() - - - def test_writeNotificationRaises(self): - """ - If C{writeConnectionLost} raises an exception when the transport - calls it to notify the protocol of that event, the exception should - be logged and the protocol should be disconnected completely. - """ - self.clientProtocol.writeConnectionLost = self.aBug - return self._notificationRaisesTest() - - - -class LogTestCase(unittest.TestCase): - """ - Test logging facility of TCP base classes. - """ - - def test_logstrClientSetup(self): - """ - Check that the log customization of the client transport happens - once the client is connected. - """ - server = MyServerFactory() - - client = MyClientFactory() - client.protocolConnectionMade = defer.Deferred() - - port = reactor.listenTCP(0, server, interface='127.0.0.1') - self.addCleanup(port.stopListening) - - connector = reactor.connectTCP( - port.getHost().host, port.getHost().port, client) - self.addCleanup(connector.disconnect) - - # It should still have the default value - self.assertEquals(connector.transport.logstr, - "Uninitialized") - - def cb(ign): - self.assertEquals(connector.transport.logstr, - "MyProtocol,client") - client.protocolConnectionMade.addCallback(cb) - return client.protocolConnectionMade - - - -class PauseProducingTestCase(unittest.TestCase): - """ - Test some behaviors of pausing the production of a transport. - """ - - def test_pauseProducingInConnectionMade(self): - """ - In C{connectionMade} of a client protocol, C{pauseProducing} used to be - ignored: this test is here to ensure it's not ignored. - """ - server = MyServerFactory() - - client = MyClientFactory() - client.protocolConnectionMade = defer.Deferred() - - port = reactor.listenTCP(0, server, interface='127.0.0.1') - self.addCleanup(port.stopListening) - - connector = reactor.connectTCP( - port.getHost().host, port.getHost().port, client) - self.addCleanup(connector.disconnect) - - def checkInConnectionMade(proto): - tr = proto.transport - # The transport should already be monitored - self.assertIn(tr, reactor.getReaders() + - reactor.getWriters()) - proto.transport.pauseProducing() - self.assertNotIn(tr, reactor.getReaders() + - reactor.getWriters()) - d = defer.Deferred() - d.addCallback(checkAfterConnectionMade) - reactor.callLater(0, d.callback, proto) - return d - def checkAfterConnectionMade(proto): - tr = proto.transport - # The transport should still not be monitored - self.assertNotIn(tr, reactor.getReaders() + - reactor.getWriters()) - client.protocolConnectionMade.addCallback(checkInConnectionMade) - return client.protocolConnectionMade - - if not interfaces.IReactorFDSet.providedBy(reactor): - test_pauseProducingInConnectionMade.skip = "Reactor not providing IReactorFDSet" - - - -class CallBackOrderTestCase(unittest.TestCase): - """ - Test the order of reactor callbacks - """ - - def test_loseOrder(self): - """ - Check that Protocol.connectionLost is called before factory's - clientConnectionLost - """ - server = MyServerFactory() - server.protocolConnectionMade = (defer.Deferred() - .addCallback(lambda proto: self.addCleanup( - proto.transport.loseConnection))) - - client = MyClientFactory() - client.protocolConnectionLost = defer.Deferred() - client.protocolConnectionMade = defer.Deferred() - - def _cbCM(res): - """ - protocol.connectionMade callback - """ - reactor.callLater(0, client.protocol.transport.loseConnection) - - client.protocolConnectionMade.addCallback(_cbCM) - - port = reactor.listenTCP(0, server, interface='127.0.0.1') - self.addCleanup(port.stopListening) - - connector = reactor.connectTCP( - port.getHost().host, port.getHost().port, client) - self.addCleanup(connector.disconnect) - - def _cbCCL(res): - """ - factory.clientConnectionLost callback - """ - return 'CCL' - - def _cbCL(res): - """ - protocol.connectionLost callback - """ - return 'CL' - - def _cbGather(res): - self.assertEquals(res, ['CL', 'CCL']) - - d = defer.gatherResults([ - client.protocolConnectionLost.addCallback(_cbCL), - client.deferred.addCallback(_cbCCL)]) - return d.addCallback(_cbGather) - - - -try: - import resource -except ImportError: - pass -else: - numRounds = resource.getrlimit(resource.RLIMIT_NOFILE)[0] + 10 - ProperlyCloseFilesTestCase.numberRounds = numRounds |