diff options
Diffstat (limited to 'third_party/pyftpdlib/test/test_ftpd.py')
-rw-r--r-- | third_party/pyftpdlib/test/test_ftpd.py | 1623 |
1 files changed, 0 insertions, 1623 deletions
diff --git a/third_party/pyftpdlib/test/test_ftpd.py b/third_party/pyftpdlib/test/test_ftpd.py deleted file mode 100644 index ffd8557..0000000 --- a/third_party/pyftpdlib/test/test_ftpd.py +++ /dev/null @@ -1,1623 +0,0 @@ -#!/usr/bin/env python -# test_ftpd.py - -# ====================================================================== -# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com> -# -# All Rights Reserved -# -# Permission to use, copy, modify, and distribute this software and -# its documentation for any purpose and without fee is hereby -# granted, provided that the above copyright notice appear in all -# copies and that both that copyright notice and this permission -# notice appear in supporting documentation, and that the name of -# Giampaolo Rodola' not be used in advertising or publicity pertaining to -# distribution of the software without specific, written prior -# permission. -# -# Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, -# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN -# NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR -# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS -# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, -# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -# ====================================================================== - - -# This test suite has been run successfully on the following systems: -# -# ----------------------------------------------------------- -# System | Python version -# ----------------------------------------------------------- -# Linux Ubuntu 2.6.20-15 | 2.4, 2.5 -# Linux Kubuntu 8.04 32 & 64 bits | 2.5.2 -# Linux Debian 2.4.27-2-386 | 2.3.5 -# Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2 -# Windows Vista Ultimate 64 bit | 2.5.1 -# Windows Vista Business 32 bit | 2.5.1 -# Windows Server 2008 64bit | 2.5.1 -# Windows Mobile 6.1 | PythonCE 2.5 -# OS X 10.4.10 | 2.3, 2.4, 2.5 -# FreeBSD 7.0 | 2.4, 2.5 -# ----------------------------------------------------------- - - -import threading -import unittest -import socket -import os -import time -import re -import tempfile -import ftplib -import random -import warnings -import sys - -from pyftpdlib import ftpserver - - -__release__ = 'pyftpdlib 0.5.0' - -# Attempt to use IP rather than hostname (test suite will run a lot faster) -try: - HOST = socket.gethostbyname('localhost') -except socket.error: - HOST = 'localhost' -USER = 'user' -PASSWD = '12345' -HOME = os.getcwd() -try: - from test.test_support import TESTFN -except ImportError: - TESTFN = 'temp-fname' -TESTFN2 = TESTFN + '2' -TESTFN3 = TESTFN + '3' - -def try_address(host, port=0): - """Try to bind a daemon on the given host:port and return True - if that has been possible.""" - try: - ftpserver.FTPServer((host, port), None) - except socket.error: - return False - else: - return True - -SUPPORTS_IPV4 = try_address('127.0.0.1') -SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1') - - -class TestAbstractedFS(unittest.TestCase): - """Test for conversion utility methods of AbstractedFS class.""" - - def test_ftpnorm(self): - # Tests for ftpnorm method. - ae = self.assertEquals - fs = ftpserver.AbstractedFS() - - fs.cwd = '/' - ae(fs.ftpnorm(''), '/') - ae(fs.ftpnorm('/'), '/') - ae(fs.ftpnorm('.'), '/') - ae(fs.ftpnorm('..'), '/') - ae(fs.ftpnorm('a'), '/a') - ae(fs.ftpnorm('/a'), '/a') - ae(fs.ftpnorm('/a/'), '/a') - ae(fs.ftpnorm('a/..'), '/') - ae(fs.ftpnorm('a/b'), '/a/b') - ae(fs.ftpnorm('a/b/..'), '/a') - ae(fs.ftpnorm('a/b/../..'), '/') - fs.cwd = '/sub' - ae(fs.ftpnorm(''), '/sub') - ae(fs.ftpnorm('/'), '/') - ae(fs.ftpnorm('.'), '/sub') - ae(fs.ftpnorm('..'), '/') - ae(fs.ftpnorm('a'), '/sub/a') - ae(fs.ftpnorm('a/'), '/sub/a') - ae(fs.ftpnorm('a/..'), '/sub') - ae(fs.ftpnorm('a/b'), '/sub/a/b') - ae(fs.ftpnorm('a/b/'), '/sub/a/b') - ae(fs.ftpnorm('a/b/..'), '/sub/a') - ae(fs.ftpnorm('a/b/../..'), '/sub') - ae(fs.ftpnorm('a/b/../../..'), '/') - ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed - - def test_ftp2fs(self): - # Tests for ftp2fs method. - ae = self.assertEquals - fs = ftpserver.AbstractedFS() - join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) - - def goforit(root): - fs.root = root - fs.cwd = '/' - ae(fs.ftp2fs(''), root) - ae(fs.ftp2fs('/'), root) - ae(fs.ftp2fs('.'), root) - ae(fs.ftp2fs('..'), root) - ae(fs.ftp2fs('a'), join(root, 'a')) - ae(fs.ftp2fs('/a'), join(root, 'a')) - ae(fs.ftp2fs('/a/'), join(root, 'a')) - ae(fs.ftp2fs('a/..'), root) - ae(fs.ftp2fs('a/b'), join(root, r'a/b')) - ae(fs.ftp2fs('/a/b'), join(root, r'a/b')) - ae(fs.ftp2fs('/a/b/..'), join(root, 'a')) - ae(fs.ftp2fs('/a/b/../..'), root) - fs.cwd = '/sub' - ae(fs.ftp2fs(''), join(root, 'sub')) - ae(fs.ftp2fs('/'), root) - ae(fs.ftp2fs('.'), join(root, 'sub')) - ae(fs.ftp2fs('..'), root) - ae(fs.ftp2fs('a'), join(root, 'sub/a')) - ae(fs.ftp2fs('a/'), join(root, 'sub/a')) - ae(fs.ftp2fs('a/..'), join(root, 'sub')) - ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b')) - ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a')) - ae(fs.ftp2fs('a/b/../..'), join(root, 'sub')) - ae(fs.ftp2fs('a/b/../../..'), root) - ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed - - if os.sep == '\\': - goforit(r'C:\dir') - goforit('C:\\') - # on DOS-derived filesystems (e.g. Windows) this is the same - # as specifying the current drive directory (e.g. 'C:\\') - goforit('\\') - elif os.sep == '/': - goforit('/home/user') - goforit('/') - else: - # os.sep == ':'? Don't know... let's try it anyway - goforit(os.getcwd()) - - def test_fs2ftp(self): - # Tests for fs2ftp method. - ae = self.assertEquals - fs = ftpserver.AbstractedFS() - join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) - - def goforit(root): - fs.root = root - ae(fs.fs2ftp(root), '/') - ae(fs.fs2ftp(join(root, '/')), '/') - ae(fs.fs2ftp(join(root, '.')), '/') - ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root - ae(fs.fs2ftp(join(root, 'a')), '/a') - ae(fs.fs2ftp(join(root, 'a/')), '/a') - ae(fs.fs2ftp(join(root, 'a/..')), '/') - ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') - ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') - ae(fs.fs2ftp(join(root, 'a/b/..')), '/a') - ae(fs.fs2ftp(join(root, '/a/b/../..')), '/') - fs.cwd = '/sub' - ae(fs.fs2ftp(join(root, 'a/')), '/a') - - if os.sep == '\\': - goforit(r'C:\dir') - goforit('C:\\') - # on DOS-derived filesystems (e.g. Windows) this is the same - # as specifying the current drive directory (e.g. 'C:\\') - goforit('\\') - fs.root = r'C:\dir' - ae(fs.fs2ftp('C:\\'), '/') - ae(fs.fs2ftp('D:\\'), '/') - ae(fs.fs2ftp('D:\\dir'), '/') - elif os.sep == '/': - goforit('/') - assert os.path.realpath('/__home/user') == '/__home/user', \ - 'Test skipped (symlinks not allowed).' - goforit('/__home/user') - fs.root = '/__home/user' - ae(fs.fs2ftp('/__home'), '/') - ae(fs.fs2ftp('/'), '/') - ae(fs.fs2ftp('/__home/userx'), '/') - else: - # os.sep == ':'? Don't know... let's try it anyway - goforit(os.getcwd()) - - def test_validpath(self): - # Tests for validpath method. - fs = ftpserver.AbstractedFS() - fs.root = HOME - self.failUnless(fs.validpath(HOME)) - self.failUnless(fs.validpath(HOME + '/')) - self.failIf(fs.validpath(HOME + 'xxx')) - - if hasattr(os, 'symlink'): - # Tests for validpath on systems supporting symbolic links. - - def _safe_remove(self, path): - # convenience function for removing temporary files - try: - os.remove(path) - except os.error: - pass - - def test_validpath_validlink(self): - # Test validpath by issuing a symlink pointing to a path - # inside the root directory. - fs = ftpserver.AbstractedFS() - fs.root = HOME - try: - open(TESTFN, 'w') - os.symlink(TESTFN, TESTFN2) - self.failUnless(fs.validpath(TESTFN)) - finally: - self._safe_remove(TESTFN) - self._safe_remove(TESTFN2) - - def test_validpath_external_symlink(self): - # Test validpath by issuing a symlink pointing to a path - # outside the root directory. - fs = ftpserver.AbstractedFS() - fs.root = HOME - try: - # tempfile should create our file in /tmp directory - # which should be outside the user root. If it is not - # we just skip the test. - file = tempfile.NamedTemporaryFile() - if HOME == os.path.dirname(file.name): - return - os.symlink(file.name, TESTFN) - self.failIf(fs.validpath(TESTFN)) - finally: - self._safe_remove(TESTFN) - file.close() - - -class TestDummyAuthorizer(unittest.TestCase): - """Tests for DummyAuthorizer class.""" - - # temporarily change warnings to exceptions for the purposes of testing - def setUp(self): - self.tempdir = tempfile.mkdtemp(dir=HOME) - self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir)) - self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name - self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name - warnings.filterwarnings("error") - - def tearDown(self): - os.remove(self.tempfile) - os.remove(self.subtempfile) - os.rmdir(self.subtempdir) - os.rmdir(self.tempdir) - warnings.resetwarnings() - - def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs): - try: - callableObj(*args, **kwargs) - except excClass, why: - if str(why) == msg: - return - raise self.failureException("%s != %s" %(str(why), msg)) - else: - if hasattr(excClass,'__name__'): excName = excClass.__name__ - else: excName = str(excClass) - raise self.failureException, "%s not raised" % excName - - def test_common_methods(self): - auth = ftpserver.DummyAuthorizer() - # create user - auth.add_user(USER, PASSWD, HOME) - auth.add_anonymous(HOME) - # check credentials - self.failUnless(auth.validate_authentication(USER, PASSWD)) - self.failIf(auth.validate_authentication(USER, 'wrongpwd')) - # remove them - auth.remove_user(USER) - auth.remove_user('anonymous') - # raise exc if user does not exists - self.assertRaises(KeyError, auth.remove_user, USER) - # raise exc if path does not exist - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such directory: "%s"' %'?:\\', - auth.add_user, USER, PASSWD, '?:\\') - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such directory: "%s"' %'?:\\', - auth.add_anonymous, '?:\\') - # raise exc if user already exists - auth.add_user(USER, PASSWD, HOME) - auth.add_anonymous(HOME) - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'User "%s" already exists' %USER, - auth.add_user, USER, PASSWD, HOME) - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'User "anonymous" already exists', - auth.add_anonymous, HOME) - auth.remove_user(USER) - auth.remove_user('anonymous') - # raise on wrong permission - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such permission "?"', - auth.add_user, USER, PASSWD, HOME, perm='?') - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such permission "?"', - auth.add_anonymous, HOME, perm='?') - # expect warning on write permissions assigned to anonymous user - for x in "adfmw": - self.assertRaisesWithMsg(RuntimeWarning, - "Write permissions assigned to anonymous user.", - auth.add_anonymous, HOME, perm=x) - - def test_override_perm_interface(self): - auth = ftpserver.DummyAuthorizer() - auth.add_user(USER, PASSWD, HOME, perm='elr') - # raise exc if user does not exists - self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr') - # raise exc if path does not exist or it's not a directory - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such directory: "%s"' %'?:\\', - auth.override_perm, USER, '?:\\', 'elr') - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such directory: "%s"' %self.tempfile, - auth.override_perm, USER, self.tempfile, 'elr') - # raise on wrong permission - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - 'No such permission "?"', auth.override_perm, - USER, HOME, perm='?') - # expect warning on write permissions assigned to anonymous user - auth.add_anonymous(HOME) - for p in "adfmw": - self.assertRaisesWithMsg(RuntimeWarning, - "Write permissions assigned to anonymous user.", - auth.override_perm, 'anonymous', HOME, p) - # raise on attempt to override home directory permissions - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - "Can't override home directory permissions", - auth.override_perm, USER, HOME, perm='w') - # raise on attempt to override a path escaping home directory - if os.path.dirname(HOME) != HOME: - self.assertRaisesWithMsg(ftpserver.AuthorizerError, - "Path escapes user home directory", - auth.override_perm, USER, - os.path.dirname(HOME), perm='w') - # try to re-set an overridden permission - auth.override_perm(USER, self.tempdir, perm='w') - auth.override_perm(USER, self.tempdir, perm='wr') - - def test_override_perm_recursive_paths(self): - auth = ftpserver.DummyAuthorizer() - auth.add_user(USER, PASSWD, HOME, perm='elr') - self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) - auth.override_perm(USER, self.tempdir, perm='w', recursive=True) - self.assert_(auth.has_perm(USER, 'w', HOME) is False) - self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) - self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) - self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True) - self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True) - - self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) - self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) - path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) - self.assert_(auth.has_perm(USER, 'w', path) is False) - # test case-sensitiveness - if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): - self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) - - def test_override_perm_not_recursive_paths(self): - auth = ftpserver.DummyAuthorizer() - auth.add_user(USER, PASSWD, HOME, perm='elr') - self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) - auth.override_perm(USER, self.tempdir, perm='w') - self.assert_(auth.has_perm(USER, 'w', HOME) is False) - self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) - self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) - self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False) - self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False) - - self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) - self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) - path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) - self.assert_(auth.has_perm(USER, 'w', path) is False) - # test case-sensitiveness - if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): - self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) - - -class TestCallLater(unittest.TestCase): - """Tests for CallLater class.""" - - def setUp(self): - for task in ftpserver._tasks: - if not task.cancelled: - task.cancel() - del ftpserver._tasks[:] - - def scheduler(self, timeout=0.01, count=100): - while ftpserver._tasks and count > 0: - ftpserver._scheduler() - count -= 1 - time.sleep(timeout) - - def test_interface(self): - fun = lambda: 0 - self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun) - x = ftpserver.CallLater(3, fun) - self.assertRaises(AssertionError, x.delay, -1) - self.assert_(x.cancelled is False) - x.cancel() - self.assert_(x.cancelled is True) - self.assertRaises(AssertionError, x.call) - self.assertRaises(AssertionError, x.reset) - self.assertRaises(AssertionError, x.delay, 2) - self.assertRaises(AssertionError, x.cancel) - - def test_order(self): - l = [] - fun = lambda x: l.append(x) - for x in [0.05, 0.04, 0.03, 0.02, 0.01]: - ftpserver.CallLater(x, fun, x) - self.scheduler() - self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) - - def test_delay(self): - l = [] - fun = lambda x: l.append(x) - ftpserver.CallLater(0.01, fun, 0.01).delay(0.07) - ftpserver.CallLater(0.02, fun, 0.02).delay(0.08) - ftpserver.CallLater(0.03, fun, 0.03) - ftpserver.CallLater(0.04, fun, 0.04) - ftpserver.CallLater(0.05, fun, 0.05) - ftpserver.CallLater(0.06, fun, 0.06).delay(0.001) - self.scheduler() - self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02]) - - def test_reset(self): - # will fail on such systems where time.time() does not provide - # time with a better precision than 1 second. - l = [] - fun = lambda x: l.append(x) - ftpserver.CallLater(0.01, fun, 0.01) - ftpserver.CallLater(0.02, fun, 0.02) - ftpserver.CallLater(0.03, fun, 0.03) - x = ftpserver.CallLater(0.04, fun, 0.04) - ftpserver.CallLater(0.05, fun, 0.05) - time.sleep(0.1) - x.reset() - self.scheduler() - self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04]) - - def test_cancel(self): - l = [] - fun = lambda x: l.append(x) - ftpserver.CallLater(0.01, fun, 0.01).cancel() - ftpserver.CallLater(0.02, fun, 0.02) - ftpserver.CallLater(0.03, fun, 0.03) - ftpserver.CallLater(0.04, fun, 0.04) - ftpserver.CallLater(0.05, fun, 0.05).cancel() - self.scheduler() - self.assertEqual(l, [0.02, 0.03, 0.04]) - - -class TestFtpAuthentication(unittest.TestCase): - "test: USER, PASS, REIN." - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.f1 = open(TESTFN, 'w+b') - self.f2 = open(TESTFN2, 'w+b') - - def tearDown(self): - self.client.close() - self.server.stop() - if not self.f1.closed: - self.f1.close() - if not self.f2.closed: - self.f2.close() - os.remove(TESTFN) - os.remove(TESTFN2) - - def test_auth_ok(self): - self.client.login(user=USER, passwd=PASSWD) - - def test_anon_auth(self): - self.client.login(user='anonymous', passwd='anon@') - self.client.login(user='AnonYmoUs', passwd='anon@') - self.client.login(user='anonymous', passwd='') - - # Commented after delayed response on wrong credentials has been - # introduced because tests take too much to complete. - -## def test_auth_failed(self): -## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') -## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD) -## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong') - -## def test_max_auth(self): -## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') -## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') -## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') -## # If authentication fails for 3 times ftpd disconnects the -## # client. We can check if that happens by using self.client.sendcmd() -## # on the 'dead' socket object. If socket object is really -## # closed it should be raised a socket.error exception (Windows) -## # or a EOFError exception (Linux). -## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '') - - def test_rein(self): - self.client.login(user=USER, passwd=PASSWD) - self.client.sendcmd('rein') - # user not authenticated, error response expected - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') - # by logging-in again we should be able to execute a - # file-system command - self.client.login(user=USER, passwd=PASSWD) - self.client.sendcmd('pwd') - - def test_rein_during_transfer(self): - self.client.login(user=USER, passwd=PASSWD) - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.close() - - self.client.voidcmd('TYPE I') - conn = self.client.transfercmd('retr ' + TESTFN) - rein_sent = 0 - while 1: - chunk = conn.recv(8192) - if not chunk: - break - self.f2.write(chunk) - if not rein_sent: - rein_sent = 1 - # flush account, error response expected - self.client.sendcmd('rein') - self.assertRaises(ftplib.error_perm, self.client.dir) - - # a 226 response is expected once tranfer finishes - self.assertEqual(self.client.voidresp()[:3], '226') - # account is still flushed, error response is still expected - self.assertRaises(ftplib.error_perm, self.client.sendcmd, - 'size ' + TESTFN) - # by logging-in again we should be able to execute a - # filesystem command - self.client.login(user=USER, passwd=PASSWD) - self.client.sendcmd('pwd') - self.f2.seek(0) - self.assertEqual(hash(data), hash (self.f2.read())) - - def test_user(self): - # Test USER while already authenticated and no transfer - # is in progress. - self.client.login(user=USER, passwd=PASSWD) - self.client.sendcmd('user ' + USER) # authentication flushed - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') - self.client.sendcmd('pass ' + PASSWD) - self.client.sendcmd('pwd') - - def test_user_on_transfer(self): - # Test USER while already authenticated and a transfer is - # in progress. - self.client.login(user=USER, passwd=PASSWD) - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.close() - - self.client.voidcmd('TYPE I') - conn = self.client.transfercmd('retr ' + TESTFN) - rein_sent = 0 - while 1: - chunk = conn.recv(8192) - if not chunk: - break - self.f2.write(chunk) - # stop transfer while it isn't finished yet - if not rein_sent: - rein_sent = 1 - # flush account, expect an error response - self.client.sendcmd('user ' + USER) - self.assertRaises(ftplib.error_perm, self.client.dir) - - # a 226 response is expected once tranfer finishes - self.assertEqual(self.client.voidresp()[:3], '226') - # account is still flushed, error response is still expected - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') - # by logging-in again we should be able to execute a - # filesystem command - self.client.sendcmd('pass ' + PASSWD) - self.client.sendcmd('pwd') - self.f2.seek(0) - self.assertEqual(hash(data), hash (self.f2.read())) - - -class TestFtpDummyCmds(unittest.TestCase): - "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP" - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - - def tearDown(self): - self.client.close() - self.server.stop() - - def test_type(self): - self.client.sendcmd('type a') - self.client.sendcmd('type i') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?') - - def test_stru(self): - self.client.sendcmd('stru f') - self.client.sendcmd('stru F') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?') - - def test_mode(self): - self.client.sendcmd('mode s') - self.client.sendcmd('mode S') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?') - - def test_noop(self): - self.client.sendcmd('noop') - - def test_syst(self): - self.client.sendcmd('syst') - - def test_allo(self): - self.client.sendcmd('allo x') - - def test_quit(self): - self.client.sendcmd('quit') - - def test_help(self): - self.client.sendcmd('help') - cmd = random.choice(ftpserver.proto_cmds.keys()) - self.client.sendcmd('help %s' %cmd) - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?') - - def test_rest(self): - # test error conditions only; - # restored data-transfer is tested later - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1') - - def test_opts_feat(self): - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst') - # utility function which used for extracting the MLST "facts" - # string from the FEAT response - def mlst(): - resp = self.client.sendcmd('feat') - return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1) - # we rely on "type", "perm", "size", and "modify" facts which - # are those available on all platforms - self.failUnless('type*;perm*;size*;modify*;' in mlst()) - self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;') - self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;') - self.failUnless('type*;perm;size;modify;' in mlst()) - - self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ') - self.failUnless(not '*' in mlst()) - - self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ') - self.failUnless(not '*' in mlst()) - self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \ - '200 MLST OPTS type;') - self.failUnless('type*;perm;size;modify;' in mlst()) - - -class TestFtpCmdsSemantic(unittest.TestCase): - - arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port', - 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type', - 'user','xmkd','xrmd') - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - - def tearDown(self): - self.client.close() - self.server.stop() - - def test_arg_cmds(self): - # test commands requiring an argument - expected = "501 Syntax error: command needs an argument." - for cmd in self.arg_cmds: - self.client.putcmd(cmd) - resp = self.client.getmultiline() - self.assertEqual(resp, expected) - - def test_no_arg_cmds(self): - # test commands accepting no arguments - expected = "501 Syntax error: command does not accept arguments." - for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein', - 'syst','xcup','xpwd'): - self.client.putcmd(cmd + ' arg') - resp = self.client.getmultiline() - self.assertEqual(resp, expected) - - def test_auth_cmds(self): - # test those commands requiring client to be authenticated - expected = "530 Log in with USER and PASS first." - self.client.sendcmd('rein') - for cmd in ftpserver.proto_cmds: - cmd = cmd.lower() - if cmd in ('feat','help','noop','user','pass','stat','syst','quit'): - continue - if cmd in self.arg_cmds: - cmd = cmd + ' arg' - self.client.putcmd(cmd) - resp = self.client.getmultiline() - self.assertEqual(resp, expected) - - def test_no_auth_cmds(self): - # test those commands that do not require client to be authenticated - self.client.sendcmd('rein') - for cmd in ('feat','help','noop','stat','syst'): - self.client.sendcmd(cmd) - # STAT provided with an argument is equal to LIST hence not allowed - # if not authenticated - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /') - self.client.sendcmd('quit') - - -class TestFtpFsOperations(unittest.TestCase): - "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT" - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - self.tempfile = os.path.basename(open(TESTFN, 'w+b').name) - self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) - os.mkdir(self.tempdir) - - def tearDown(self): - self.client.close() - self.server.stop() - if os.path.exists(self.tempfile): - os.remove(self.tempfile) - if os.path.exists(self.tempdir): - os.rmdir(self.tempdir) - - def test_cwd(self): - self.client.cwd(self.tempdir) - self.assertEqual(self.client.pwd(), '/' + self.tempdir) - self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir') - # cwd provided with no arguments is supposed to move us to the - # root directory - self.client.sendcmd('cwd') - self.assertEqual(self.client.pwd(), '/') - - def test_pwd(self): - self.assertEqual(self.client.pwd(), '/') - self.client.cwd(self.tempdir) - self.assertEqual(self.client.pwd(), '/' + self.tempdir) - - def test_cdup(self): - self.client.cwd(self.tempdir) - self.assertEqual(self.client.pwd(), '/' + self.tempdir) - self.client.sendcmd('cdup') - self.assertEqual(self.client.pwd(), '/') - # make sure we can't escape from root directory - self.client.sendcmd('cdup') - self.assertEqual(self.client.pwd(), '/') - - def test_mkd(self): - tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) - self.client.mkd(tempdir) - # make sure we can't create directories which already exist - # (probably not really necessary); - # let's use a try/except statement to avoid leaving behind - # orphaned temporary directory in the event of a test failure. - try: - self.client.mkd(tempdir) - except ftplib.error_perm: - os.rmdir(tempdir) # ok - else: - self.fail('ftplib.error_perm not raised.') - - def test_rmd(self): - self.client.rmd(self.tempdir) - self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile) - # make sure we can't remove the root directory - self.assertRaises(ftplib.error_perm, self.client.rmd, '/') - - def test_dele(self): - self.client.delete(self.tempfile) - self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir) - - def test_rnfr_rnto(self): - # rename file - tempname = os.path.basename(tempfile.mktemp(dir=HOME)) - self.client.rename(self.tempfile, tempname) - self.client.rename(tempname, self.tempfile) - # rename dir - tempname = os.path.basename(tempfile.mktemp(dir=HOME)) - self.client.rename(self.tempdir, tempname) - self.client.rename(tempname, self.tempdir) - # rnfr/rnto over non-existing paths - bogus = os.path.basename(tempfile.mktemp(dir=HOME)) - self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x') - self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/') - # make sure we can't rename root directory - self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x') - - def test_mdtm(self): - self.client.sendcmd('mdtm ' + self.tempfile) - # make sure we can't use mdtm against directories - try: - self.client.sendcmd('mdtm ' + self.tempdir) - except ftplib.error_perm, err: - self.failUnless("not retrievable" in str(err)) - else: - self.fail('Exception not raised') - - def test_size(self): - self.client.size(self.tempfile) - # make sure we can't use size against directories - try: - self.client.sendcmd('size ' + self.tempdir) - except ftplib.error_perm, err: - self.failUnless("not retrievable" in str(err)) - else: - self.fail('Exception not raised') - - -class TestFtpRetrieveData(unittest.TestCase): - "test: RETR, REST, LIST, NLST, argumented STAT" - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - self.f1 = open(TESTFN, 'w+b') - self.f2 = open(TESTFN2, 'w+b') - - def tearDown(self): - self.client.close() - self.server.stop() - if not self.f1.closed: - self.f1.close() - if not self.f2.closed: - self.f2.close() - os.remove(TESTFN) - os.remove(TESTFN2) - - def test_retr(self): - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.close() - self.client.retrbinary("retr " + TESTFN, self.f2.write) - self.f2.seek(0) - self.assertEqual(hash(data), hash(self.f2.read())) - - def test_restore_on_retr(self): - data = 'abcde12345' * 100000 - fname_1 = os.path.basename(self.f1.name) - self.f1.write(data) - self.f1.close() - - # look at ftplib.FTP.retrbinary method to understand this mess - self.client.voidcmd('TYPE I') - conn = self.client.transfercmd('retr ' + fname_1) - chunk = conn.recv(len(data) / 2) - self.f2.write(chunk) - conn.close() - # transfer wasn't finished yet so we expect a 426 response - self.assertRaises(ftplib.error_temp, self.client.voidresp) - - # resuming transfer by using a marker value greater than the - # file size stored on the server should result in an error - # on retr (RFC-1123) - file_size = self.client.size(fname_1) - self.client.sendcmd('rest %s' %((file_size + 1))) - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fname_1) - - # test resume - self.client.sendcmd('rest %s' %len(chunk)) - self.client.retrbinary("retr " + fname_1, self.f2.write) - self.f2.seek(0) - self.assertEqual(hash(data), hash (self.f2.read())) - - def _test_listing_cmds(self, cmd): - """Tests common to LIST NLST and MLSD commands.""" - # assume that no argument has the same meaning of "/" - l1 = l2 = [] - self.client.retrlines(cmd, l1.append) - self.client.retrlines(cmd + ' /', l2.append) - self.assertEqual(l1, l2) - if cmd.lower() != 'mlsd': - # if pathname is a file one line is expected - x = [] - self.client.retrlines('%s ' %cmd + TESTFN, x.append) - self.assertEqual(len(x), 1) - self.failUnless(''.join(x).endswith(TESTFN)) - # non-existent path, 550 response is expected - bogus = os.path.basename(tempfile.mktemp(dir=HOME)) - self.assertRaises(ftplib.error_perm, self.client.retrlines, - '%s ' %cmd + bogus, lambda x: x) - # for an empty directory we excpect that the data channel is - # opened anyway and that no data is received - x = [] - tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME)) - try: - self.client.retrlines('%s %s' %(cmd, tempdir), x.append) - self.assertEqual(x, []) - finally: - os.rmdir(tempdir) - - def test_nlst(self): - # common tests - self._test_listing_cmds('nlst') - - def test_list(self): - # common tests - self._test_listing_cmds('list') - # known incorrect pathname arguments (e.g. old clients) are - # expected to be treated as if pathname would be == '/' - l1 = l2 = l3 = l4 = l5 = [] - self.client.retrlines('list /', l1.append) - self.client.retrlines('list -a', l2.append) - self.client.retrlines('list -l', l3.append) - self.client.retrlines('list -al', l4.append) - self.client.retrlines('list -la', l5.append) - tot = (l1, l2, l3, l4, l5) - for x in range(len(tot) - 1): - self.assertEqual(tot[x], tot[x+1]) - - def test_mlst(self): - # utility function for extracting the line of interest - mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1] - - # the fact set must be preceded by a space - self.failUnless(mlstline('mlst').startswith(' ')) - # where TVFS is supported, a fully qualified pathname is expected - self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN)) - self.failUnless(mlstline('mlst').endswith('/')) - # assume that no argument has the same meaning of "/" - self.assertEqual(mlstline('mlst'), mlstline('mlst /')) - # non-existent path - bogus = os.path.basename(tempfile.mktemp(dir=HOME)) - self.assertRaises(ftplib.error_perm, mlstline, bogus) - # test file/dir notations - self.failUnless('type=dir' in mlstline('mlst')) - self.failUnless('type=file' in mlstline('mlst ' + TESTFN)) - # let's add some tests for OPTS command - self.client.sendcmd('opts mlst type;') - self.assertEqual(mlstline('mlst'), ' type=dir; /') - # where no facts are present, two leading spaces before the - # pathname are required (RFC-3659) - self.client.sendcmd('opts mlst') - self.assertEqual(mlstline('mlst'), ' /') - - def test_mlsd(self): - # common tests - self._test_listing_cmds('mlsd') - dir = os.path.basename(tempfile.mkdtemp(dir=HOME)) - try: - try: - self.client.retrlines('mlsd ' + TESTFN, lambda x: x) - except ftplib.error_perm, resp: - # if path is a file a 501 response code is expected - self.assertEqual(str(resp)[0:3], "501") - else: - self.fail("Exception not raised") - finally: - os.rmdir(dir) - - def test_stat(self): - # test STAT provided with argument which is equal to LIST - self.client.sendcmd('stat /') - self.client.sendcmd('stat ' + TESTFN) - self.client.putcmd('stat *') - resp = self.client.getmultiline() - self.assertEqual(resp, '550 Globbing not supported.') - bogus = os.path.basename(tempfile.mktemp(dir=HOME)) - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus) - - -class TestFtpAbort(unittest.TestCase): - "test: ABOR" - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - self.f1 = open(TESTFN, 'w+b') - self.f2 = open(TESTFN2, 'w+b') - - def tearDown(self): - self.client.close() - self.server.stop() - if not self.f1.closed: - self.f1.close() - if not self.f2.closed: - self.f2.close() - os.remove(self.f1.name) - os.remove(self.f2.name) - - def test_abor_no_data(self): - # Case 1: ABOR while no data channel is opened: respond with 225. - resp = self.client.sendcmd('ABOR') - self.failUnlessEqual('225 No transfer to abort.', resp) - - def test_abor_pasv(self): - # Case 2: user sends a PASV, a data-channel socket is listening - # but not connected, and ABOR is sent: close listening data - # socket, respond with 225. - self.client.makepasv() - respcode = self.client.sendcmd('ABOR')[:3] - self.failUnlessEqual('225', respcode) - - def test_abor_port(self): - # Case 3: data channel opened with PASV or PORT, but ABOR sent - # before a data transfer has been started: close data channel, - # respond with 225 - self.client.makeport() - respcode = self.client.sendcmd('ABOR')[:3] - self.failUnlessEqual('225', respcode) - - def test_abor(self): - # Case 4: ABOR while a data transfer on DTP channel is in - # progress: close data channel, respond with 426, respond - # with 226. - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.close() - - # this ugly loop construct is to simulate an interrupted - # transfer since ftplib doesn't like running storbinary() - # in a separate thread - self.client.voidcmd('TYPE I') - conn = self.client.transfercmd('retr ' + TESTFN) - chunk = conn.recv(len(data) / 2) - # stop transfer while it isn't finished yet - self.client.putcmd('ABOR') - - # transfer isn't finished yet so ftpd should respond with 426 - self.assertRaises(ftplib.error_temp, self.client.voidresp) - - # transfer successfully aborted, so should now respond with a 226 - self.failUnlessEqual('226', self.client.voidresp()[:3]) - - if hasattr(socket, 'MSG_OOB'): - def test_oob_abor(self): - # Send ABOR by following the RFC-959 directives of sending - # Telnet IP/Synch sequence as OOB data. - # On some systems like FreeBSD this happened to be a problem - # due to a different SO_OOBINLINE behavior. - # On some platforms (e.g. Python CE) the test may fail - # although the MSG_OOB constant is defined. - self.client.sock.sendall(chr(244), socket.MSG_OOB) - self.client.sock.sendall(chr(242), socket.MSG_OOB) - self.client.sock.sendall('abor\r\n') - self.client.sock.settimeout(1) - self.assertEqual(self.client.getresp()[:3], '225') - - -class TestFtpStoreData(unittest.TestCase): - "test: STOR, STOU, APPE, REST" - - def setUp(self): - self.server = FTPd() - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - self.f1 = open(TESTFN, 'w+b') - self.f2 = open(TESTFN2, 'w+b') - - def tearDown(self): - self.client.close() - self.server.stop() - if not self.f1.closed: - self.f1.close() - if not self.f2.closed: - self.f2.close() - os.remove(TESTFN) - os.remove(TESTFN2) - - def test_stor(self): - # TESTFN3 is the remote file name - try: - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.seek(0) - self.client.storbinary('stor ' + TESTFN3, self.f1) - self.client.retrbinary('retr ' + TESTFN3, self.f2.write) - self.f2.seek(0) - self.assertEqual(hash(data), hash (self.f2.read())) - finally: - # we do not use os.remove because file could be still - # locked by ftpd thread - if os.path.exists(TESTFN3): - self.client.delete(TESTFN3) - - def test_stou(self): - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.seek(0) - - self.client.voidcmd('TYPE I') - # filename comes in as "1xx FILE: <filename>" - filename = self.client.sendcmd('stou').split('FILE: ')[1] - try: - sock = self.client.makeport() - conn, sockaddr = sock.accept() - while 1: - buf = self.f1.read(8192) - if not buf: - break - conn.sendall(buf) - conn.close() - # transfer finished, a 226 response is expected - self.client.voidresp() - self.client.retrbinary('retr ' + filename, self.f2.write) - self.f2.seek(0) - self.assertEqual(hash(data), hash (self.f2.read())) - finally: - # we do not use os.remove because file could be - # still locked by ftpd thread - if os.path.exists(filename): - self.client.delete(filename) - - def test_stou_rest(self): - # watch for STOU preceded by REST, which makes no sense. - self.client.sendcmd('rest 10') - self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou') - - def test_appe(self): - # TESTFN3 is the remote file name - try: - data1 = 'abcde12345' * 100000 - self.f1.write(data1) - self.f1.seek(0) - self.client.storbinary('stor ' + TESTFN3, self.f1) - - data2 = 'fghil67890' * 100000 - self.f1.write(data2) - self.f1.seek(self.client.size(TESTFN3)) - self.client.storbinary('appe ' + TESTFN3, self.f1) - - self.client.retrbinary("retr " + TESTFN3, self.f2.write) - self.f2.seek(0) - self.assertEqual(hash(data1 + data2), hash (self.f2.read())) - finally: - # we do not use os.remove because file could be still - # locked by ftpd thread - if os.path.exists(TESTFN3): - self.client.delete(TESTFN3) - - def test_appe_rest(self): - # watch for APPE preceded by REST, which makes no sense. - self.client.sendcmd('rest 10') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x') - - def test_rest_on_stor(self): - # TESTFN3 is the remote file name - data = 'abcde12345' * 100000 - self.f1.write(data) - self.f1.seek(0) - - self.client.voidcmd('TYPE I') - conn = self.client.transfercmd('stor ' + TESTFN3) - bytes_sent = 0 - while 1: - chunk = self.f1.read(8192) - conn.sendall(chunk) - bytes_sent += len(chunk) - # stop transfer while it isn't finished yet - if bytes_sent >= 524288: # 2^19 - break - elif not chunk: - break - conn.close() - # transfer wasn't finished yet so we expect a 426 response - self.client.voidresp() - - # resuming transfer by using a marker value greater than the - # file size stored on the server should result in an error - # on stor - file_size = self.client.size(TESTFN3) - self.assertEqual(file_size, bytes_sent) - self.client.sendcmd('rest %s' %((file_size + 1))) - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN3) - - self.client.sendcmd('rest %s' %bytes_sent) - self.client.storbinary('stor ' + TESTFN3, self.f1) - - self.client.retrbinary('retr ' + TESTFN3, self.f2.write) - self.f1.seek(0) - self.f2.seek(0) - self.assertEqual(hash(self.f1.read()), hash(self.f2.read())) - self.client.delete(TESTFN3) - - -class TestTimeouts(unittest.TestCase): - """Test idle-timeout capabilities of control and data channels. - Some tests may fail on slow machines. - """ - - def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30): - self.server = FTPd() - self.server.handler.timeout = idle_timeout - self.server.handler.dtp_handler.timeout = data_timeout - self.server.handler.passive_dtp.timeout = pasv_timeout - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - - def tearDown(self): - self.client.close() - self.server.handler.timeout = 300 - self.server.handler.dtp_handler.timeout = 300 - self.server.handler.passive_dtp.timeout = 30 - self.server.stop() - - def test_idle_timeout(self): - # Test control channel timeout. The client which does not send - # any command within the time specified in FTPHandler.timeout is - # supposed to be kicked off. - self._setUp(idle_timeout=0.1) - # fail if no msg is received within 1 second - self.client.sock.settimeout(1) - data = self.client.sock.recv(1024) - self.assertEqual(data, "421 Control connection timed out.\r\n") - # ensure client has been kicked off - self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') - - def test_data_timeout(self): - # Test data channel timeout. The client which does not send - # or receive any data within the time specified in - # DTPHandler.timeout is supposed to be kicked off. - self._setUp(data_timeout=0.1) - addr = self.client.makepasv() - s = socket.socket() - s.connect(addr) - # fail if no msg is received within 1 second - self.client.sock.settimeout(1) - data = self.client.sock.recv(1024) - self.assertEqual(data, "421 Data connection timed out.\r\n") - # ensure client has been kicked off - self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') - - def test_idle_data_timeout1(self): - # Tests that the control connection timeout is suspended while - # the data channel is opened - self._setUp(idle_timeout=0.1, data_timeout=0.2) - addr = self.client.makepasv() - s = socket.socket() - s.connect(addr) - # fail if no msg is received within 1 second - self.client.sock.settimeout(1) - data = self.client.sock.recv(1024) - self.assertEqual(data, "421 Data connection timed out.\r\n") - # ensure client has been kicked off - self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') - - def test_idle_data_timeout2(self): - # Tests that the control connection timeout is restarted after - # data channel has been closed - self._setUp(idle_timeout=0.1, data_timeout=0.2) - addr = self.client.makepasv() - s = socket.socket() - s.connect(addr) - # close data channel - self.client.sendcmd('abor') - self.client.sock.settimeout(1) - data = self.client.sock.recv(1024) - self.assertEqual(data, "421 Control connection timed out.\r\n") - # ensure client has been kicked off - self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') - - def test_pasv_timeout(self): - # Test pasv data channel timeout. The client which does not connect - # to the listening data socket within the time specified in - # PassiveDTP.timeout is supposed to receive a 421 response. - self._setUp(pasv_timeout=0.1) - self.client.makepasv() - # fail if no msg is received within 1 second - self.client.sock.settimeout(1) - data = self.client.sock.recv(1024) - self.assertEqual(data, "421 Passive data channel timed out.\r\n") - # client is not expected to be kicked off - self.client.sendcmd('noop') - - -class TestMaxConnections(unittest.TestCase): - """Test maximum connections (FTPServer.max_cons).""" - - def setUp(self): - self.server = FTPd() - self.server.server.max_cons = 3 - self.server.start() - - def tearDown(self): - self.server.server.max_cons = 0 - self.server.stop() - - def test_max_connections(self): - c1 = ftplib.FTP() - c2 = ftplib.FTP() - c3 = ftplib.FTP() - try: - c1.connect(self.server.host, self.server.port) - c2.connect(self.server.host, self.server.port) - self.assertRaises(ftplib.error_temp, c3.connect, self.server.host, - self.server.port) - # with passive data channel established - c2.close() - c1.login(USER, PASSWD) - c1.makepasv() - self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, - self.server.port) - # with passive data socket waiting for connection - c1.login(USER, PASSWD) - c1.sendcmd('pasv') - self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, - self.server.port) - # with active data channel established - c1.login(USER, PASSWD) - c1.makeport() - self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, - self.server.port) - finally: - c1.close() - c2.close() - c3.close() - - -class _TestNetworkProtocols(unittest.TestCase): - """Test PASV, EPSV, PORT and EPRT commands. - - Do not use this class directly. Let TestIPv4Environment and - TestIPv6Environment classes use it instead. - """ - HOST = HOST - - def setUp(self): - self.server = FTPd(self.HOST) - self.server.start() - self.client = ftplib.FTP() - self.client.connect(self.server.host, self.server.port) - self.client.login(USER, PASSWD) - if self.client.af == socket.AF_INET: - self.proto = "1" - self.other_proto = "2" - else: - self.proto = "2" - self.other_proto = "1" - - def tearDown(self): - self.client.close() - self.server.stop() - - def cmdresp(self, cmd): - """Send a command and return response, also if the command failed.""" - try: - return self.client.sendcmd(cmd) - except ftplib.Error, err: - return str(err) - - def test_eprt(self): - # test wrong proto - try: - self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto, - self.server.host, self.server.port)) - except ftplib.error_perm, err: - self.assertEqual(str(err)[0:3], "522") - else: - self.fail("Exception not raised") - - # test bad args - msg = "501 Invalid EPRT format." - # len('|') > 3 - self.assertEqual(self.cmdresp('eprt ||||'), msg) - # len('|') < 3 - self.assertEqual(self.cmdresp('eprt ||'), msg) - # port > 65535 - self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto, - self.HOST)), msg) - # port < 0 - self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto, - self.HOST)), msg) - # port < 1024 - self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto, - self.HOST)), "501 Can't connect over a privileged port.") - - # test connection - sock = socket.socket(self.client.af, socket.SOCK_STREAM) - sock.bind((self.client.sock.getsockname()[0], 0)) - sock.listen(5) - sock.settimeout(2) - ip, port = sock.getsockname()[:2] - self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port)) - try: - try: - sock.accept() - except socket.timeout: - self.fail("Server didn't connect to passive socket") - finally: - sock.close() - - def test_epsv(self): - # test wrong proto - try: - self.client.sendcmd('epsv ' + self.other_proto) - except ftplib.error_perm, err: - self.assertEqual(str(err)[0:3], "522") - else: - self.fail("Exception not raised") - - # test connection - for cmd in ('EPSV', 'EPSV ' + self.proto): - host, port = ftplib.parse229(self.client.sendcmd(cmd), - self.client.sock.getpeername()) - s = socket.socket(self.client.af, socket.SOCK_STREAM) - s.settimeout(2) - try: - s.connect((host, port)) - self.client.sendcmd('abor') - finally: - s.close() - - def test_epsv_all(self): - self.client.sendcmd('epsv all') - self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv') - self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000) - self.assertRaises(ftplib.error_perm, self.client.sendcmd, - 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000)) - - -class TestIPv4Environment(_TestNetworkProtocols): - """Test PASV, EPSV, PORT and EPRT commands. - - Runs tests contained in _TestNetworkProtocols class by using IPv4 - plus some additional specific tests. - """ - HOST = '127.0.0.1' - - def test_port_v4(self): - # test connection - self.client.makeport() - self.client.sendcmd('abor') - # test bad arguments - ae = self.assertEqual - msg = "501 Invalid PORT format." - ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ',' - ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int - ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6 - ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6 - ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255 - ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535 - ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0 - msg = "501 Can't connect over a privileged port." - ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024 - if "1.2.3.4" != self.HOST: - msg = "501 Can't connect to a foreign address." - ae(self.cmdresp('port 1,2,3,4,4,4'), msg) - - def test_eprt_v4(self): - self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'), - "501 Can't connect to a foreign address.") - - def test_pasv_v4(self): - host, port = ftplib.parse227(self.client.sendcmd('pasv')) - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(2) - try: - s.connect((host, port)) - finally: - s.close() - - -class TestIPv6Environment(_TestNetworkProtocols): - """Test PASV, EPSV, PORT and EPRT commands. - - Runs tests contained in _TestNetworkProtocols class by using IPv6 - plus some additional specific tests. - """ - HOST = '::1' - - def test_port_v6(self): - # 425 expected - self.assertRaises(ftplib.error_temp, self.client.sendport, - self.server.host, self.server.port) - - def test_pasv_v6(self): - # 425 expected - self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv') - - def test_eprt_v6(self): - self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'), - "501 Can't connect to a foreign address.") - - -class FTPd(threading.Thread): - """A threaded FTP server used for running tests.""" - - def __init__(self, host=HOST, port=0, verbose=False): - threading.Thread.__init__(self) - self.active = False - if not verbose: - ftpserver.log = ftpserver.logline = lambda x: x - self.authorizer = ftpserver.DummyAuthorizer() - self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full perms - self.authorizer.add_anonymous(HOME) - self.handler = ftpserver.FTPHandler - self.handler.authorizer = self.authorizer - self.server = ftpserver.FTPServer((host, port), self.handler) - self.host, self.port = self.server.socket.getsockname()[:2] - self.active_lock = threading.Lock() - - def start(self): - assert not self.active - self.__flag = threading.Event() - threading.Thread.start(self) - self.__flag.wait() - - def run(self): - self.active = True - self.__flag.set() - while self.active: - self.active_lock.acquire() - self.server.serve_forever(timeout=0.001, count=1) - self.active_lock.release() - self.server.close_all(ignore_all=True) - - def stop(self): - assert self.active - self.active = False - self.join() - - -def remove_test_files(): - "Convenience function for removing temporary test files" - for file in [TESTFN, TESTFN2, TESTFN3]: - try: - os.remove(file) - except os.error: - pass - -def test_main(tests=None): - test_suite = unittest.TestSuite() - if tests is None: - tests = [ - TestAbstractedFS, - TestDummyAuthorizer, - TestCallLater, - TestFtpAuthentication, - TestFtpDummyCmds, - TestFtpCmdsSemantic, - TestFtpFsOperations, - TestFtpRetrieveData, - TestFtpAbort, - TestFtpStoreData, - TestTimeouts, - TestMaxConnections - ] - if SUPPORTS_IPV4: - tests.append(TestIPv4Environment) - if SUPPORTS_IPV6: - tests.append(TestIPv6Environment) - - for test in tests: - test_suite.addTest(unittest.makeSuite(test)) - remove_test_files() - unittest.TextTestRunner(verbosity=2).run(test_suite) - remove_test_files() - - -if __name__ == '__main__': - test_main() |