summaryrefslogtreecommitdiffstats
path: root/third_party/pyftpdlib/test/test_ftpd.py
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/pyftpdlib/test/test_ftpd.py')
-rw-r--r--third_party/pyftpdlib/test/test_ftpd.py1623
1 files changed, 0 insertions, 1623 deletions
diff --git a/third_party/pyftpdlib/test/test_ftpd.py b/third_party/pyftpdlib/test/test_ftpd.py
deleted file mode 100644
index ffd8557..0000000
--- a/third_party/pyftpdlib/test/test_ftpd.py
+++ /dev/null
@@ -1,1623 +0,0 @@
-#!/usr/bin/env python
-# test_ftpd.py
-
-# ======================================================================
-# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>
-#
-# All Rights Reserved
-#
-# Permission to use, copy, modify, and distribute this software and
-# its documentation for any purpose and without fee is hereby
-# granted, provided that the above copyright notice appear in all
-# copies and that both that copyright notice and this permission
-# notice appear in supporting documentation, and that the name of
-# Giampaolo Rodola' not be used in advertising or publicity pertaining to
-# distribution of the software without specific, written prior
-# permission.
-#
-# Giampaolo Rodola' DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
-# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
-# NO EVENT Giampaolo Rodola' BE LIABLE FOR ANY SPECIAL, INDIRECT OR
-# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
-# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
-# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
-# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-# ======================================================================
-
-
-# This test suite has been run successfully on the following systems:
-#
-# -----------------------------------------------------------
-# System | Python version
-# -----------------------------------------------------------
-# Linux Ubuntu 2.6.20-15 | 2.4, 2.5
-# Linux Kubuntu 8.04 32 & 64 bits | 2.5.2
-# Linux Debian 2.4.27-2-386 | 2.3.5
-# Windows XP prof SP3 | 2.3, 2.4, 2.5, 2.6-RC2
-# Windows Vista Ultimate 64 bit | 2.5.1
-# Windows Vista Business 32 bit | 2.5.1
-# Windows Server 2008 64bit | 2.5.1
-# Windows Mobile 6.1 | PythonCE 2.5
-# OS X 10.4.10 | 2.3, 2.4, 2.5
-# FreeBSD 7.0 | 2.4, 2.5
-# -----------------------------------------------------------
-
-
-import threading
-import unittest
-import socket
-import os
-import time
-import re
-import tempfile
-import ftplib
-import random
-import warnings
-import sys
-
-from pyftpdlib import ftpserver
-
-
-__release__ = 'pyftpdlib 0.5.0'
-
-# Attempt to use IP rather than hostname (test suite will run a lot faster)
-try:
- HOST = socket.gethostbyname('localhost')
-except socket.error:
- HOST = 'localhost'
-USER = 'user'
-PASSWD = '12345'
-HOME = os.getcwd()
-try:
- from test.test_support import TESTFN
-except ImportError:
- TESTFN = 'temp-fname'
-TESTFN2 = TESTFN + '2'
-TESTFN3 = TESTFN + '3'
-
-def try_address(host, port=0):
- """Try to bind a daemon on the given host:port and return True
- if that has been possible."""
- try:
- ftpserver.FTPServer((host, port), None)
- except socket.error:
- return False
- else:
- return True
-
-SUPPORTS_IPV4 = try_address('127.0.0.1')
-SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1')
-
-
-class TestAbstractedFS(unittest.TestCase):
- """Test for conversion utility methods of AbstractedFS class."""
-
- def test_ftpnorm(self):
- # Tests for ftpnorm method.
- ae = self.assertEquals
- fs = ftpserver.AbstractedFS()
-
- fs.cwd = '/'
- ae(fs.ftpnorm(''), '/')
- ae(fs.ftpnorm('/'), '/')
- ae(fs.ftpnorm('.'), '/')
- ae(fs.ftpnorm('..'), '/')
- ae(fs.ftpnorm('a'), '/a')
- ae(fs.ftpnorm('/a'), '/a')
- ae(fs.ftpnorm('/a/'), '/a')
- ae(fs.ftpnorm('a/..'), '/')
- ae(fs.ftpnorm('a/b'), '/a/b')
- ae(fs.ftpnorm('a/b/..'), '/a')
- ae(fs.ftpnorm('a/b/../..'), '/')
- fs.cwd = '/sub'
- ae(fs.ftpnorm(''), '/sub')
- ae(fs.ftpnorm('/'), '/')
- ae(fs.ftpnorm('.'), '/sub')
- ae(fs.ftpnorm('..'), '/')
- ae(fs.ftpnorm('a'), '/sub/a')
- ae(fs.ftpnorm('a/'), '/sub/a')
- ae(fs.ftpnorm('a/..'), '/sub')
- ae(fs.ftpnorm('a/b'), '/sub/a/b')
- ae(fs.ftpnorm('a/b/'), '/sub/a/b')
- ae(fs.ftpnorm('a/b/..'), '/sub/a')
- ae(fs.ftpnorm('a/b/../..'), '/sub')
- ae(fs.ftpnorm('a/b/../../..'), '/')
- ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed
-
- def test_ftp2fs(self):
- # Tests for ftp2fs method.
- ae = self.assertEquals
- fs = ftpserver.AbstractedFS()
- join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
-
- def goforit(root):
- fs.root = root
- fs.cwd = '/'
- ae(fs.ftp2fs(''), root)
- ae(fs.ftp2fs('/'), root)
- ae(fs.ftp2fs('.'), root)
- ae(fs.ftp2fs('..'), root)
- ae(fs.ftp2fs('a'), join(root, 'a'))
- ae(fs.ftp2fs('/a'), join(root, 'a'))
- ae(fs.ftp2fs('/a/'), join(root, 'a'))
- ae(fs.ftp2fs('a/..'), root)
- ae(fs.ftp2fs('a/b'), join(root, r'a/b'))
- ae(fs.ftp2fs('/a/b'), join(root, r'a/b'))
- ae(fs.ftp2fs('/a/b/..'), join(root, 'a'))
- ae(fs.ftp2fs('/a/b/../..'), root)
- fs.cwd = '/sub'
- ae(fs.ftp2fs(''), join(root, 'sub'))
- ae(fs.ftp2fs('/'), root)
- ae(fs.ftp2fs('.'), join(root, 'sub'))
- ae(fs.ftp2fs('..'), root)
- ae(fs.ftp2fs('a'), join(root, 'sub/a'))
- ae(fs.ftp2fs('a/'), join(root, 'sub/a'))
- ae(fs.ftp2fs('a/..'), join(root, 'sub'))
- ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b'))
- ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a'))
- ae(fs.ftp2fs('a/b/../..'), join(root, 'sub'))
- ae(fs.ftp2fs('a/b/../../..'), root)
- ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed
-
- if os.sep == '\\':
- goforit(r'C:\dir')
- goforit('C:\\')
- # on DOS-derived filesystems (e.g. Windows) this is the same
- # as specifying the current drive directory (e.g. 'C:\\')
- goforit('\\')
- elif os.sep == '/':
- goforit('/home/user')
- goforit('/')
- else:
- # os.sep == ':'? Don't know... let's try it anyway
- goforit(os.getcwd())
-
- def test_fs2ftp(self):
- # Tests for fs2ftp method.
- ae = self.assertEquals
- fs = ftpserver.AbstractedFS()
- join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
-
- def goforit(root):
- fs.root = root
- ae(fs.fs2ftp(root), '/')
- ae(fs.fs2ftp(join(root, '/')), '/')
- ae(fs.fs2ftp(join(root, '.')), '/')
- ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root
- ae(fs.fs2ftp(join(root, 'a')), '/a')
- ae(fs.fs2ftp(join(root, 'a/')), '/a')
- ae(fs.fs2ftp(join(root, 'a/..')), '/')
- ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
- ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
- ae(fs.fs2ftp(join(root, 'a/b/..')), '/a')
- ae(fs.fs2ftp(join(root, '/a/b/../..')), '/')
- fs.cwd = '/sub'
- ae(fs.fs2ftp(join(root, 'a/')), '/a')
-
- if os.sep == '\\':
- goforit(r'C:\dir')
- goforit('C:\\')
- # on DOS-derived filesystems (e.g. Windows) this is the same
- # as specifying the current drive directory (e.g. 'C:\\')
- goforit('\\')
- fs.root = r'C:\dir'
- ae(fs.fs2ftp('C:\\'), '/')
- ae(fs.fs2ftp('D:\\'), '/')
- ae(fs.fs2ftp('D:\\dir'), '/')
- elif os.sep == '/':
- goforit('/')
- assert os.path.realpath('/__home/user') == '/__home/user', \
- 'Test skipped (symlinks not allowed).'
- goforit('/__home/user')
- fs.root = '/__home/user'
- ae(fs.fs2ftp('/__home'), '/')
- ae(fs.fs2ftp('/'), '/')
- ae(fs.fs2ftp('/__home/userx'), '/')
- else:
- # os.sep == ':'? Don't know... let's try it anyway
- goforit(os.getcwd())
-
- def test_validpath(self):
- # Tests for validpath method.
- fs = ftpserver.AbstractedFS()
- fs.root = HOME
- self.failUnless(fs.validpath(HOME))
- self.failUnless(fs.validpath(HOME + '/'))
- self.failIf(fs.validpath(HOME + 'xxx'))
-
- if hasattr(os, 'symlink'):
- # Tests for validpath on systems supporting symbolic links.
-
- def _safe_remove(self, path):
- # convenience function for removing temporary files
- try:
- os.remove(path)
- except os.error:
- pass
-
- def test_validpath_validlink(self):
- # Test validpath by issuing a symlink pointing to a path
- # inside the root directory.
- fs = ftpserver.AbstractedFS()
- fs.root = HOME
- try:
- open(TESTFN, 'w')
- os.symlink(TESTFN, TESTFN2)
- self.failUnless(fs.validpath(TESTFN))
- finally:
- self._safe_remove(TESTFN)
- self._safe_remove(TESTFN2)
-
- def test_validpath_external_symlink(self):
- # Test validpath by issuing a symlink pointing to a path
- # outside the root directory.
- fs = ftpserver.AbstractedFS()
- fs.root = HOME
- try:
- # tempfile should create our file in /tmp directory
- # which should be outside the user root. If it is not
- # we just skip the test.
- file = tempfile.NamedTemporaryFile()
- if HOME == os.path.dirname(file.name):
- return
- os.symlink(file.name, TESTFN)
- self.failIf(fs.validpath(TESTFN))
- finally:
- self._safe_remove(TESTFN)
- file.close()
-
-
-class TestDummyAuthorizer(unittest.TestCase):
- """Tests for DummyAuthorizer class."""
-
- # temporarily change warnings to exceptions for the purposes of testing
- def setUp(self):
- self.tempdir = tempfile.mkdtemp(dir=HOME)
- self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir))
- self.tempfile = open(os.path.join(self.tempdir, TESTFN), 'w').name
- self.subtempfile = open(os.path.join(self.subtempdir, TESTFN), 'w').name
- warnings.filterwarnings("error")
-
- def tearDown(self):
- os.remove(self.tempfile)
- os.remove(self.subtempfile)
- os.rmdir(self.subtempdir)
- os.rmdir(self.tempdir)
- warnings.resetwarnings()
-
- def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
- try:
- callableObj(*args, **kwargs)
- except excClass, why:
- if str(why) == msg:
- return
- raise self.failureException("%s != %s" %(str(why), msg))
- else:
- if hasattr(excClass,'__name__'): excName = excClass.__name__
- else: excName = str(excClass)
- raise self.failureException, "%s not raised" % excName
-
- def test_common_methods(self):
- auth = ftpserver.DummyAuthorizer()
- # create user
- auth.add_user(USER, PASSWD, HOME)
- auth.add_anonymous(HOME)
- # check credentials
- self.failUnless(auth.validate_authentication(USER, PASSWD))
- self.failIf(auth.validate_authentication(USER, 'wrongpwd'))
- # remove them
- auth.remove_user(USER)
- auth.remove_user('anonymous')
- # raise exc if user does not exists
- self.assertRaises(KeyError, auth.remove_user, USER)
- # raise exc if path does not exist
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such directory: "%s"' %'?:\\',
- auth.add_user, USER, PASSWD, '?:\\')
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such directory: "%s"' %'?:\\',
- auth.add_anonymous, '?:\\')
- # raise exc if user already exists
- auth.add_user(USER, PASSWD, HOME)
- auth.add_anonymous(HOME)
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'User "%s" already exists' %USER,
- auth.add_user, USER, PASSWD, HOME)
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'User "anonymous" already exists',
- auth.add_anonymous, HOME)
- auth.remove_user(USER)
- auth.remove_user('anonymous')
- # raise on wrong permission
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such permission "?"',
- auth.add_user, USER, PASSWD, HOME, perm='?')
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such permission "?"',
- auth.add_anonymous, HOME, perm='?')
- # expect warning on write permissions assigned to anonymous user
- for x in "adfmw":
- self.assertRaisesWithMsg(RuntimeWarning,
- "Write permissions assigned to anonymous user.",
- auth.add_anonymous, HOME, perm=x)
-
- def test_override_perm_interface(self):
- auth = ftpserver.DummyAuthorizer()
- auth.add_user(USER, PASSWD, HOME, perm='elr')
- # raise exc if user does not exists
- self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr')
- # raise exc if path does not exist or it's not a directory
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such directory: "%s"' %'?:\\',
- auth.override_perm, USER, '?:\\', 'elr')
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such directory: "%s"' %self.tempfile,
- auth.override_perm, USER, self.tempfile, 'elr')
- # raise on wrong permission
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- 'No such permission "?"', auth.override_perm,
- USER, HOME, perm='?')
- # expect warning on write permissions assigned to anonymous user
- auth.add_anonymous(HOME)
- for p in "adfmw":
- self.assertRaisesWithMsg(RuntimeWarning,
- "Write permissions assigned to anonymous user.",
- auth.override_perm, 'anonymous', HOME, p)
- # raise on attempt to override home directory permissions
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- "Can't override home directory permissions",
- auth.override_perm, USER, HOME, perm='w')
- # raise on attempt to override a path escaping home directory
- if os.path.dirname(HOME) != HOME:
- self.assertRaisesWithMsg(ftpserver.AuthorizerError,
- "Path escapes user home directory",
- auth.override_perm, USER,
- os.path.dirname(HOME), perm='w')
- # try to re-set an overridden permission
- auth.override_perm(USER, self.tempdir, perm='w')
- auth.override_perm(USER, self.tempdir, perm='wr')
-
- def test_override_perm_recursive_paths(self):
- auth = ftpserver.DummyAuthorizer()
- auth.add_user(USER, PASSWD, HOME, perm='elr')
- self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
- auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
- self.assert_(auth.has_perm(USER, 'w', HOME) is False)
- self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
- self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
- self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is True)
- self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is True)
-
- self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
- self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
- path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
- self.assert_(auth.has_perm(USER, 'w', path) is False)
- # test case-sensitiveness
- if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
- self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
-
- def test_override_perm_not_recursive_paths(self):
- auth = ftpserver.DummyAuthorizer()
- auth.add_user(USER, PASSWD, HOME, perm='elr')
- self.assert_(auth.has_perm(USER, 'w', self.tempdir) is False)
- auth.override_perm(USER, self.tempdir, perm='w')
- self.assert_(auth.has_perm(USER, 'w', HOME) is False)
- self.assert_(auth.has_perm(USER, 'w', self.tempdir) is True)
- self.assert_(auth.has_perm(USER, 'w', self.tempfile) is True)
- self.assert_(auth.has_perm(USER, 'w', self.subtempdir) is False)
- self.assert_(auth.has_perm(USER, 'w', self.subtempfile) is False)
-
- self.assert_(auth.has_perm(USER, 'w', HOME + '@') is False)
- self.assert_(auth.has_perm(USER, 'w', self.tempdir + '@') is False)
- path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
- self.assert_(auth.has_perm(USER, 'w', path) is False)
- # test case-sensitiveness
- if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
- self.assert_(auth.has_perm(USER, 'w', self.tempdir.upper()) is True)
-
-
-class TestCallLater(unittest.TestCase):
- """Tests for CallLater class."""
-
- def setUp(self):
- for task in ftpserver._tasks:
- if not task.cancelled:
- task.cancel()
- del ftpserver._tasks[:]
-
- def scheduler(self, timeout=0.01, count=100):
- while ftpserver._tasks and count > 0:
- ftpserver._scheduler()
- count -= 1
- time.sleep(timeout)
-
- def test_interface(self):
- fun = lambda: 0
- self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun)
- x = ftpserver.CallLater(3, fun)
- self.assertRaises(AssertionError, x.delay, -1)
- self.assert_(x.cancelled is False)
- x.cancel()
- self.assert_(x.cancelled is True)
- self.assertRaises(AssertionError, x.call)
- self.assertRaises(AssertionError, x.reset)
- self.assertRaises(AssertionError, x.delay, 2)
- self.assertRaises(AssertionError, x.cancel)
-
- def test_order(self):
- l = []
- fun = lambda x: l.append(x)
- for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
- ftpserver.CallLater(x, fun, x)
- self.scheduler()
- self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
-
- def test_delay(self):
- l = []
- fun = lambda x: l.append(x)
- ftpserver.CallLater(0.01, fun, 0.01).delay(0.07)
- ftpserver.CallLater(0.02, fun, 0.02).delay(0.08)
- ftpserver.CallLater(0.03, fun, 0.03)
- ftpserver.CallLater(0.04, fun, 0.04)
- ftpserver.CallLater(0.05, fun, 0.05)
- ftpserver.CallLater(0.06, fun, 0.06).delay(0.001)
- self.scheduler()
- self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02])
-
- def test_reset(self):
- # will fail on such systems where time.time() does not provide
- # time with a better precision than 1 second.
- l = []
- fun = lambda x: l.append(x)
- ftpserver.CallLater(0.01, fun, 0.01)
- ftpserver.CallLater(0.02, fun, 0.02)
- ftpserver.CallLater(0.03, fun, 0.03)
- x = ftpserver.CallLater(0.04, fun, 0.04)
- ftpserver.CallLater(0.05, fun, 0.05)
- time.sleep(0.1)
- x.reset()
- self.scheduler()
- self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
-
- def test_cancel(self):
- l = []
- fun = lambda x: l.append(x)
- ftpserver.CallLater(0.01, fun, 0.01).cancel()
- ftpserver.CallLater(0.02, fun, 0.02)
- ftpserver.CallLater(0.03, fun, 0.03)
- ftpserver.CallLater(0.04, fun, 0.04)
- ftpserver.CallLater(0.05, fun, 0.05).cancel()
- self.scheduler()
- self.assertEqual(l, [0.02, 0.03, 0.04])
-
-
-class TestFtpAuthentication(unittest.TestCase):
- "test: USER, PASS, REIN."
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.f1 = open(TESTFN, 'w+b')
- self.f2 = open(TESTFN2, 'w+b')
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
- if not self.f1.closed:
- self.f1.close()
- if not self.f2.closed:
- self.f2.close()
- os.remove(TESTFN)
- os.remove(TESTFN2)
-
- def test_auth_ok(self):
- self.client.login(user=USER, passwd=PASSWD)
-
- def test_anon_auth(self):
- self.client.login(user='anonymous', passwd='anon@')
- self.client.login(user='AnonYmoUs', passwd='anon@')
- self.client.login(user='anonymous', passwd='')
-
- # Commented after delayed response on wrong credentials has been
- # introduced because tests take too much to complete.
-
-## def test_auth_failed(self):
-## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
-## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD)
-## self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
-
-## def test_max_auth(self):
-## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
-## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
-## self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
-## # If authentication fails for 3 times ftpd disconnects the
-## # client. We can check if that happens by using self.client.sendcmd()
-## # on the 'dead' socket object. If socket object is really
-## # closed it should be raised a socket.error exception (Windows)
-## # or a EOFError exception (Linux).
-## self.assertRaises((socket.error, EOFError), self.client.sendcmd, '')
-
- def test_rein(self):
- self.client.login(user=USER, passwd=PASSWD)
- self.client.sendcmd('rein')
- # user not authenticated, error response expected
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
- # by logging-in again we should be able to execute a
- # file-system command
- self.client.login(user=USER, passwd=PASSWD)
- self.client.sendcmd('pwd')
-
- def test_rein_during_transfer(self):
- self.client.login(user=USER, passwd=PASSWD)
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.close()
-
- self.client.voidcmd('TYPE I')
- conn = self.client.transfercmd('retr ' + TESTFN)
- rein_sent = 0
- while 1:
- chunk = conn.recv(8192)
- if not chunk:
- break
- self.f2.write(chunk)
- if not rein_sent:
- rein_sent = 1
- # flush account, error response expected
- self.client.sendcmd('rein')
- self.assertRaises(ftplib.error_perm, self.client.dir)
-
- # a 226 response is expected once tranfer finishes
- self.assertEqual(self.client.voidresp()[:3], '226')
- # account is still flushed, error response is still expected
- self.assertRaises(ftplib.error_perm, self.client.sendcmd,
- 'size ' + TESTFN)
- # by logging-in again we should be able to execute a
- # filesystem command
- self.client.login(user=USER, passwd=PASSWD)
- self.client.sendcmd('pwd')
- self.f2.seek(0)
- self.assertEqual(hash(data), hash (self.f2.read()))
-
- def test_user(self):
- # Test USER while already authenticated and no transfer
- # is in progress.
- self.client.login(user=USER, passwd=PASSWD)
- self.client.sendcmd('user ' + USER) # authentication flushed
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
- self.client.sendcmd('pass ' + PASSWD)
- self.client.sendcmd('pwd')
-
- def test_user_on_transfer(self):
- # Test USER while already authenticated and a transfer is
- # in progress.
- self.client.login(user=USER, passwd=PASSWD)
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.close()
-
- self.client.voidcmd('TYPE I')
- conn = self.client.transfercmd('retr ' + TESTFN)
- rein_sent = 0
- while 1:
- chunk = conn.recv(8192)
- if not chunk:
- break
- self.f2.write(chunk)
- # stop transfer while it isn't finished yet
- if not rein_sent:
- rein_sent = 1
- # flush account, expect an error response
- self.client.sendcmd('user ' + USER)
- self.assertRaises(ftplib.error_perm, self.client.dir)
-
- # a 226 response is expected once tranfer finishes
- self.assertEqual(self.client.voidresp()[:3], '226')
- # account is still flushed, error response is still expected
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
- # by logging-in again we should be able to execute a
- # filesystem command
- self.client.sendcmd('pass ' + PASSWD)
- self.client.sendcmd('pwd')
- self.f2.seek(0)
- self.assertEqual(hash(data), hash (self.f2.read()))
-
-
-class TestFtpDummyCmds(unittest.TestCase):
- "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP"
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
-
- def test_type(self):
- self.client.sendcmd('type a')
- self.client.sendcmd('type i')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?')
-
- def test_stru(self):
- self.client.sendcmd('stru f')
- self.client.sendcmd('stru F')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?')
-
- def test_mode(self):
- self.client.sendcmd('mode s')
- self.client.sendcmd('mode S')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?')
-
- def test_noop(self):
- self.client.sendcmd('noop')
-
- def test_syst(self):
- self.client.sendcmd('syst')
-
- def test_allo(self):
- self.client.sendcmd('allo x')
-
- def test_quit(self):
- self.client.sendcmd('quit')
-
- def test_help(self):
- self.client.sendcmd('help')
- cmd = random.choice(ftpserver.proto_cmds.keys())
- self.client.sendcmd('help %s' %cmd)
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?')
-
- def test_rest(self):
- # test error conditions only;
- # restored data-transfer is tested later
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1')
-
- def test_opts_feat(self):
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst')
- # utility function which used for extracting the MLST "facts"
- # string from the FEAT response
- def mlst():
- resp = self.client.sendcmd('feat')
- return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1)
- # we rely on "type", "perm", "size", and "modify" facts which
- # are those available on all platforms
- self.failUnless('type*;perm*;size*;modify*;' in mlst())
- self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;')
- self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;')
- self.failUnless('type*;perm;size;modify;' in mlst())
-
- self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ')
- self.failUnless(not '*' in mlst())
-
- self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
- self.failUnless(not '*' in mlst())
- self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'), \
- '200 MLST OPTS type;')
- self.failUnless('type*;perm;size;modify;' in mlst())
-
-
-class TestFtpCmdsSemantic(unittest.TestCase):
-
- arg_cmds = ('allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
- 'rest','retr','rmd','rnfr','rnto','size', 'stor', 'stru','type',
- 'user','xmkd','xrmd')
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
-
- def test_arg_cmds(self):
- # test commands requiring an argument
- expected = "501 Syntax error: command needs an argument."
- for cmd in self.arg_cmds:
- self.client.putcmd(cmd)
- resp = self.client.getmultiline()
- self.assertEqual(resp, expected)
-
- def test_no_arg_cmds(self):
- # test commands accepting no arguments
- expected = "501 Syntax error: command does not accept arguments."
- for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
- 'syst','xcup','xpwd'):
- self.client.putcmd(cmd + ' arg')
- resp = self.client.getmultiline()
- self.assertEqual(resp, expected)
-
- def test_auth_cmds(self):
- # test those commands requiring client to be authenticated
- expected = "530 Log in with USER and PASS first."
- self.client.sendcmd('rein')
- for cmd in ftpserver.proto_cmds:
- cmd = cmd.lower()
- if cmd in ('feat','help','noop','user','pass','stat','syst','quit'):
- continue
- if cmd in self.arg_cmds:
- cmd = cmd + ' arg'
- self.client.putcmd(cmd)
- resp = self.client.getmultiline()
- self.assertEqual(resp, expected)
-
- def test_no_auth_cmds(self):
- # test those commands that do not require client to be authenticated
- self.client.sendcmd('rein')
- for cmd in ('feat','help','noop','stat','syst'):
- self.client.sendcmd(cmd)
- # STAT provided with an argument is equal to LIST hence not allowed
- # if not authenticated
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /')
- self.client.sendcmd('quit')
-
-
-class TestFtpFsOperations(unittest.TestCase):
- "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
- self.tempfile = os.path.basename(open(TESTFN, 'w+b').name)
- self.tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
- os.mkdir(self.tempdir)
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
- if os.path.exists(self.tempfile):
- os.remove(self.tempfile)
- if os.path.exists(self.tempdir):
- os.rmdir(self.tempdir)
-
- def test_cwd(self):
- self.client.cwd(self.tempdir)
- self.assertEqual(self.client.pwd(), '/' + self.tempdir)
- self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir')
- # cwd provided with no arguments is supposed to move us to the
- # root directory
- self.client.sendcmd('cwd')
- self.assertEqual(self.client.pwd(), '/')
-
- def test_pwd(self):
- self.assertEqual(self.client.pwd(), '/')
- self.client.cwd(self.tempdir)
- self.assertEqual(self.client.pwd(), '/' + self.tempdir)
-
- def test_cdup(self):
- self.client.cwd(self.tempdir)
- self.assertEqual(self.client.pwd(), '/' + self.tempdir)
- self.client.sendcmd('cdup')
- self.assertEqual(self.client.pwd(), '/')
- # make sure we can't escape from root directory
- self.client.sendcmd('cdup')
- self.assertEqual(self.client.pwd(), '/')
-
- def test_mkd(self):
- tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
- self.client.mkd(tempdir)
- # make sure we can't create directories which already exist
- # (probably not really necessary);
- # let's use a try/except statement to avoid leaving behind
- # orphaned temporary directory in the event of a test failure.
- try:
- self.client.mkd(tempdir)
- except ftplib.error_perm:
- os.rmdir(tempdir) # ok
- else:
- self.fail('ftplib.error_perm not raised.')
-
- def test_rmd(self):
- self.client.rmd(self.tempdir)
- self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile)
- # make sure we can't remove the root directory
- self.assertRaises(ftplib.error_perm, self.client.rmd, '/')
-
- def test_dele(self):
- self.client.delete(self.tempfile)
- self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir)
-
- def test_rnfr_rnto(self):
- # rename file
- tempname = os.path.basename(tempfile.mktemp(dir=HOME))
- self.client.rename(self.tempfile, tempname)
- self.client.rename(tempname, self.tempfile)
- # rename dir
- tempname = os.path.basename(tempfile.mktemp(dir=HOME))
- self.client.rename(self.tempdir, tempname)
- self.client.rename(tempname, self.tempdir)
- # rnfr/rnto over non-existing paths
- bogus = os.path.basename(tempfile.mktemp(dir=HOME))
- self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
- self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/')
- # make sure we can't rename root directory
- self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x')
-
- def test_mdtm(self):
- self.client.sendcmd('mdtm ' + self.tempfile)
- # make sure we can't use mdtm against directories
- try:
- self.client.sendcmd('mdtm ' + self.tempdir)
- except ftplib.error_perm, err:
- self.failUnless("not retrievable" in str(err))
- else:
- self.fail('Exception not raised')
-
- def test_size(self):
- self.client.size(self.tempfile)
- # make sure we can't use size against directories
- try:
- self.client.sendcmd('size ' + self.tempdir)
- except ftplib.error_perm, err:
- self.failUnless("not retrievable" in str(err))
- else:
- self.fail('Exception not raised')
-
-
-class TestFtpRetrieveData(unittest.TestCase):
- "test: RETR, REST, LIST, NLST, argumented STAT"
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
- self.f1 = open(TESTFN, 'w+b')
- self.f2 = open(TESTFN2, 'w+b')
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
- if not self.f1.closed:
- self.f1.close()
- if not self.f2.closed:
- self.f2.close()
- os.remove(TESTFN)
- os.remove(TESTFN2)
-
- def test_retr(self):
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.close()
- self.client.retrbinary("retr " + TESTFN, self.f2.write)
- self.f2.seek(0)
- self.assertEqual(hash(data), hash(self.f2.read()))
-
- def test_restore_on_retr(self):
- data = 'abcde12345' * 100000
- fname_1 = os.path.basename(self.f1.name)
- self.f1.write(data)
- self.f1.close()
-
- # look at ftplib.FTP.retrbinary method to understand this mess
- self.client.voidcmd('TYPE I')
- conn = self.client.transfercmd('retr ' + fname_1)
- chunk = conn.recv(len(data) / 2)
- self.f2.write(chunk)
- conn.close()
- # transfer wasn't finished yet so we expect a 426 response
- self.assertRaises(ftplib.error_temp, self.client.voidresp)
-
- # resuming transfer by using a marker value greater than the
- # file size stored on the server should result in an error
- # on retr (RFC-1123)
- file_size = self.client.size(fname_1)
- self.client.sendcmd('rest %s' %((file_size + 1)))
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + fname_1)
-
- # test resume
- self.client.sendcmd('rest %s' %len(chunk))
- self.client.retrbinary("retr " + fname_1, self.f2.write)
- self.f2.seek(0)
- self.assertEqual(hash(data), hash (self.f2.read()))
-
- def _test_listing_cmds(self, cmd):
- """Tests common to LIST NLST and MLSD commands."""
- # assume that no argument has the same meaning of "/"
- l1 = l2 = []
- self.client.retrlines(cmd, l1.append)
- self.client.retrlines(cmd + ' /', l2.append)
- self.assertEqual(l1, l2)
- if cmd.lower() != 'mlsd':
- # if pathname is a file one line is expected
- x = []
- self.client.retrlines('%s ' %cmd + TESTFN, x.append)
- self.assertEqual(len(x), 1)
- self.failUnless(''.join(x).endswith(TESTFN))
- # non-existent path, 550 response is expected
- bogus = os.path.basename(tempfile.mktemp(dir=HOME))
- self.assertRaises(ftplib.error_perm, self.client.retrlines,
- '%s ' %cmd + bogus, lambda x: x)
- # for an empty directory we excpect that the data channel is
- # opened anyway and that no data is received
- x = []
- tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
- try:
- self.client.retrlines('%s %s' %(cmd, tempdir), x.append)
- self.assertEqual(x, [])
- finally:
- os.rmdir(tempdir)
-
- def test_nlst(self):
- # common tests
- self._test_listing_cmds('nlst')
-
- def test_list(self):
- # common tests
- self._test_listing_cmds('list')
- # known incorrect pathname arguments (e.g. old clients) are
- # expected to be treated as if pathname would be == '/'
- l1 = l2 = l3 = l4 = l5 = []
- self.client.retrlines('list /', l1.append)
- self.client.retrlines('list -a', l2.append)
- self.client.retrlines('list -l', l3.append)
- self.client.retrlines('list -al', l4.append)
- self.client.retrlines('list -la', l5.append)
- tot = (l1, l2, l3, l4, l5)
- for x in range(len(tot) - 1):
- self.assertEqual(tot[x], tot[x+1])
-
- def test_mlst(self):
- # utility function for extracting the line of interest
- mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
-
- # the fact set must be preceded by a space
- self.failUnless(mlstline('mlst').startswith(' '))
- # where TVFS is supported, a fully qualified pathname is expected
- self.failUnless(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN))
- self.failUnless(mlstline('mlst').endswith('/'))
- # assume that no argument has the same meaning of "/"
- self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
- # non-existent path
- bogus = os.path.basename(tempfile.mktemp(dir=HOME))
- self.assertRaises(ftplib.error_perm, mlstline, bogus)
- # test file/dir notations
- self.failUnless('type=dir' in mlstline('mlst'))
- self.failUnless('type=file' in mlstline('mlst ' + TESTFN))
- # let's add some tests for OPTS command
- self.client.sendcmd('opts mlst type;')
- self.assertEqual(mlstline('mlst'), ' type=dir; /')
- # where no facts are present, two leading spaces before the
- # pathname are required (RFC-3659)
- self.client.sendcmd('opts mlst')
- self.assertEqual(mlstline('mlst'), ' /')
-
- def test_mlsd(self):
- # common tests
- self._test_listing_cmds('mlsd')
- dir = os.path.basename(tempfile.mkdtemp(dir=HOME))
- try:
- try:
- self.client.retrlines('mlsd ' + TESTFN, lambda x: x)
- except ftplib.error_perm, resp:
- # if path is a file a 501 response code is expected
- self.assertEqual(str(resp)[0:3], "501")
- else:
- self.fail("Exception not raised")
- finally:
- os.rmdir(dir)
-
- def test_stat(self):
- # test STAT provided with argument which is equal to LIST
- self.client.sendcmd('stat /')
- self.client.sendcmd('stat ' + TESTFN)
- self.client.putcmd('stat *')
- resp = self.client.getmultiline()
- self.assertEqual(resp, '550 Globbing not supported.')
- bogus = os.path.basename(tempfile.mktemp(dir=HOME))
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus)
-
-
-class TestFtpAbort(unittest.TestCase):
- "test: ABOR"
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
- self.f1 = open(TESTFN, 'w+b')
- self.f2 = open(TESTFN2, 'w+b')
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
- if not self.f1.closed:
- self.f1.close()
- if not self.f2.closed:
- self.f2.close()
- os.remove(self.f1.name)
- os.remove(self.f2.name)
-
- def test_abor_no_data(self):
- # Case 1: ABOR while no data channel is opened: respond with 225.
- resp = self.client.sendcmd('ABOR')
- self.failUnlessEqual('225 No transfer to abort.', resp)
-
- def test_abor_pasv(self):
- # Case 2: user sends a PASV, a data-channel socket is listening
- # but not connected, and ABOR is sent: close listening data
- # socket, respond with 225.
- self.client.makepasv()
- respcode = self.client.sendcmd('ABOR')[:3]
- self.failUnlessEqual('225', respcode)
-
- def test_abor_port(self):
- # Case 3: data channel opened with PASV or PORT, but ABOR sent
- # before a data transfer has been started: close data channel,
- # respond with 225
- self.client.makeport()
- respcode = self.client.sendcmd('ABOR')[:3]
- self.failUnlessEqual('225', respcode)
-
- def test_abor(self):
- # Case 4: ABOR while a data transfer on DTP channel is in
- # progress: close data channel, respond with 426, respond
- # with 226.
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.close()
-
- # this ugly loop construct is to simulate an interrupted
- # transfer since ftplib doesn't like running storbinary()
- # in a separate thread
- self.client.voidcmd('TYPE I')
- conn = self.client.transfercmd('retr ' + TESTFN)
- chunk = conn.recv(len(data) / 2)
- # stop transfer while it isn't finished yet
- self.client.putcmd('ABOR')
-
- # transfer isn't finished yet so ftpd should respond with 426
- self.assertRaises(ftplib.error_temp, self.client.voidresp)
-
- # transfer successfully aborted, so should now respond with a 226
- self.failUnlessEqual('226', self.client.voidresp()[:3])
-
- if hasattr(socket, 'MSG_OOB'):
- def test_oob_abor(self):
- # Send ABOR by following the RFC-959 directives of sending
- # Telnet IP/Synch sequence as OOB data.
- # On some systems like FreeBSD this happened to be a problem
- # due to a different SO_OOBINLINE behavior.
- # On some platforms (e.g. Python CE) the test may fail
- # although the MSG_OOB constant is defined.
- self.client.sock.sendall(chr(244), socket.MSG_OOB)
- self.client.sock.sendall(chr(242), socket.MSG_OOB)
- self.client.sock.sendall('abor\r\n')
- self.client.sock.settimeout(1)
- self.assertEqual(self.client.getresp()[:3], '225')
-
-
-class TestFtpStoreData(unittest.TestCase):
- "test: STOR, STOU, APPE, REST"
-
- def setUp(self):
- self.server = FTPd()
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
- self.f1 = open(TESTFN, 'w+b')
- self.f2 = open(TESTFN2, 'w+b')
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
- if not self.f1.closed:
- self.f1.close()
- if not self.f2.closed:
- self.f2.close()
- os.remove(TESTFN)
- os.remove(TESTFN2)
-
- def test_stor(self):
- # TESTFN3 is the remote file name
- try:
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.seek(0)
- self.client.storbinary('stor ' + TESTFN3, self.f1)
- self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
- self.f2.seek(0)
- self.assertEqual(hash(data), hash (self.f2.read()))
- finally:
- # we do not use os.remove because file could be still
- # locked by ftpd thread
- if os.path.exists(TESTFN3):
- self.client.delete(TESTFN3)
-
- def test_stou(self):
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.seek(0)
-
- self.client.voidcmd('TYPE I')
- # filename comes in as "1xx FILE: <filename>"
- filename = self.client.sendcmd('stou').split('FILE: ')[1]
- try:
- sock = self.client.makeport()
- conn, sockaddr = sock.accept()
- while 1:
- buf = self.f1.read(8192)
- if not buf:
- break
- conn.sendall(buf)
- conn.close()
- # transfer finished, a 226 response is expected
- self.client.voidresp()
- self.client.retrbinary('retr ' + filename, self.f2.write)
- self.f2.seek(0)
- self.assertEqual(hash(data), hash (self.f2.read()))
- finally:
- # we do not use os.remove because file could be
- # still locked by ftpd thread
- if os.path.exists(filename):
- self.client.delete(filename)
-
- def test_stou_rest(self):
- # watch for STOU preceded by REST, which makes no sense.
- self.client.sendcmd('rest 10')
- self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
-
- def test_appe(self):
- # TESTFN3 is the remote file name
- try:
- data1 = 'abcde12345' * 100000
- self.f1.write(data1)
- self.f1.seek(0)
- self.client.storbinary('stor ' + TESTFN3, self.f1)
-
- data2 = 'fghil67890' * 100000
- self.f1.write(data2)
- self.f1.seek(self.client.size(TESTFN3))
- self.client.storbinary('appe ' + TESTFN3, self.f1)
-
- self.client.retrbinary("retr " + TESTFN3, self.f2.write)
- self.f2.seek(0)
- self.assertEqual(hash(data1 + data2), hash (self.f2.read()))
- finally:
- # we do not use os.remove because file could be still
- # locked by ftpd thread
- if os.path.exists(TESTFN3):
- self.client.delete(TESTFN3)
-
- def test_appe_rest(self):
- # watch for APPE preceded by REST, which makes no sense.
- self.client.sendcmd('rest 10')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'appe x')
-
- def test_rest_on_stor(self):
- # TESTFN3 is the remote file name
- data = 'abcde12345' * 100000
- self.f1.write(data)
- self.f1.seek(0)
-
- self.client.voidcmd('TYPE I')
- conn = self.client.transfercmd('stor ' + TESTFN3)
- bytes_sent = 0
- while 1:
- chunk = self.f1.read(8192)
- conn.sendall(chunk)
- bytes_sent += len(chunk)
- # stop transfer while it isn't finished yet
- if bytes_sent >= 524288: # 2^19
- break
- elif not chunk:
- break
- conn.close()
- # transfer wasn't finished yet so we expect a 426 response
- self.client.voidresp()
-
- # resuming transfer by using a marker value greater than the
- # file size stored on the server should result in an error
- # on stor
- file_size = self.client.size(TESTFN3)
- self.assertEqual(file_size, bytes_sent)
- self.client.sendcmd('rest %s' %((file_size + 1)))
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN3)
-
- self.client.sendcmd('rest %s' %bytes_sent)
- self.client.storbinary('stor ' + TESTFN3, self.f1)
-
- self.client.retrbinary('retr ' + TESTFN3, self.f2.write)
- self.f1.seek(0)
- self.f2.seek(0)
- self.assertEqual(hash(self.f1.read()), hash(self.f2.read()))
- self.client.delete(TESTFN3)
-
-
-class TestTimeouts(unittest.TestCase):
- """Test idle-timeout capabilities of control and data channels.
- Some tests may fail on slow machines.
- """
-
- def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30):
- self.server = FTPd()
- self.server.handler.timeout = idle_timeout
- self.server.handler.dtp_handler.timeout = data_timeout
- self.server.handler.passive_dtp.timeout = pasv_timeout
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
-
- def tearDown(self):
- self.client.close()
- self.server.handler.timeout = 300
- self.server.handler.dtp_handler.timeout = 300
- self.server.handler.passive_dtp.timeout = 30
- self.server.stop()
-
- def test_idle_timeout(self):
- # Test control channel timeout. The client which does not send
- # any command within the time specified in FTPHandler.timeout is
- # supposed to be kicked off.
- self._setUp(idle_timeout=0.1)
- # fail if no msg is received within 1 second
- self.client.sock.settimeout(1)
- data = self.client.sock.recv(1024)
- self.assertEqual(data, "421 Control connection timed out.\r\n")
- # ensure client has been kicked off
- self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
-
- def test_data_timeout(self):
- # Test data channel timeout. The client which does not send
- # or receive any data within the time specified in
- # DTPHandler.timeout is supposed to be kicked off.
- self._setUp(data_timeout=0.1)
- addr = self.client.makepasv()
- s = socket.socket()
- s.connect(addr)
- # fail if no msg is received within 1 second
- self.client.sock.settimeout(1)
- data = self.client.sock.recv(1024)
- self.assertEqual(data, "421 Data connection timed out.\r\n")
- # ensure client has been kicked off
- self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
-
- def test_idle_data_timeout1(self):
- # Tests that the control connection timeout is suspended while
- # the data channel is opened
- self._setUp(idle_timeout=0.1, data_timeout=0.2)
- addr = self.client.makepasv()
- s = socket.socket()
- s.connect(addr)
- # fail if no msg is received within 1 second
- self.client.sock.settimeout(1)
- data = self.client.sock.recv(1024)
- self.assertEqual(data, "421 Data connection timed out.\r\n")
- # ensure client has been kicked off
- self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
-
- def test_idle_data_timeout2(self):
- # Tests that the control connection timeout is restarted after
- # data channel has been closed
- self._setUp(idle_timeout=0.1, data_timeout=0.2)
- addr = self.client.makepasv()
- s = socket.socket()
- s.connect(addr)
- # close data channel
- self.client.sendcmd('abor')
- self.client.sock.settimeout(1)
- data = self.client.sock.recv(1024)
- self.assertEqual(data, "421 Control connection timed out.\r\n")
- # ensure client has been kicked off
- self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
-
- def test_pasv_timeout(self):
- # Test pasv data channel timeout. The client which does not connect
- # to the listening data socket within the time specified in
- # PassiveDTP.timeout is supposed to receive a 421 response.
- self._setUp(pasv_timeout=0.1)
- self.client.makepasv()
- # fail if no msg is received within 1 second
- self.client.sock.settimeout(1)
- data = self.client.sock.recv(1024)
- self.assertEqual(data, "421 Passive data channel timed out.\r\n")
- # client is not expected to be kicked off
- self.client.sendcmd('noop')
-
-
-class TestMaxConnections(unittest.TestCase):
- """Test maximum connections (FTPServer.max_cons)."""
-
- def setUp(self):
- self.server = FTPd()
- self.server.server.max_cons = 3
- self.server.start()
-
- def tearDown(self):
- self.server.server.max_cons = 0
- self.server.stop()
-
- def test_max_connections(self):
- c1 = ftplib.FTP()
- c2 = ftplib.FTP()
- c3 = ftplib.FTP()
- try:
- c1.connect(self.server.host, self.server.port)
- c2.connect(self.server.host, self.server.port)
- self.assertRaises(ftplib.error_temp, c3.connect, self.server.host,
- self.server.port)
- # with passive data channel established
- c2.close()
- c1.login(USER, PASSWD)
- c1.makepasv()
- self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
- self.server.port)
- # with passive data socket waiting for connection
- c1.login(USER, PASSWD)
- c1.sendcmd('pasv')
- self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
- self.server.port)
- # with active data channel established
- c1.login(USER, PASSWD)
- c1.makeport()
- self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
- self.server.port)
- finally:
- c1.close()
- c2.close()
- c3.close()
-
-
-class _TestNetworkProtocols(unittest.TestCase):
- """Test PASV, EPSV, PORT and EPRT commands.
-
- Do not use this class directly. Let TestIPv4Environment and
- TestIPv6Environment classes use it instead.
- """
- HOST = HOST
-
- def setUp(self):
- self.server = FTPd(self.HOST)
- self.server.start()
- self.client = ftplib.FTP()
- self.client.connect(self.server.host, self.server.port)
- self.client.login(USER, PASSWD)
- if self.client.af == socket.AF_INET:
- self.proto = "1"
- self.other_proto = "2"
- else:
- self.proto = "2"
- self.other_proto = "1"
-
- def tearDown(self):
- self.client.close()
- self.server.stop()
-
- def cmdresp(self, cmd):
- """Send a command and return response, also if the command failed."""
- try:
- return self.client.sendcmd(cmd)
- except ftplib.Error, err:
- return str(err)
-
- def test_eprt(self):
- # test wrong proto
- try:
- self.client.sendcmd('eprt |%s|%s|%s|' %(self.other_proto,
- self.server.host, self.server.port))
- except ftplib.error_perm, err:
- self.assertEqual(str(err)[0:3], "522")
- else:
- self.fail("Exception not raised")
-
- # test bad args
- msg = "501 Invalid EPRT format."
- # len('|') > 3
- self.assertEqual(self.cmdresp('eprt ||||'), msg)
- # len('|') < 3
- self.assertEqual(self.cmdresp('eprt ||'), msg)
- # port > 65535
- self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' %(self.proto,
- self.HOST)), msg)
- # port < 0
- self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' %(self.proto,
- self.HOST)), msg)
- # port < 1024
- self.assertEqual(self.cmdresp('eprt |%s|%s|222|' %(self.proto,
- self.HOST)), "501 Can't connect over a privileged port.")
-
- # test connection
- sock = socket.socket(self.client.af, socket.SOCK_STREAM)
- sock.bind((self.client.sock.getsockname()[0], 0))
- sock.listen(5)
- sock.settimeout(2)
- ip, port = sock.getsockname()[:2]
- self.client.sendcmd('eprt |%s|%s|%s|' %(self.proto, ip, port))
- try:
- try:
- sock.accept()
- except socket.timeout:
- self.fail("Server didn't connect to passive socket")
- finally:
- sock.close()
-
- def test_epsv(self):
- # test wrong proto
- try:
- self.client.sendcmd('epsv ' + self.other_proto)
- except ftplib.error_perm, err:
- self.assertEqual(str(err)[0:3], "522")
- else:
- self.fail("Exception not raised")
-
- # test connection
- for cmd in ('EPSV', 'EPSV ' + self.proto):
- host, port = ftplib.parse229(self.client.sendcmd(cmd),
- self.client.sock.getpeername())
- s = socket.socket(self.client.af, socket.SOCK_STREAM)
- s.settimeout(2)
- try:
- s.connect((host, port))
- self.client.sendcmd('abor')
- finally:
- s.close()
-
- def test_epsv_all(self):
- self.client.sendcmd('epsv all')
- self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv')
- self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000)
- self.assertRaises(ftplib.error_perm, self.client.sendcmd,
- 'eprt |%s|%s|%s|' %(self.proto, self.HOST, 2000))
-
-
-class TestIPv4Environment(_TestNetworkProtocols):
- """Test PASV, EPSV, PORT and EPRT commands.
-
- Runs tests contained in _TestNetworkProtocols class by using IPv4
- plus some additional specific tests.
- """
- HOST = '127.0.0.1'
-
- def test_port_v4(self):
- # test connection
- self.client.makeport()
- self.client.sendcmd('abor')
- # test bad arguments
- ae = self.assertEqual
- msg = "501 Invalid PORT format."
- ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ','
- ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int
- ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6
- ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6
- ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255
- ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535
- ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0
- msg = "501 Can't connect over a privileged port."
- ae(self.cmdresp('port %s,1,1' %self.HOST.replace('.',',')),msg) # port < 1024
- if "1.2.3.4" != self.HOST:
- msg = "501 Can't connect to a foreign address."
- ae(self.cmdresp('port 1,2,3,4,4,4'), msg)
-
- def test_eprt_v4(self):
- self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'),
- "501 Can't connect to a foreign address.")
-
- def test_pasv_v4(self):
- host, port = ftplib.parse227(self.client.sendcmd('pasv'))
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(2)
- try:
- s.connect((host, port))
- finally:
- s.close()
-
-
-class TestIPv6Environment(_TestNetworkProtocols):
- """Test PASV, EPSV, PORT and EPRT commands.
-
- Runs tests contained in _TestNetworkProtocols class by using IPv6
- plus some additional specific tests.
- """
- HOST = '::1'
-
- def test_port_v6(self):
- # 425 expected
- self.assertRaises(ftplib.error_temp, self.client.sendport,
- self.server.host, self.server.port)
-
- def test_pasv_v6(self):
- # 425 expected
- self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'pasv')
-
- def test_eprt_v6(self):
- self.assertEqual(self.cmdresp('eprt |2|::xxx|2222|'),
- "501 Can't connect to a foreign address.")
-
-
-class FTPd(threading.Thread):
- """A threaded FTP server used for running tests."""
-
- def __init__(self, host=HOST, port=0, verbose=False):
- threading.Thread.__init__(self)
- self.active = False
- if not verbose:
- ftpserver.log = ftpserver.logline = lambda x: x
- self.authorizer = ftpserver.DummyAuthorizer()
- self.authorizer.add_user(USER, PASSWD, HOME, perm='elradfmw') # full perms
- self.authorizer.add_anonymous(HOME)
- self.handler = ftpserver.FTPHandler
- self.handler.authorizer = self.authorizer
- self.server = ftpserver.FTPServer((host, port), self.handler)
- self.host, self.port = self.server.socket.getsockname()[:2]
- self.active_lock = threading.Lock()
-
- def start(self):
- assert not self.active
- self.__flag = threading.Event()
- threading.Thread.start(self)
- self.__flag.wait()
-
- def run(self):
- self.active = True
- self.__flag.set()
- while self.active:
- self.active_lock.acquire()
- self.server.serve_forever(timeout=0.001, count=1)
- self.active_lock.release()
- self.server.close_all(ignore_all=True)
-
- def stop(self):
- assert self.active
- self.active = False
- self.join()
-
-
-def remove_test_files():
- "Convenience function for removing temporary test files"
- for file in [TESTFN, TESTFN2, TESTFN3]:
- try:
- os.remove(file)
- except os.error:
- pass
-
-def test_main(tests=None):
- test_suite = unittest.TestSuite()
- if tests is None:
- tests = [
- TestAbstractedFS,
- TestDummyAuthorizer,
- TestCallLater,
- TestFtpAuthentication,
- TestFtpDummyCmds,
- TestFtpCmdsSemantic,
- TestFtpFsOperations,
- TestFtpRetrieveData,
- TestFtpAbort,
- TestFtpStoreData,
- TestTimeouts,
- TestMaxConnections
- ]
- if SUPPORTS_IPV4:
- tests.append(TestIPv4Environment)
- if SUPPORTS_IPV6:
- tests.append(TestIPv6Environment)
-
- for test in tests:
- test_suite.addTest(unittest.makeSuite(test))
- remove_test_files()
- unittest.TextTestRunner(verbosity=2).run(test_suite)
- remove_test_files()
-
-
-if __name__ == '__main__':
- test_main()