#!/usr/bin/env python # test_ftpd.py # ====================================================================== # Copyright (C) 2007 Giampaolo Rodola' # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of # Giampaolo Rodola' not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # ====================================================================== # This test suite has been run successfully on the following systems: # # ----------------------------------------------------------- # System | Python version # ----------------------------------------------------------- # Linux Ubuntu 2.6.20-15 | 2.4, 2.5 # Linux Kubuntu 8.04 32 & 64 bits | 2.5.2 # Linux Debian 2.4.27-2-386 | 2.3.5 # Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2 # Windows Vista Ultimate 64 bit | 2.5.1 # Windows Vista Business 32 bit | 2.5.1 # Windows Server 2008 64bit | 2.5.1 # Windows Mobile 6.1 | PythonCE 2.5 # OS X 10.4.10 | 2.3, 2.4, 2.5 # FreeBSD 7.0 | 2.4, 2.5 # ----------------------------------------------------------- import threading import unittest import socket import os import time import re import tempfile import ftplib import random import warnings import sys from pyftpdlib import ftpserver __release__ = 'pyftpdlib 0.5.0' # Attempt to use IP rather than hostname (test suite will run a lot faster) try: HOST = socket.gethostbyname('localhost') except socket.error: HOST = 'localhost' USER = 'user' PASSWD = '12345' HOME = os.getcwd() try: from test.test_support import TESTFN except ImportError: TESTFN = 'temp-fname' TESTFN2 = TESTFN + '2' TESTFN3 = TESTFN + '3' def try_address(host, port=0): """Try to bind a daemon on the given host:port and return True if that has been possible.""" try: ftpserver.FTPServer((host, port), None) except socket.error: return False else: return True SUPPORTS_IPV4 = try_address('127.0.0.1') SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1') class TestAbstractedFS(unittest.TestCase): """Test for conversion utility methods of AbstractedFS class.""" def test_ftpnorm(self): # Tests for ftpnorm method. ae = self.assertEquals fs = ftpserver.AbstractedFS() fs.cwd = '/' ae(fs.ftpnorm(''), '/') ae(fs.ftpnorm('/'), '/') ae(fs.ftpnorm('.'), '/') ae(fs.ftpnorm('..'), '/') ae(fs.ftpnorm('a'), '/a') ae(fs.ftpnorm('/a'), '/a') ae(fs.ftpnorm('/a/'), '/a') ae(fs.ftpnorm('a/..'), '/') ae(fs.ftpnorm('a/b'), '/a/b') ae(fs.ftpnorm('a/b/..'), '/a') ae(fs.ftpnorm('a/b/../..'), '/') fs.cwd = '/sub' ae(fs.ftpnorm(''), '/sub') ae(fs.ftpnorm('/'), '/') ae(fs.ftpnorm('.'), '/sub') ae(fs.ftpnorm('..'), '/') ae(fs.ftpnorm('a'), '/sub/a') ae(fs.ftpnorm('a/'), '/sub/a') ae(fs.ftpnorm('a/..'), '/sub') ae(fs.ftpnorm('a/b'), '/sub/a/b') ae(fs.ftpnorm('a/b/'), '/sub/a/b') ae(fs.ftpnorm('a/b/..'), '/sub/a') ae(fs.ftpnorm('a/b/../..'), '/sub') ae(fs.ftpnorm('a/b/../../..'), '/') ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed def test_ftp2fs(self): # Tests for ftp2fs method. ae = self.assertEquals fs = ftpserver.AbstractedFS() join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) def goforit(root): fs.root = root fs.cwd = '/' ae(fs.ftp2fs(''), root) ae(fs.ftp2fs('/'), root) ae(fs.ftp2fs('.'), root) ae(fs.ftp2fs('..'), root) ae(fs.ftp2fs('a'), join(root, 'a')) ae(fs.ftp2fs('/a'), join(root, 'a')) ae(fs.ftp2fs('/a/'), join(root, 'a')) ae(fs.ftp2fs('a/..'), root) ae(fs.ftp2fs('a/b'), join(root, r'a/b')) ae(fs.ftp2fs('/a/b'), join(root, r'a/b')) ae(fs.ftp2fs('/a/b/..'), join(root, 'a')) ae(fs.ftp2fs('/a/b/../..'), root) fs.cwd = '/sub' ae(fs.ftp2fs(''), join(root, 'sub')) ae(fs.ftp2fs('/'), root) ae(fs.ftp2fs('.'), join(root, 'sub')) ae(fs.ftp2fs('..'), root) ae(fs.ftp2fs('a'), join(root, 'sub/a')) ae(fs.ftp2fs('a/'), join(root, 'sub/a')) ae(fs.ftp2fs('a/..'), join(root, 'sub')) ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b')) ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a')) ae(fs.ftp2fs('a/b/../..'), join(root, 'sub')) ae(fs.ftp2fs('a/b/../../..'), root) ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed if os.sep == '\\': goforit(r'C:\dir') goforit('C:\\') # on DOS-derived filesystems (e.g. Windows) this is the same # as specifying the current drive directory (e.g. 'C:\\') goforit('\\') elif os.sep == '/': goforit('/home/user') goforit('/') else: # os.sep == ':'? Don't know... let's try it anyway goforit(os.getcwd()) def test_fs2ftp(self): # Tests for fs2ftp method. ae = self.assertEquals fs = ftpserver.AbstractedFS() join = lambda x, y: os.path.join(x, y.replace('/', os.sep)) def goforit(root): fs.root = root ae(fs.fs2ftp(root), '/') ae(fs.fs2ftp(join(root, '/')), '/') ae(fs.fs2ftp(join(root, '.')), '/') ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root ae(fs.fs2ftp(join(root, 'a')), '/a') ae(fs.fs2ftp(join(root, 'a/')), '/a') ae(fs.fs2ftp(join(root, 'a/..')), '/') ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') ae(fs.fs2ftp(join(root, 'a/b')), '/a/b') ae(fs.fs2ftp(join(root, 'a/b/..')), '/a') ae(fs.fs2ftp(join(root, '/a/b/../..')), '/') fs.cwd = '/sub' ae(fs.fs2ftp(join(root, 'a/')), '/a') if os.sep == '\\': goforit(r'C:\dir') goforit('C:\\') # on DOS-derived filesystems (e.g. Windows) this is the same # as specifying the current drive directory (e.g. 'C:\\') goforit('\\') fs.root = r'C:\dir' ae(fs.fs2ftp('C:\\'), '/') ae(fs.fs2ftp('D:\\'), '/') ae(fs.fs2ftp('D:\\dir'), '/') elif os.sep == '/': goforit('/') assert os.path.realpath('/__home/user') == '/__home/user', \ 'Test skipped (symlinks not allowed).' goforit('/__home/user') fs.root = '/__home/user' ae(fs.fs2ftp('/__home'), '/') ae(fs.fs2ftp('/'), '/') ae(fs.fs2ftp('/__home/userx'), '/') else: # os.sep == ':'? Don't know... let's try it anyway goforit(os.getcwd()) def test_validpath(self): # Tests for validpath method. fs = ftpserver.AbstractedFS() fs.root = HOME self.failUnless(fs.validpath(HOME)) self.failUnless(fs.validpath(HOME + '/')) self.failIf(fs.validpath(HOME + 'xxx')) if hasattr(os, 'symlink'): # Tests for validpath on systems supporting symbolic links. def _safe_remove(self, path): # convenience function for removing temporary files try: os.remove(path) except os.error: pass def test_validpath_validlink(self): # Test validpath by issuing a symlink pointing to a path # inside the root directory. fs = ftpserver.AbstractedFS() fs.root = HOME try: open(TESTFN, 'w') os.symlink(TESTFN, TESTFN2) self.failUnless(fs.validpath(TESTFN)) finally: self._safe_remove(TESTFN) self._safe_remove(TESTFN2) def test_validpath_external_symlink(self): # Test validpath by issuing a symlink pointing to a path # outside the root directory. fs = ftpserver.AbstractedFS() fs.root = HOME try: # tempfile should create our file in /tmp directory # which should be outside the user root. If it is not # we just skip the test. file = tempfile.NamedTemporaryFile() if HOME == os.path.dirname(file.name): return os.symlink(file.name, TESTFN) self.failIf(fs.validpath(TESTFN)) finally: self._safe_remove(TESTFN) file.close() class TestDummyAuthorizer(unittest.TestCase): """Tests for DummyAuthorizer class.""" # temporarily change warnings to exceptions for the purposes of testing def setUp(self): self.tempdir = tempfile.mkdtemp(dir=HOME) self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir)) self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name warnings.filterwarnings("error") def tearDown(self): os.remove(self.tempfile) os.remove(self.subtempfile) os.rmdir(self.subtempdir) os.rmdir(self.tempdir) warnings.resetwarnings() def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs): try: callableObj(*args, **kwargs) except excClass, why: if str(why) == msg: return raise self.failureException("%s != %s" %(str(why), msg)) else: if hasattr(excClass,'__name__'): excName = excClass.__name__ else: excName = str(excClass) raise self.failureException, "%s not raised" % excName def test_common_methods(self): auth = ftpserver.DummyAuthorizer() # create user auth.add_user(USER, PASSWD, HOME) auth.add_anonymous(HOME) # check credentials self.failUnless(auth.validate_authentication(USER, PASSWD)) self.failIf(auth.validate_authentication(USER, 'wrongpwd')) # remove them auth.remove_user(USER) auth.remove_user('anonymous') # raise exc if user does not exists self.assertRaises(KeyError, auth.remove_user, USER) # raise exc if path does not exist self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such directory: "%s"' %'?:\\', auth.add_user, USER, PASSWD, '?:\\') self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such directory: "%s"' %'?:\\', auth.add_anonymous, '?:\\') # raise exc if user already exists auth.add_user(USER, PASSWD, HOME) auth.add_anonymous(HOME) self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'User "%s" already exists' %USER, auth.add_user, USER, PASSWD, HOME) self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'User "anonymous" already exists', auth.add_anonymous, HOME) auth.remove_user(USER) auth.remove_user('anonymous') # raise on wrong permission self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such permission "?"', auth.add_user, USER, PASSWD, HOME, perm='?') self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such permission "?"', auth.add_anonymous, HOME, perm='?') # expect warning on write permissions assigned to anonymous user for x in "adfmw": self.assertRaisesWithMsg(RuntimeWarning, "Write permissions assigned to anonymous user.", auth.add_anonymous, HOME, perm=x) def test_override_perm_interface(self): auth = ftpserver.DummyAuthorizer() auth.add_user(USER, PASSWD, HOME, perm='elr') # raise exc if user does not exists self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr') # raise exc if path does not exist or it's not a directory self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such directory: "%s"' %'?:\\', auth.override_perm, USER, '?:\\', 'elr') self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such directory: "%s"' %self.tempfile, auth.override_perm, USER, self.tempfile, 'elr') # raise on wrong permission self.assertRaisesWithMsg(ftpserver.AuthorizerError, 'No such permission "?"', auth.override_perm, USER, HOME, perm='?') # expect warning on write permissions assigned to anonymous user auth.add_anonymous(HOME) for p in "adfmw": self.assertRaisesWithMsg(RuntimeWarning, "Write permissions assigned to anonymous user.", auth.override_perm, 'anonymous', HOME, p) # raise on attempt to override home directory permissions self.assertRaisesWithMsg(ftpserver.AuthorizerError, "Can't override home directory permissions", auth.override_perm, USER, HOME, perm='w') # raise on attempt to override a path escaping home directory if os.path.dirname(HOME) != HOME: self.assertRaisesWithMsg(ftpserver.AuthorizerError, "Path escapes user home directory", auth.override_perm, USER, os.path.dirname(HOME), perm='w') # try to re-set an overridden permission auth.override_perm(USER, self.tempdir, perm='w') auth.override_perm(USER, self.tempdir, perm='wr') def test_override_perm_recursive_paths(self): auth = ftpserver.DummyAuthorizer() auth.add_user(USER, PASSWD, HOME, perm='elr') self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) auth.override_perm(USER, self.tempdir, perm='w', recursive=True) self.assert_(auth.has_perm(USER, 'w', HOME) is False) self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True) self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True) self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) self.assert_(auth.has_perm(USER, 'w', path) is False) # test case-sensitiveness if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) def test_override_perm_not_recursive_paths(self): auth = ftpserver.DummyAuthorizer() auth.add_user(USER, PASSWD, HOME, perm='elr') self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False) auth.override_perm(USER, self.tempdir, perm='w') self.assert_(auth.has_perm(USER, 'w', HOME) is False) self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True) self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True) self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False) self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False) self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False) self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False) path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile)) self.assert_(auth.has_perm(USER, 'w', path) is False) # test case-sensitiveness if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'): self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True) class TestCallLater(unittest.TestCase): """Tests for CallLater class.""" def setUp(self): for task in ftpserver._tasks: if not task.cancelled: task.cancel() del ftpserver._tasks[:] def scheduler(self, timeout=0.01, count=100): while ftpserver._tasks and count > 0: ftpserver._scheduler() count -= 1 time.sleep(timeout) def test_interface(self): fun = lambda: 0 self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun) x = ftpserver.CallLater(3, fun) self.assertRaises(AssertionError, x.delay, -1) self.assert_(x.cancelled is False) x.cancel() self.assert_(x.cancelled is True) self.assertRaises(AssertionError, x.call) self.assertRaises(AssertionError, x.reset) self.assertRaises(AssertionError, x.delay, 2) self.assertRaises(AssertionError, x.cancel) def test_order(self): l = [] fun = lambda x: l.append(x) for x in [0.05, 0.04, 0.03, 0.02, 0.01]: ftpserver.CallLater(x, fun, x) self.scheduler() self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) def test_delay(self): l = [] fun = lambda x: l.append(x) ftpserver.CallLater(0.01, fun, 0.01).delay(0.07) ftpserver.CallLater(0.02, fun, 0.02).delay(0.08) ftpserver.CallLater(0.03, fun, 0.03) ftpserver.CallLater(0.04, fun, 0.04) ftpserver.CallLater(0.05, fun, 0.05) ftpserver.CallLater(0.06, fun, 0.06).delay(0.001) self.scheduler() self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02]) def test_reset(self): # will fail on such systems where time.time() does not provide # time with a better precision than 1 second. l = [] fun = lambda x: l.append(x) ftpserver.CallLater(0.01, fun, 0.01) ftpserver.CallLater(0.02, fun, 0.02) ftpserver.CallLater(0.03, fun, 0.03) x = ftpserver.CallLater(0.04, fun, 0.04) ftpserver.CallLater(0.05, fun, 0.05) time.sleep(0.1) x.reset() self.scheduler() self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04]) def test_cancel(self): l = [] fun = lambda x: l.append(x) ftpserver.CallLater(0.01, fun, 0.01).cancel() ftpserver.CallLater(0.02, fun, 0.02) ftpserver.CallLater(0.03, fun, 0.03) ftpserver.CallLater(0.04, fun, 0.04) ftpserver.CallLater(0.05, fun, 0.05).cancel() self.scheduler() self.assertEqual(l, [0.02, 0.03, 0.04]) class TestFtpAuthentication(unittest.TestCase): "test: USER, PASS, REIN." def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.f1 = open(TESTFN, 'w+b') self.f2 = open(TESTFN2, 'w+b') def tearDown(self): self.client.close() self.server.stop() if not self.f1.closed: self.f1.close() if not self.f2.closed: self.f2.close() os.remove(TESTFN) os.remove(TESTFN2) def test_auth_ok(self): self.client.login(user=USER, passwd=PASSWD) def test_anon_auth(self): self.client.login(user='anonymous', passwd='anon@') self.client.login(user='AnonYmoUs', passwd='anon@') self.client.login(user='anonymous', passwd='') # Commented after delayed response on wrong credentials has been # introduced because tests take too much to complete. ## def test_auth_failed(self): ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD) ## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong') ## def test_max_auth(self): ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') ## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong') ## # If authentication fails for 3 times ftpd disconnects the ## # client. We can check if that happens by using self.client.sendcmd() ## # on the 'dead' socket object. If socket object is really ## # closed it should be raised a socket.error exception (Windows) ## # or a EOFError exception (Linux). ## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '') def test_rein(self): self.client.login(user=USER, passwd=PASSWD) self.client.sendcmd('rein') # user not authenticated, error response expected self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') # by logging-in again we should be able to execute a # file-system command self.client.login(user=USER, passwd=PASSWD) self.client.sendcmd('pwd') def test_rein_during_transfer(self): self.client.login(user=USER, passwd=PASSWD) data = 'abcde12345' * 100000 self.f1.write(data) self.f1.close() self.client.voidcmd('TYPE I') conn = self.client.transfercmd('retr ' + TESTFN) rein_sent = 0 while 1: chunk = conn.recv(8192) if not chunk: break self.f2.write(chunk) if not rein_sent: rein_sent = 1 # flush account, error response expected self.client.sendcmd('rein') self.assertRaises(ftplib.error_perm, self.client.dir) # a 226 response is expected once tranfer finishes self.assertEqual(self.client.voidresp()[:3], '226') # account is still flushed, error response is still expected self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'size ' + TESTFN) # by logging-in again we should be able to execute a # filesystem command self.client.login(user=USER, passwd=PASSWD) self.client.sendcmd('pwd') self.f2.seek(0) self.assertEqual(hash(data), hash (self.f2.read())) def test_user(self): # Test USER while already authenticated and no transfer # is in progress. self.client.login(user=USER, passwd=PASSWD) self.client.sendcmd('user ' + USER) # authentication flushed self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') self.client.sendcmd('pass ' + PASSWD) self.client.sendcmd('pwd') def test_user_on_transfer(self): # Test USER while already authenticated and a transfer is # in progress. self.client.login(user=USER, passwd=PASSWD) data = 'abcde12345' * 100000 self.f1.write(data) self.f1.close() self.client.voidcmd('TYPE I') conn = self.client.transfercmd('retr ' + TESTFN) rein_sent = 0 while 1: chunk = conn.recv(8192) if not chunk: break self.f2.write(chunk) # stop transfer while it isn't finished yet if not rein_sent: rein_sent = 1 # flush account, expect an error response self.client.sendcmd('user ' + USER) self.assertRaises(ftplib.error_perm, self.client.dir) # a 226 response is expected once tranfer finishes self.assertEqual(self.client.voidresp()[:3], '226') # account is still flushed, error response is still expected self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd') # by logging-in again we should be able to execute a # filesystem command self.client.sendcmd('pass ' + PASSWD) self.client.sendcmd('pwd') self.f2.seek(0) self.assertEqual(hash(data), hash (self.f2.read())) class TestFtpDummyCmds(unittest.TestCase): "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP" def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) def tearDown(self): self.client.close() self.server.stop() def test_type(self): self.client.sendcmd('type a') self.client.sendcmd('type i') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?') def test_stru(self): self.client.sendcmd('stru f') self.client.sendcmd('stru F') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?') def test_mode(self): self.client.sendcmd('mode s') self.client.sendcmd('mode S') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?') def test_noop(self): self.client.sendcmd('noop') def test_syst(self): self.client.sendcmd('syst') def test_allo(self): self.client.sendcmd('allo x') def test_quit(self): self.client.sendcmd('quit') def test_help(self): self.client.sendcmd('help') cmd = random.choice(ftpserver.proto_cmds.keys()) self.client.sendcmd('help %s' %cmd) self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?') def test_rest(self): # test error conditions only; # restored data-transfer is tested later self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1') def test_opts_feat(self): self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst') # utility function which used for extracting the MLST "facts" # string from the FEAT response def mlst(): resp = self.client.sendcmd('feat') return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1) # we rely on "type", "perm", "size", and "modify" facts which # are those available on all platforms self.failUnless('type*;perm*;size*;modify*;' in mlst()) self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;') self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;') self.failUnless('type*;perm;size;modify;' in mlst()) self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ') self.failUnless(not '*' in mlst()) self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ') self.failUnless(not '*' in mlst()) self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \ '200 MLST OPTS type;') self.failUnless('type*;perm;size;modify;' in mlst()) class TestFtpCmdsSemantic(unittest.TestCase): arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port', 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type', 'user','xmkd','xrmd') def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) def tearDown(self): self.client.close() self.server.stop() def test_arg_cmds(self): # test commands requiring an argument expected = "501 Syntax error: command needs an argument." for cmd in self.arg_cmds: self.client.putcmd(cmd) resp = self.client.getmultiline() self.assertEqual(resp, expected) def test_no_arg_cmds(self): # test commands accepting no arguments expected = "501 Syntax error: command does not accept arguments." for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein', 'syst','xcup','xpwd'): self.client.putcmd(cmd + ' arg') resp = self.client.getmultiline() self.assertEqual(resp, expected) def test_auth_cmds(self): # test those commands requiring client to be authenticated expected = "530 Log in with USER and PASS first." self.client.sendcmd('rein') for cmd in ftpserver.proto_cmds: cmd = cmd.lower() if cmd in ('feat','help','noop','user','pass','stat','syst','quit'): continue if cmd in self.arg_cmds: cmd = cmd + ' arg' self.client.putcmd(cmd) resp = self.client.getmultiline() self.assertEqual(resp, expected) def test_no_auth_cmds(self): # test those commands that do not require client to be authenticated self.client.sendcmd('rein') for cmd in ('feat','help','noop','stat','syst'): self.client.sendcmd(cmd) # STAT provided with an argument is equal to LIST hence not allowed # if not authenticated self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /') self.client.sendcmd('quit') class TestFtpFsOperations(unittest.TestCase): "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT" def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) self.tempfile = os.path.basename(open(TESTFN, 'w+b').name) self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) os.mkdir(self.tempdir) def tearDown(self): self.client.close() self.server.stop() if os.path.exists(self.tempfile): os.remove(self.tempfile) if os.path.exists(self.tempdir): os.rmdir(self.tempdir) def test_cwd(self): self.client.cwd(self.tempdir) self.assertEqual(self.client.pwd(), '/' + self.tempdir) self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir') # cwd provided with no arguments is supposed to move us to the # root directory self.client.sendcmd('cwd') self.assertEqual(self.client.pwd(), '/') def test_pwd(self): self.assertEqual(self.client.pwd(), '/') self.client.cwd(self.tempdir) self.assertEqual(self.client.pwd(), '/' + self.tempdir) def test_cdup(self): self.client.cwd(self.tempdir) self.assertEqual(self.client.pwd(), '/' + self.tempdir) self.client.sendcmd('cdup') self.assertEqual(self.client.pwd(), '/') # make sure we can't escape from root directory self.client.sendcmd('cdup') self.assertEqual(self.client.pwd(), '/') def test_mkd(self): tempdir = os.path.basename(tempfile.mktemp(dir=HOME)) self.client.mkd(tempdir) # make sure we can't create directories which already exist # (probably not really necessary); # let's use a try/except statement to avoid leaving behind # orphaned temporary directory in the event of a test failure. try: self.client.mkd(tempdir) except ftplib.error_perm: os.rmdir(tempdir) # ok else: self.fail('ftplib.error_perm not raised.') def test_rmd(self): self.client.rmd(self.tempdir) self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile) # make sure we can't remove the root directory self.assertRaises(ftplib.error_perm, self.client.rmd, '/') def test_dele(self): self.client.delete(self.tempfile) self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir) def test_rnfr_rnto(self): # rename file tempname = os.path.basename(tempfile.mktemp(dir=HOME)) self.client.rename(self.tempfile, tempname) self.client.rename(tempname, self.tempfile) # rename dir tempname = os.path.basename(tempfile.mktemp(dir=HOME)) self.client.rename(self.tempdir, tempname) self.client.rename(tempname, self.tempdir) # rnfr/rnto over non-existing paths bogus = os.path.basename(tempfile.mktemp(dir=HOME)) self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x') self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/') # make sure we can't rename root directory self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x') def test_mdtm(self): self.client.sendcmd('mdtm ' + self.tempfile) # make sure we can't use mdtm against directories try: self.client.sendcmd('mdtm ' + self.tempdir) except ftplib.error_perm, err: self.failUnless("not retrievable" in str(err)) else: self.fail('Exception not raised') def test_size(self): self.client.size(self.tempfile) # make sure we can't use size against directories try: self.client.sendcmd('size ' + self.tempdir) except ftplib.error_perm, err: self.failUnless("not retrievable" in str(err)) else: self.fail('Exception not raised') class TestFtpRetrieveData(unittest.TestCase): "test: RETR, REST, LIST, NLST, argumented STAT" def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) self.f1 = open(TESTFN, 'w+b') self.f2 = open(TESTFN2, 'w+b') def tearDown(self): self.client.close() self.server.stop() if not self.f1.closed: self.f1.close() if not self.f2.closed: self.f2.close() os.remove(TESTFN) os.remove(TESTFN2) def test_retr(self): data = 'abcde12345' * 100000 self.f1.write(data) self.f1.close() self.client.retrbinary("retr " + TESTFN, self.f2.write) self.f2.seek(0) self.assertEqual(hash(data), hash(self.f2.read())) def test_restore_on_retr(self): data = 'abcde12345' * 100000 fname_1 = os.path.basename(self.f1.name) self.f1.write(data) self.f1.close() # look at ftplib.FTP.retrbinary method to understand this mess self.client.voidcmd('TYPE I') conn = self.client.transfercmd('retr ' + fname_1) chunk = conn.recv(len(data) / 2) self.f2.write(chunk) conn.close() # transfer wasn't finished yet so we expect a 426 response self.assertRaises(ftplib.error_temp, self.client.voidresp) # resuming transfer by using a marker value greater than the # file size stored on the server should result in an error # on retr (RFC-1123) file_size = self.client.size(fname_1) self.client.sendcmd('rest %s' %((file_size + 1))) self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fname_1) # test resume self.client.sendcmd('rest %s' %len(chunk)) self.client.retrbinary("retr " + fname_1, self.f2.write) self.f2.seek(0) self.assertEqual(hash(data), hash (self.f2.read())) def _test_listing_cmds(self, cmd): """Tests common to LIST NLST and MLSD commands.""" # assume that no argument has the same meaning of "/" l1 = l2 = [] self.client.retrlines(cmd, l1.append) self.client.retrlines(cmd + ' /', l2.append) self.assertEqual(l1, l2) if cmd.lower() != 'mlsd': # if pathname is a file one line is expected x = [] self.client.retrlines('%s ' %cmd + TESTFN, x.append) self.assertEqual(len(x), 1) self.failUnless(''.join(x).endswith(TESTFN)) # non-existent path, 550 response is expected bogus = os.path.basename(tempfile.mktemp(dir=HOME)) self.assertRaises(ftplib.error_perm, self.client.retrlines, '%s ' %cmd + bogus, lambda x: x) # for an empty directory we excpect that the data channel is # opened anyway and that no data is received x = [] tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME)) try: self.client.retrlines('%s %s' %(cmd, tempdir), x.append) self.assertEqual(x, []) finally: os.rmdir(tempdir) def test_nlst(self): # common tests self._test_listing_cmds('nlst') def test_list(self): # common tests self._test_listing_cmds('list') # known incorrect pathname arguments (e.g. old clients) are # expected to be treated as if pathname would be == '/' l1 = l2 = l3 = l4 = l5 = [] self.client.retrlines('list /', l1.append) self.client.retrlines('list -a', l2.append) self.client.retrlines('list -l', l3.append) self.client.retrlines('list -al', l4.append) self.client.retrlines('list -la', l5.append) tot = (l1, l2, l3, l4, l5) for x in range(len(tot) - 1): self.assertEqual(tot[x], tot[x+1]) def test_mlst(self): # utility function for extracting the line of interest mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1] # the fact set must be preceded by a space self.failUnless(mlstline('mlst').startswith(' ')) # where TVFS is supported, a fully qualified pathname is expected self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN)) self.failUnless(mlstline('mlst').endswith('/')) # assume that no argument has the same meaning of "/" self.assertEqual(mlstline('mlst'), mlstline('mlst /')) # non-existent path bogus = os.path.basename(tempfile.mktemp(dir=HOME)) self.assertRaises(ftplib.error_perm, mlstline, bogus) # test file/dir notations self.failUnless('type=dir' in mlstline('mlst')) self.failUnless('type=file' in mlstline('mlst ' + TESTFN)) # let's add some tests for OPTS command self.client.sendcmd('opts mlst type;') self.assertEqual(mlstline('mlst'), ' type=dir; /') # where no facts are present, two leading spaces before the # pathname are required (RFC-3659) self.client.sendcmd('opts mlst') self.assertEqual(mlstline('mlst'), ' /') def test_mlsd(self): # common tests self._test_listing_cmds('mlsd') dir = os.path.basename(tempfile.mkdtemp(dir=HOME)) try: try: self.client.retrlines('mlsd ' + TESTFN, lambda x: x) except ftplib.error_perm, resp: # if path is a file a 501 response code is expected self.assertEqual(str(resp)[0:3], "501") else: self.fail("Exception not raised") finally: os.rmdir(dir) def test_stat(self): # test STAT provided with argument which is equal to LIST self.client.sendcmd('stat /') self.client.sendcmd('stat ' + TESTFN) self.client.putcmd('stat *') resp = self.client.getmultiline() self.assertEqual(resp, '550 Globbing not supported.') bogus = os.path.basename(tempfile.mktemp(dir=HOME)) self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus) class TestFtpAbort(unittest.TestCase): "test: ABOR" def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) self.f1 = open(TESTFN, 'w+b') self.f2 = open(TESTFN2, 'w+b') def tearDown(self): self.client.close() self.server.stop() if not self.f1.closed: self.f1.close() if not self.f2.closed: self.f2.close() os.remove(self.f1.name) os.remove(self.f2.name) def test_abor_no_data(self): # Case 1: ABOR while no data channel is opened: respond with 225. resp = self.client.sendcmd('ABOR') self.failUnlessEqual('225 No transfer to abort.', resp) def test_abor_pasv(self): # Case 2: user sends a PASV, a data-channel socket is listening # but not connected, and ABOR is sent: close listening data # socket, respond with 225. self.client.makepasv() respcode = self.client.sendcmd('ABOR')[:3] self.failUnlessEqual('225', respcode) def test_abor_port(self): # Case 3: data channel opened with PASV or PORT, but ABOR sent # before a data transfer has been started: close data channel, # respond with 225 self.client.makeport() respcode = self.client.sendcmd('ABOR')[:3] self.failUnlessEqual('225', respcode) def test_abor(self): # Case 4: ABOR while a data transfer on DTP channel is in # progress: close data channel, respond with 426, respond # with 226. data = 'abcde12345' * 100000 self.f1.write(data) self.f1.close() # this ugly loop construct is to simulate an interrupted # transfer since ftplib doesn't like running storbinary() # in a separate thread self.client.voidcmd('TYPE I') conn = self.client.transfercmd('retr ' + TESTFN) chunk = conn.recv(len(data) / 2) # stop transfer while it isn't finished yet self.client.putcmd('ABOR') # transfer isn't finished yet so ftpd should respond with 426 self.assertRaises(ftplib.error_temp, self.client.voidresp) # transfer successfully aborted, so should now respond with a 226 self.failUnlessEqual('226', self.client.voidresp()[:3]) if hasattr(socket, 'MSG_OOB'): def test_oob_abor(self): # Send ABOR by following the RFC-959 directives of sending # Telnet IP/Synch sequence as OOB data. # On some systems like FreeBSD this happened to be a problem # due to a different SO_OOBINLINE behavior. # On some platforms (e.g. Python CE) the test may fail # although the MSG_OOB constant is defined. self.client.sock.sendall(chr(244), socket.MSG_OOB) self.client.sock.sendall(chr(242), socket.MSG_OOB) self.client.sock.sendall('abor\r\n') self.client.sock.settimeout(1) self.assertEqual(self.client.getresp()[:3], '225') class TestFtpStoreData(unittest.TestCase): "test: STOR, STOU, APPE, REST" def setUp(self): self.server = FTPd() self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) self.f1 = open(TESTFN, 'w+b') self.f2 = open(TESTFN2, 'w+b') def tearDown(self): self.client.close() self.server.stop() if not self.f1.closed: self.f1.close() if not self.f2.closed: self.f2.close() os.remove(TESTFN) os.remove(TESTFN2) def test_stor(self): # TESTFN3 is the remote file name try: data = 'abcde12345' * 100000 self.f1.write(data) self.f1.seek(0) self.client.storbinary('stor ' + TESTFN3, self.f1) self.client.retrbinary('retr ' + TESTFN3, self.f2.write) self.f2.seek(0) self.assertEqual(hash(data), hash (self.f2.read())) finally: # we do not use os.remove because file could be still # locked by ftpd thread if os.path.exists(TESTFN3): self.client.delete(TESTFN3) def test_stou(self): data = 'abcde12345' * 100000 self.f1.write(data) self.f1.seek(0) self.client.voidcmd('TYPE I') # filename comes in as "1xx FILE: " filename = self.client.sendcmd('stou').split('FILE: ')[1] try: sock = self.client.makeport() conn, sockaddr = sock.accept() while 1: buf = self.f1.read(8192) if not buf: break conn.sendall(buf) conn.close() # transfer finished, a 226 response is expected self.client.voidresp() self.client.retrbinary('retr ' + filename, self.f2.write) self.f2.seek(0) self.assertEqual(hash(data), hash (self.f2.read())) finally: # we do not use os.remove because file could be # still locked by ftpd thread if os.path.exists(filename): self.client.delete(filename) def test_stou_rest(self): # watch for STOU preceded by REST, which makes no sense. self.client.sendcmd('rest 10') self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou') def test_appe(self): # TESTFN3 is the remote file name try: data1 = 'abcde12345' * 100000 self.f1.write(data1) self.f1.seek(0) self.client.storbinary('stor ' + TESTFN3, self.f1) data2 = 'fghil67890' * 100000 self.f1.write(data2) self.f1.seek(self.client.size(TESTFN3)) self.client.storbinary('appe ' + TESTFN3, self.f1) self.client.retrbinary("retr " + TESTFN3, self.f2.write) self.f2.seek(0) self.assertEqual(hash(data1 + data2), hash (self.f2.read())) finally: # we do not use os.remove because file could be still # locked by ftpd thread if os.path.exists(TESTFN3): self.client.delete(TESTFN3) def test_appe_rest(self): # watch for APPE preceded by REST, which makes no sense. self.client.sendcmd('rest 10') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x') def test_rest_on_stor(self): # TESTFN3 is the remote file name data = 'abcde12345' * 100000 self.f1.write(data) self.f1.seek(0) self.client.voidcmd('TYPE I') conn = self.client.transfercmd('stor ' + TESTFN3) bytes_sent = 0 while 1: chunk = self.f1.read(8192) conn.sendall(chunk) bytes_sent += len(chunk) # stop transfer while it isn't finished yet if bytes_sent >= 524288: # 2^19 break elif not chunk: break conn.close() # transfer wasn't finished yet so we expect a 426 response self.client.voidresp() # resuming transfer by using a marker value greater than the # file size stored on the server should result in an error # on stor file_size = self.client.size(TESTFN3) self.assertEqual(file_size, bytes_sent) self.client.sendcmd('rest %s' %((file_size + 1))) self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN3) self.client.sendcmd('rest %s' %bytes_sent) self.client.storbinary('stor ' + TESTFN3, self.f1) self.client.retrbinary('retr ' + TESTFN3, self.f2.write) self.f1.seek(0) self.f2.seek(0) self.assertEqual(hash(self.f1.read()), hash(self.f2.read())) self.client.delete(TESTFN3) class TestTimeouts(unittest.TestCase): """Test idle-timeout capabilities of control and data channels. Some tests may fail on slow machines. """ def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30): self.server = FTPd() self.server.handler.timeout = idle_timeout self.server.handler.dtp_handler.timeout = data_timeout self.server.handler.passive_dtp.timeout = pasv_timeout self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) def tearDown(self): self.client.close() self.server.handler.timeout = 300 self.server.handler.dtp_handler.timeout = 300 self.server.handler.passive_dtp.timeout = 30 self.server.stop() def test_idle_timeout(self): # Test control channel timeout. The client which does not send # any command within the time specified in FTPHandler.timeout is # supposed to be kicked off. self._setUp(idle_timeout=0.1) # fail if no msg is received within 1 second self.client.sock.settimeout(1) data = self.client.sock.recv(1024) self.assertEqual(data, "421 Control connection timed out.\r\n") # ensure client has been kicked off self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') def test_data_timeout(self): # Test data channel timeout. The client which does not send # or receive any data within the time specified in # DTPHandler.timeout is supposed to be kicked off. self._setUp(data_timeout=0.1) addr = self.client.makepasv() s = socket.socket() s.connect(addr) # fail if no msg is received within 1 second self.client.sock.settimeout(1) data = self.client.sock.recv(1024) self.assertEqual(data, "421 Data connection timed out.\r\n") # ensure client has been kicked off self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') def test_idle_data_timeout1(self): # Tests that the control connection timeout is suspended while # the data channel is opened self._setUp(idle_timeout=0.1, data_timeout=0.2) addr = self.client.makepasv() s = socket.socket() s.connect(addr) # fail if no msg is received within 1 second self.client.sock.settimeout(1) data = self.client.sock.recv(1024) self.assertEqual(data, "421 Data connection timed out.\r\n") # ensure client has been kicked off self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') def test_idle_data_timeout2(self): # Tests that the control connection timeout is restarted after # data channel has been closed self._setUp(idle_timeout=0.1, data_timeout=0.2) addr = self.client.makepasv() s = socket.socket() s.connect(addr) # close data channel self.client.sendcmd('abor') self.client.sock.settimeout(1) data = self.client.sock.recv(1024) self.assertEqual(data, "421 Control connection timed out.\r\n") # ensure client has been kicked off self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop') def test_pasv_timeout(self): # Test pasv data channel timeout. The client which does not connect # to the listening data socket within the time specified in # PassiveDTP.timeout is supposed to receive a 421 response. self._setUp(pasv_timeout=0.1) self.client.makepasv() # fail if no msg is received within 1 second self.client.sock.settimeout(1) data = self.client.sock.recv(1024) self.assertEqual(data, "421 Passive data channel timed out.\r\n") # client is not expected to be kicked off self.client.sendcmd('noop') class TestMaxConnections(unittest.TestCase): """Test maximum connections (FTPServer.max_cons).""" def setUp(self): self.server = FTPd() self.server.server.max_cons = 3 self.server.start() def tearDown(self): self.server.server.max_cons = 0 self.server.stop() def test_max_connections(self): c1 = ftplib.FTP() c2 = ftplib.FTP() c3 = ftplib.FTP() try: c1.connect(self.server.host, self.server.port) c2.connect(self.server.host, self.server.port) self.assertRaises(ftplib.error_temp, c3.connect, self.server.host, self.server.port) # with passive data channel established c2.close() c1.login(USER, PASSWD) c1.makepasv() self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, self.server.port) # with passive data socket waiting for connection c1.login(USER, PASSWD) c1.sendcmd('pasv') self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, self.server.port) # with active data channel established c1.login(USER, PASSWD) c1.makeport() self.assertRaises(ftplib.error_temp, c2.connect, self.server.host, self.server.port) finally: c1.close() c2.close() c3.close() class _TestNetworkProtocols(unittest.TestCase): """Test PASV, EPSV, PORT and EPRT commands. Do not use this class directly. Let TestIPv4Environment and TestIPv6Environment classes use it instead. """ HOST = HOST def setUp(self): self.server = FTPd(self.HOST) self.server.start() self.client = ftplib.FTP() self.client.connect(self.server.host, self.server.port) self.client.login(USER, PASSWD) if self.client.af == socket.AF_INET: self.proto = "1" self.other_proto = "2" else: self.proto = "2" self.other_proto = "1" def tearDown(self): self.client.close() self.server.stop() def cmdresp(self, cmd): """Send a command and return response, also if the command failed.""" try: return self.client.sendcmd(cmd) except ftplib.Error, err: return str(err) def test_eprt(self): # test wrong proto try: self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto, self.server.host, self.server.port)) except ftplib.error_perm, err: self.assertEqual(str(err)[0:3], "522") else: self.fail("Exception not raised") # test bad args msg = "501 Invalid EPRT format." # len('|') > 3 self.assertEqual(self.cmdresp('eprt ||||'), msg) # len('|') < 3 self.assertEqual(self.cmdresp('eprt ||'), msg) # port > 65535 self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto, self.HOST)), msg) # port < 0 self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto, self.HOST)), msg) # port < 1024 self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto, self.HOST)), "501 Can't connect over a privileged port.") # test connection sock = socket.socket(self.client.af, socket.SOCK_STREAM) sock.bind((self.client.sock.getsockname()[0], 0)) sock.listen(5) sock.settimeout(2) ip, port = sock.getsockname()[:2] self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port)) try: try: sock.accept() except socket.timeout: self.fail("Server didn't connect to passive socket") finally: sock.close() def test_epsv(self): # test wrong proto try: self.client.sendcmd('epsv ' + self.other_proto) except ftplib.error_perm, err: self.assertEqual(str(err)[0:3], "522") else: self.fail("Exception not raised") # test connection for cmd in ('EPSV', 'EPSV ' + self.proto): host, port = ftplib.parse229(self.client.sendcmd(cmd), self.client.sock.getpeername()) s = socket.socket(self.client.af, socket.SOCK_STREAM) s.settimeout(2) try: s.connect((host, port)) self.client.sendcmd('abor') finally: s.close() def test_epsv_all(self): self.client.sendcmd('epsv all') self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv') self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000) self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000)) class TestIPv4Environment(_TestNetworkProtocols): """Test PASV, EPSV, PORT and EPRT commands. Runs tests contained in _TestNetworkProtocols class by using IPv4 plus some additional specific tests. """ HOST = '127.0.0.1' def test_port_v4(self): # test connection self.client.makeport() self.client.sendcmd('abor') # test bad arguments ae = self.assertEqual msg = "501 Invalid PORT format." ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ',' ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6 ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6 ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255 ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535 ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0 msg = "501 Can't connect over a privileged port." ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024 if "1.2.3.4" != self.HOST: msg = "501 Can't connect to a foreign address." ae(self.cmdresp('port 1,2,3,4,4,4'), msg) def test_eprt_v4(self): self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'), "501 Can't connect to a foreign address.") def test_pasv_v4(self): host, port = ftplib.parse227(self.client.sendcmd('pasv')) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(2) try: s.connect((host, port)) finally: s.close() class TestIPv6Environment(_TestNetworkProtocols): """Test PASV, EPSV, PORT and EPRT commands. Runs tests contained in _TestNetworkProtocols class by using IPv6 plus some additional specific tests. """ HOST = '::1' def test_port_v6(self): # 425 expected self.assertRaises(ftplib.error_temp, self.client.sendport, self.server.host, self.server.port) def test_pasv_v6(self): # 425 expected self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv') def test_eprt_v6(self): self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'), "501 Can't connect to a foreign address.") class FTPd(threading.Thread): """A threaded FTP server used for running tests.""" def __init__(self, host=HOST, port=0, verbose=False): threading.Thread.__init__(self) self.active = False if not verbose: ftpserver.log = ftpserver.logline = lambda x: x self.authorizer = ftpserver.DummyAuthorizer() self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full perms self.authorizer.add_anonymous(HOME) self.handler = ftpserver.FTPHandler self.handler.authorizer = self.authorizer self.server = ftpserver.FTPServer((host, port), self.handler) self.host, self.port = self.server.socket.getsockname()[:2] self.active_lock = threading.Lock() def start(self): assert not self.active self.__flag = threading.Event() threading.Thread.start(self) self.__flag.wait() def run(self): self.active = True self.__flag.set() while self.active: self.active_lock.acquire() self.server.serve_forever(timeout=0.001, count=1) self.active_lock.release() self.server.close_all(ignore_all=True) def stop(self): assert self.active self.active = False self.join() def remove_test_files(): "Convenience function for removing temporary test files" for file in [TESTFN, TESTFN2, TESTFN3]: try: os.remove(file) except os.error: pass def test_main(tests=None): test_suite = unittest.TestSuite() if tests is None: tests = [ TestAbstractedFS, TestDummyAuthorizer, TestCallLater, TestFtpAuthentication, TestFtpDummyCmds, TestFtpCmdsSemantic, TestFtpFsOperations, TestFtpRetrieveData, TestFtpAbort, TestFtpStoreData, TestTimeouts, TestMaxConnections ] if SUPPORTS_IPV4: tests.append(TestIPv4Environment) if SUPPORTS_IPV6: tests.append(TestIPv6Environment) for test in tests: test_suite.addTest(unittest.makeSuite(test)) remove_test_files() unittest.TextTestRunner(verbosity=2).run(test_suite) remove_test_files() if __name__ == '__main__': test_main()