diff options
Diffstat (limited to 'third_party/psutil/test')
-rw-r--r-- | third_party/psutil/test/_bsd.py | 165 | ||||
-rw-r--r-- | third_party/psutil/test/_linux.py | 72 | ||||
-rw-r--r-- | third_party/psutil/test/_osx.py | 91 | ||||
-rw-r--r-- | third_party/psutil/test/_posix.py | 148 | ||||
-rw-r--r-- | third_party/psutil/test/_windows.py | 189 | ||||
-rw-r--r-- | third_party/psutil/test/test_memory_leaks.py | 211 | ||||
-rw-r--r-- | third_party/psutil/test/test_psutil.py | 1493 |
7 files changed, 0 insertions, 2369 deletions
diff --git a/third_party/psutil/test/_bsd.py b/third_party/psutil/test/_bsd.py deleted file mode 100644 index 0296b05..0000000 --- a/third_party/psutil/test/_bsd.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python -# -# $Id: _bsd.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""BSD specific tests. These are implicitly run by test_psutil.py.""" - -import unittest -import subprocess -import time -import re -import sys - -import psutil - -from test_psutil import reap_children, get_test_subprocess, sh - -def sysctl(cmdline): - """Expects a sysctl command with an argument and parse the result - returning only the value of interest. - """ - result = sh("sysctl " + cmdline) - result = result[result.find(": ") + 2:] - try: - return int(result) - except ValueError: - return result - -def parse_sysctl_vmtotal(output): - """Parse sysctl vm.vmtotal output returning total and free memory - values. - """ - line = output.split('\n')[4] # our line of interest - mobj = re.match(r'Virtual\s+Memory.*Total:\s+(\d+)K,\s+Active\s+(\d+)K.*', line) - total, active = mobj.groups() - # values are represented in kilo bytes - total = int(total) * 1024 - active = int(active) * 1024 - free = total - active - return total, free - - -class BSDSpecificTestCase(unittest.TestCase): - - def setUp(self): - self.pid = get_test_subprocess().pid - - def tearDown(self): - reap_children() - - def test_TOTAL_PHYMEM(self): - sysctl_hwphymem = sysctl('sysctl hw.physmem') - self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) - - def test_BOOT_TIME(self): - s = sysctl('sysctl kern.boottime') - s = s[s.find(" sec = ") + 7:] - s = s[:s.find(',')] - btime = int(s) - self.assertEqual(btime, psutil.BOOT_TIME) - - def test_avail_phymem(self): - # This test is not particularly accurate and may fail if the OS is - # consuming memory for other applications. - # We just want to test that the difference between psutil result - # and sysctl's is not too high. - _sum = sum((sysctl("sysctl vm.stats.vm.v_inactive_count"), - sysctl("sysctl vm.stats.vm.v_cache_count"), - sysctl("sysctl vm.stats.vm.v_free_count") - )) - _pagesize = sysctl("sysctl hw.pagesize") - sysctl_avail_phymem = _sum * _pagesize - psutil_avail_phymem = psutil.phymem_usage().free - difference = abs(psutil_avail_phymem - sysctl_avail_phymem) - # On my system both sysctl and psutil report the same values. - # Let's use a tollerance of 0.5 MB and consider the test as failed - # if we go over it. - if difference > (0.5 * 2**20): - self.fail("sysctl=%s; psutil=%s; difference=%s;" %( - sysctl_avail_phymem, psutil_avail_phymem, difference)) - - def test_total_virtmem(self): - # This test is not particularly accurate and may fail if the OS is - # consuming memory for other applications. - # We just want to test that the difference between psutil result - # and sysctl's is not too high. - p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIPE) - result = p.communicate()[0].strip() - if sys.version_info >= (3,): - result = str(result, sys.stdout.encoding) - sysctl_total_virtmem, _ = parse_sysctl_vmtotal(result) - psutil_total_virtmem = psutil.virtmem_usage().total - difference = abs(sysctl_total_virtmem - psutil_total_virtmem) - - # On my system I get a difference of 4657152 bytes, probably because - # the system is consuming memory for this same test. - # Assuming psutil is right, let's use a tollerance of 10 MB and consider - # the test as failed if we go over it. - if difference > (10 * 2**20): - self.fail("sysctl=%s; psutil=%s; difference=%s;" %( - sysctl_total_virtmem, psutil_total_virtmem, difference)) - - def test_avail_virtmem(self): - # This test is not particularly accurate and may fail if the OS is - # consuming memory for other applications. - # We just want to test that the difference between psutil result - # and sysctl's is not too high. - p = subprocess.Popen("sysctl vm.vmtotal", shell=1, stdout=subprocess.PIPE) - result = p.communicate()[0].strip() - if sys.version_info >= (3,): - result = str(result, sys.stdout.encoding) - _, sysctl_avail_virtmem = parse_sysctl_vmtotal(result) - psutil_avail_virtmem = psutil.virtmem_usage().free - difference = abs(sysctl_avail_virtmem - psutil_avail_virtmem) - # let's assume the test is failed if difference is > 0.5 MB - if difference > (0.5 * 2**20): - self.fail(difference) - - def test_process_create_time(self): - cmdline = "ps -o lstart -p %s" %self.pid - p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0] - if sys.version_info >= (3,): - output = str(output, sys.stdout.encoding) - start_ps = output.replace('STARTED', '').strip() - start_psutil = psutil.Process(self.pid).create_time - start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", - time.localtime(start_psutil)) - self.assertEqual(start_ps, start_psutil) - - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -k "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total = int(total) * 1024 - used = int(used) * 1024 - free = int(free) * 1024 - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(part.device, dev) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.free, free) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.used, used) - - -if __name__ == '__main__': - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) - unittest.TextTestRunner(verbosity=2).run(test_suite) diff --git a/third_party/psutil/test/_linux.py b/third_party/psutil/test/_linux.py deleted file mode 100644 index a20d8dc..0000000 --- a/third_party/psutil/test/_linux.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python -# -# $Id: _linux.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Linux specific tests. These are implicitly run by test_psutil.py.""" - -import unittest -import subprocess -import sys - -from test_psutil import sh -import psutil - - -class LinuxSpecificTestCase(unittest.TestCase): - - def test_cached_phymem(self): - # test psutil.cached_phymem against "cached" column of free - # command line utility - p = subprocess.Popen("free", shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if sys.version_info >= (3,): - output = str(output, sys.stdout.encoding) - free_cmem = int(output.split('\n')[1].split()[6]) - psutil_cmem = psutil.cached_phymem() / 1024 - self.assertEqual(free_cmem, psutil_cmem) - - def test_phymem_buffers(self): - # test psutil.phymem_buffers against "buffers" column of free - # command line utility - p = subprocess.Popen("free", shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if sys.version_info >= (3,): - output = str(output, sys.stdout.encoding) - free_cmem = int(output.split('\n')[1].split()[5]) - psutil_cmem = psutil.phymem_buffers() / 1024 - self.assertEqual(free_cmem, psutil_cmem) - - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -P -B 1 "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total, used, free = int(total), int(used), int(free) - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(part.device, dev) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.free, free) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.used, used) - - -if __name__ == '__main__': - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) - unittest.TextTestRunner(verbosity=2).run(test_suite) diff --git a/third_party/psutil/test/_osx.py b/third_party/psutil/test/_osx.py deleted file mode 100644 index fdee67b..0000000 --- a/third_party/psutil/test/_osx.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python -# -# $Id: _osx.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""OSX specific tests. These are implicitly run by test_psutil.py.""" - -import unittest -import subprocess -import time -import sys - -import psutil - -from test_psutil import reap_children, get_test_subprocess, sh -#from _posix import ps - - -def sysctl(cmdline): - """Expects a sysctl command with an argument and parse the result - returning only the value of interest. - """ - p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) - result = p.communicate()[0].strip().split()[1] - if sys.version_info >= (3,): - result = str(result, sys.stdout.encoding) - try: - return int(result) - except ValueError: - return result - - -class OSXSpecificTestCase(unittest.TestCase): - - def setUp(self): - self.pid = get_test_subprocess().pid - - def tearDown(self): - reap_children() - - def test_TOTAL_PHYMEM(self): - sysctl_hwphymem = sysctl('sysctl hw.memsize') - self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM) - - def test_process_create_time(self): - cmdline = "ps -o lstart -p %s" %self.pid - p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0] - if sys.version_info >= (3,): - output = str(output, sys.stdout.encoding) - start_ps = output.replace('STARTED', '').strip() - start_psutil = psutil.Process(self.pid).create_time - start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", - time.localtime(start_psutil)) - self.assertEqual(start_ps, start_psutil) - - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -k "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total = int(total) * 1024 - used = int(used) * 1024 - free = int(free) * 1024 - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(part.device, dev) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.free, free) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.used, used) - - -if __name__ == '__main__': - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) - unittest.TextTestRunner(verbosity=2).run(test_suite) diff --git a/third_party/psutil/test/_posix.py b/third_party/psutil/test/_posix.py deleted file mode 100644 index ad0db64..0000000 --- a/third_party/psutil/test/_posix.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python -# -# $Id: _posix.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""POSIX specific tests. These are implicitly run by test_psutil.py.""" - -import unittest -import subprocess -import time -import sys -import os - -import psutil - -from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX, - ignore_access_denied, sh) - - -def ps(cmd): - """Expects a ps command with a -o argument and parse the result - returning only the value of interest. - """ - if not LINUX: - cmd = cmd.replace(" --no-headers ", " ") - p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if sys.version_info >= (3,): - output = str(output, sys.stdout.encoding) - if not LINUX: - output = output.split('\n')[1] - try: - return int(output) - except ValueError: - return output - - -class PosixSpecificTestCase(unittest.TestCase): - """Compare psutil results against 'ps' command line utility.""" - - # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps - - def setUp(self): - self.pid = get_test_subprocess([PYTHON, "-E", "-O"]).pid - - def tearDown(self): - reap_children() - - def test_process_parent_pid(self): - ppid_ps = ps("ps --no-headers -o ppid -p %s" %self.pid) - ppid_psutil = psutil.Process(self.pid).ppid - self.assertEqual(ppid_ps, ppid_psutil) - - def test_process_uid(self): - uid_ps = ps("ps --no-headers -o uid -p %s" %self.pid) - uid_psutil = psutil.Process(self.pid).uids.real - self.assertEqual(uid_ps, uid_psutil) - - def test_process_gid(self): - gid_ps = ps("ps --no-headers -o rgid -p %s" %self.pid) - gid_psutil = psutil.Process(self.pid).gids.real - self.assertEqual(gid_ps, gid_psutil) - - def test_process_username(self): - username_ps = ps("ps --no-headers -o user -p %s" %self.pid) - username_psutil = psutil.Process(self.pid).username - self.assertEqual(username_ps, username_psutil) - - @ignore_access_denied - def test_process_rss_memory(self): - # give python interpreter some time to properly initialize - # so that the results are the same - time.sleep(0.1) - rss_ps = ps("ps --no-headers -o rss -p %s" %self.pid) - rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024 - self.assertEqual(rss_ps, rss_psutil) - - @ignore_access_denied - def test_process_vsz_memory(self): - # give python interpreter some time to properly initialize - # so that the results are the same - time.sleep(0.1) - vsz_ps = ps("ps --no-headers -o vsz -p %s" %self.pid) - vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024 - self.assertEqual(vsz_ps, vsz_psutil) - - def test_process_name(self): - # use command + arg since "comm" keyword not supported on all platforms - name_ps = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] - # remove path if there is any, from the command - name_ps = os.path.basename(name_ps).lower() - name_psutil = psutil.Process(self.pid).name.lower() - self.assertEqual(name_ps, name_psutil) - - def test_process_exe(self): - ps_pathname = ps("ps --no-headers -o command -p %s" %self.pid).split(' ')[0] - psutil_pathname = psutil.Process(self.pid).exe - try: - self.assertEqual(ps_pathname, psutil_pathname) - except AssertionError: - # certain platforms such as BSD are more accurate returning: - # "/usr/local/bin/python2.7" - # ...instead of: - # "/usr/local/bin/python" - # We do not want to consider this difference in accuracy - # an error. - adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] - self.assertEqual(ps_pathname, adjusted_ps_pathname) - - def test_process_cmdline(self): - ps_cmdline = ps("ps --no-headers -o command -p %s" %self.pid) - psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline) - self.assertEqual(ps_cmdline, psutil_cmdline) - - def test_get_pids(self): - # Note: this test might fail if the OS is starting/killing - # other processes in the meantime - p = get_test_subprocess(["ps", "ax", "-o", "pid"], stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if sys.version_info >= (3,): - output = str(output, sys.stdout.encoding) - output = output.replace('PID', '') - p.wait() - pids_ps = [] - for pid in output.split('\n'): - if pid: - pids_ps.append(int(pid.strip())) - # remove ps subprocess pid which is supposed to be dead in meantime - pids_ps.remove(p.pid) - pids_psutil = psutil.get_pid_list() - pids_ps.sort() - pids_psutil.sort() - - if pids_ps != pids_psutil: - difference = [x for x in pids_psutil if x not in pids_ps] + \ - [x for x in pids_ps if x not in pids_psutil] - self.fail("difference: " + str(difference)) - - -if __name__ == '__main__': - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) - unittest.TextTestRunner(verbosity=2).run(test_suite) - - diff --git a/third_party/psutil/test/_windows.py b/third_party/psutil/test/_windows.py deleted file mode 100644 index ef5ff24..0000000 --- a/third_party/psutil/test/_windows.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/env python -# -# $Id: _windows.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Windows specific tests. These are implicitly run by test_psutil.py.""" - -import os -import unittest -import platform -import signal -import time -import warnings -import atexit -import sys - -import psutil -import _psutil_mswindows -from test_psutil import reap_children, get_test_subprocess, wait_for_pid -try: - import wmi -except ImportError: - err = sys.exc_info()[1] - atexit.register(warnings.warn, "Couldn't run wmi tests: %s" % str(err), - RuntimeWarning) - wmi = None - -WIN2000 = platform.win32_ver()[0] == '2000' - - -class WindowsSpecificTestCase(unittest.TestCase): - - def setUp(self): - sproc = get_test_subprocess() - wait_for_pid(sproc.pid) - self.pid = sproc.pid - - def tearDown(self): - reap_children() - - def test_issue_24(self): - p = psutil.Process(0) - self.assertRaises(psutil.AccessDenied, p.kill) - - def test_special_pid(self): - if not WIN2000: - p = psutil.Process(4) - else: - p = psutil.Process(8) - self.assertEqual(p.name, 'System') - # use __str__ to access all common Process properties to check - # that nothing strange happens - str(p) - p.username - self.assertTrue(p.create_time >= 0.0) - try: - rss, vms = p.get_memory_info() - except psutil.AccessDenied: - # expected on Windows Vista and Windows 7 - if not platform.uname()[1] in ('vista', 'win-7', 'win7'): - raise - else: - self.assertTrue(rss > 0) - if not WIN2000: - self.assertEqual(vms, 0) - - def test_signal(self): - p = psutil.Process(self.pid) - self.assertRaises(ValueError, p.send_signal, signal.SIGINT) - - if wmi is not None: - - # --- Process class tests - - def test_process_name(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(p.name, w.Caption) - - def test_process_exe(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(p.exe, w.ExecutablePath) - - if not WIN2000: - def test_process_cmdline(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', '')) - - def test_process_username(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - domain, _, username = w.GetOwner() - username = "%s\\%s" %(domain, username) - self.assertEqual(p.username, username) - - def test_process_rss_memory(self): - time.sleep(0.1) - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - rss = p.get_memory_info().rss - self.assertEqual(rss, int(w.WorkingSetSize)) - - def test_process_vms_memory(self): - time.sleep(0.1) - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - vms = p.get_memory_info().vms - # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx - # ...claims that PageFileUsage is represented in Kilo - # bytes but funnily enough on certain platforms bytes are - # returned instead. - wmi_usage = int(w.PageFileUsage) - if (vms != wmi_usage) and (vms != wmi_usage * 1024): - self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) - - def test_process_create_time(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - wmic_create = str(w.CreationDate.split('.')[0]) - psutil_create = time.strftime("%Y%m%d%H%M%S", - time.localtime(p.create_time)) - self.assertEqual(wmic_create, psutil_create) - - - # --- psutil namespace functions and constants tests - - def test_NUM_CPUS(self): - num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) - self.assertEqual(num_cpus, psutil.NUM_CPUS) - - def test_TOTAL_PHYMEM(self): - w = wmi.WMI().Win32_ComputerSystem()[0] - self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM) - - def test__UPTIME(self): - # _UPTIME constant is not public but it is used internally - # as value to return for pid 0 creation time. - # WMI behaves the same. - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(0) - wmic_create = str(w.CreationDate.split('.')[0]) - psutil_create = time.strftime("%Y%m%d%H%M%S", - time.localtime(p.create_time)) - # XXX - ? no actual test here - - def test_get_pids(self): - # Note: this test might fail if the OS is starting/killing - # other processes in the meantime - w = wmi.WMI().Win32_Process() - wmi_pids = [x.ProcessId for x in w] - wmi_pids.sort() - psutil_pids = psutil.get_pid_list() - psutil_pids.sort() - if wmi_pids != psutil_pids: - difference = filter(lambda x:x not in wmi_pids, psutil_pids) + \ - filter(lambda x:x not in psutil_pids, wmi_pids) - self.fail("difference: " + str(difference)) - - def test_disks(self): - ps_parts = psutil.disk_partitions(all=True) - wmi_parts = wmi.WMI().Win32_LogicalDisk() - for ps_part in ps_parts: - for wmi_part in wmi_parts: - if ps_part.device.replace('\\', '') == wmi_part.DeviceID: - if not ps_part.mountpoint: - # this is usually a CD-ROM with no disk inserted - break - usage = psutil.disk_usage(ps_part.mountpoint) - self.assertEqual(usage.total, int(wmi_part.Size)) - wmi_free = int(wmi_part.FreeSpace) - self.assertEqual(usage.free, wmi_free) - # 10 MB tollerance - if abs(usage.free - wmi_free) > 10 * 1024 * 1024: - self.fail("psutil=%s, wmi=%s" % usage.free, wmi_free) - break - else: - self.fail("can't find partition %r", ps_part) - - -if __name__ == '__main__': - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) - unittest.TextTestRunner(verbosity=2).run(test_suite) - diff --git a/third_party/psutil/test/test_memory_leaks.py b/third_party/psutil/test/test_memory_leaks.py deleted file mode 100644 index b6a3a4e..0000000 --- a/third_party/psutil/test/test_memory_leaks.py +++ /dev/null @@ -1,211 +0,0 @@ -#!/usr/bin/env python -# -# $Id: test_memory_leaks.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -""" -A test script which attempts to detect memory leaks by calling C -functions many times and compare process memory usage before and -after the calls. It might produce false positives. -""" - -import os -import gc -import unittest -import time - -import psutil -from test_psutil import reap_children, skipUnless, skipIf, \ - POSIX, LINUX, WINDOWS, OSX, BSD, PY3 - -LOOPS = 1000 -TOLERANCE = 4096 - -if PY3: - xrange = range - - -class Base(unittest.TestCase): - - def execute(self, function, *args, **kwargs): - # step 1 - for x in xrange(LOOPS): - self.call(function, *args, **kwargs) - del x - gc.collect() - rss1 = self.get_mem() - - # step 2 - for x in xrange(LOOPS): - self.call(function, *args, **kwargs) - del x - gc.collect() - rss2 = self.get_mem() - - # comparison - difference = rss2 - rss1 - if difference > TOLERANCE: - # This doesn't necessarily mean we have a leak yet. - # At this point we assume that after having called the - # function so many times the memory usage is stabilized - # and if there are no leaks it should not increase any - # more. - # Let's keep calling fun for 3 more seconds and fail if - # we notice any difference. - stop_at = time.time() + 3 - while 1: - self.call(function, *args, **kwargs) - if time.time() >= stop_at: - break - del stop_at - gc.collect() - rss3 = self.get_mem() - difference = rss3 - rss2 - if rss3 > rss2: - self.fail("rss2=%s, rss3=%s, difference=%s" \ - % (rss2, rss3, difference)) - - @staticmethod - def get_mem(): - return psutil.Process(os.getpid()).get_memory_info()[0] - - def call(self): - raise NotImplementedError("must be implemented in subclass") - - -class TestProcessObjectLeaks(Base): - """Test leaks of Process class methods and properties""" - - def setUp(self): - gc.collect() - - def tearDown(self): - reap_children() - - def call(self, function, *args, **kwargs): - p = psutil.Process(os.getpid()) - obj = getattr(p, function) - if callable(obj): - obj(*args, **kwargs) - - def test_name(self): - self.execute('name') - - def test_cmdline(self): - self.execute('cmdline') - - def test_ppid(self): - self.execute('ppid') - - @skipIf(WINDOWS) - def test_uids(self): - self.execute('uids') - - @skipIf(WINDOWS) - def test_gids(self): - self.execute('gids') - - def test_status(self): - self.execute('status') - - @skipIf(POSIX) - def test_username(self): - self.execute('username') - - def test_create_time(self): - self.execute('create_time') - - def test_get_num_threads(self): - self.execute('get_num_threads') - - def test_get_threads(self): - self.execute('get_threads') - - def test_get_cpu_times(self): - self.execute('get_cpu_times') - - def test_get_memory_info(self): - self.execute('get_memory_info') - - def test_is_running(self): - self.execute('is_running') - - @skipIf(WINDOWS) - def test_terminal(self): - self.execute('terminal') - - @skipUnless(WINDOWS) - def test_resume(self): - self.execute('resume') - - @skipUnless(WINDOWS) - def test_getcwd(self): - self.execute('getcwd') - - @skipUnless(WINDOWS or OSX) - def test_get_open_files(self): - self.execute('get_open_files') - - @skipUnless(WINDOWS or OSX) - def test_get_connections(self): - self.execute('get_connections') - - -class TestModuleFunctionsLeaks(Base): - """Test leaks of psutil module functions.""" - - def setUp(self): - gc.collect() - - def call(self, function, *args, **kwargs): - obj = getattr(psutil, function) - if callable(obj): - retvalue = obj(*args, **kwargs) - - def test_get_pid_list(self): - self.execute('get_pid_list') - - @skipIf(POSIX) - def test_pid_exists(self): - self.execute('pid_exists', os.getpid()) - - def test_process_iter(self): - self.execute('process_iter') - - def test_phymem_usage(self): - self.execute('phymem_usage') - - def test_virtmem_usage(self): - self.execute('virtmem_usage') - - def test_cpu_times(self): - self.execute('cpu_times') - - def test_per_cpu_times(self): - self.execute('cpu_times', percpu=True) - - @skipUnless(WINDOWS) - def test_disk_usage(self): - self.execute('disk_usage', '.') - - def test_disk_partitions(self): - self.execute('disk_partitions') - - def test_network_io_counters(self): - self.execute('network_io_counters') - - def test_disk_io_counters(self): - self.execute('disk_io_counters') - -def test_main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(TestProcessObjectLeaks)) - test_suite.addTest(unittest.makeSuite(TestModuleFunctionsLeaks)) - unittest.TextTestRunner(verbosity=2).run(test_suite) - -if __name__ == '__main__': - test_main() - diff --git a/third_party/psutil/test/test_psutil.py b/third_party/psutil/test/test_psutil.py deleted file mode 100644 index 35ed744..0000000 --- a/third_party/psutil/test/test_psutil.py +++ /dev/null @@ -1,1493 +0,0 @@ -#!/usr/bin/env python -# -# $Id: test_psutil.py 1142 2011-10-05 18:45:49Z g.rodola $ -# -# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -""" -psutil test suite. - -Note: this is targeted for both python 2.x and 3.x so there's no need -to use 2to3 tool first. -""" - -import unittest -import os -import sys -import subprocess -import time -import signal -import types -import traceback -import socket -import warnings -import atexit -import errno -import threading -import tempfile -import collections - -import psutil - - -PYTHON = os.path.realpath(sys.executable) -DEVNULL = open(os.devnull, 'r+') -TESTFN = os.path.join(os.getcwd(), "$testfile") -PY3 = sys.version_info >= (3,) -POSIX = os.name == 'posix' -LINUX = sys.platform.lower().startswith("linux") -WINDOWS = sys.platform.lower().startswith("win32") -OSX = sys.platform.lower().startswith("darwin") -BSD = sys.platform.lower().startswith("freebsd") - -try: - psutil.Process(os.getpid()).get_connections() -except NotImplementedError: - err = sys.exc_info()[1] - SUPPORT_CONNECTIONS = False - atexit.register(warnings.warn, "get_connections() not supported on this platform", - RuntimeWarning) -else: - SUPPORT_CONNECTIONS = True - -if PY3: - long = int - - def callable(attr): - return isinstance(attr, collections.Callable) - - -_subprocesses_started = set() - -def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None): - """Return a subprocess.Popen object to use in tests. - By default stdout and stderr are redirected to /dev/null and the - python interpreter is used as test process. - """ - if cmd is None: - cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] - sproc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, stdin=stdin) - _subprocesses_started.add(sproc.pid) - return sproc - -def sh(cmdline): - """run cmd in a subprocess and return its output. - raises RuntimeError on error. - """ - p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - if p.returncode != 0: - raise RuntimeError(stderr) - if stderr: - warnings.warn(stderr, RuntimeWarning) - if sys.version_info >= (3,): - stdout = str(stdout, sys.stdout.encoding) - return stdout.strip() - -def wait_for_pid(pid, timeout=1): - """Wait for pid to show up in the process list then return. - Used in the test suite to give time the sub process to initialize. - """ - raise_at = time.time() + timeout - while 1: - if pid in psutil.get_pid_list(): - # give it one more iteration to allow full initialization - time.sleep(0.01) - return - time.sleep(0.0001) - if time.time() >= raise_at: - raise RuntimeError("Timed out") - -def reap_children(search_all=False): - """Kill any subprocess started by this test suite and ensure that - no zombies stick around to hog resources and create problems when - looking for refleaks. - """ - if search_all: - this_process = psutil.Process(os.getpid()) - pids = [x.pid for x in this_process.get_children()] - else: - pids =_subprocesses_started - while pids: - pid = pids.pop() - try: - child = psutil.Process(pid) - child.kill() - except psutil.NoSuchProcess: - pass - else: - child.wait() - - -# we want to search through all processes before exiting -atexit.register(reap_children, search_all=True) - -def skipIf(condition, reason="", warn=False): - """Decorator which skip a test under if condition is satisfied. - This is a substitute of unittest.skipIf which is available - only in python 2.7 and 3.2. - If 'reason' argument is provided this will be printed during - tests execution. - If 'warn' is provided a RuntimeWarning will be shown when all - tests are run. - """ - def outer(fun, *args, **kwargs): - def inner(self): - if condition: - sys.stdout.write("skipped-") - sys.stdout.flush() - if warn: - objname = "%s.%s" % (self.__class__.__name__, fun.__name__) - msg = "%s was skipped" % objname - if reason: - msg += "; reason: " + repr(reason) - atexit.register(warnings.warn, msg, RuntimeWarning) - return - else: - return fun(self, *args, **kwargs) - return inner - return outer - -def skipUnless(condition, reason="", warn=False): - """Contrary of skipIf.""" - if not condition: - return skipIf(True, reason, warn) - return skipIf(False) - -def ignore_access_denied(fun): - """Decorator to Ignore AccessDenied exceptions.""" - def outer(fun, *args, **kwargs): - def inner(self): - try: - return fun(self, *args, **kwargs) - except psutil.AccessDenied: - pass - return inner - return outer - -def supports_ipv6(): - """Return True if IPv6 is supported on this platform.""" - if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): - return False - sock = None - try: - try: - sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - sock.bind(("::1", 0)) - except (socket.error, socket.gaierror): - return False - else: - return True - finally: - if sock is not None: - sock.close() - - -class ThreadTask(threading.Thread): - """A thread object used for running process thread tests.""" - - def __init__(self): - threading.Thread.__init__(self) - self._running = False - self._interval = None - self._flag = threading.Event() - - def __repr__(self): - name = self.__class__.__name__ - return '<%s running=%s at %#x>' % (name, self._running, id(self)) - - def start(self, interval=0.001): - """Start thread and keep it running until an explicit - stop() request. Polls for shutdown every 'timeout' seconds. - """ - if self._running: - raise ValueError("already started") - self._interval = interval - threading.Thread.start(self) - self._flag.wait() - - def run(self): - self._running = True - self._flag.set() - while self._running: - time.sleep(self._interval) - - def stop(self): - """Stop thread execution and and waits until it is stopped.""" - if not self._running: - raise ValueError("already stopped") - self._running = False - self.join() - - -class TestCase(unittest.TestCase): - - def tearDown(self): - reap_children() - - # ============================ - # tests for system-related API - # ============================ - - def test_get_process_list(self): - pids = [x.pid for x in psutil.get_process_list()] - self.assertTrue(os.getpid() in pids) - - def test_process_iter(self): - pids = [x.pid for x in psutil.process_iter()] - self.assertTrue(os.getpid() in pids) - - def test_TOTAL_PHYMEM(self): - x = psutil.TOTAL_PHYMEM - self.assertTrue(isinstance(x, (int, long))) - self.assertTrue(x > 0) - - def test_BOOT_TIME(self): - x = psutil.BOOT_TIME - self.assertTrue(isinstance(x, float)) - self.assertTrue(x > 0) - - def test_deprecated_memory_functions(self): - warnings.filterwarnings("error") - try: - self.assertRaises(DeprecationWarning, psutil.used_phymem) - self.assertRaises(DeprecationWarning, psutil.avail_phymem) - self.assertRaises(DeprecationWarning, psutil.total_virtmem) - self.assertRaises(DeprecationWarning, psutil.used_virtmem) - self.assertRaises(DeprecationWarning, psutil.avail_virtmem) - finally: - warnings.resetwarnings() - - def test_phymem_usage(self): - mem = psutil.phymem_usage() - self.assertTrue(mem.total > 0) - self.assertTrue(mem.used > 0) - self.assertTrue(mem.free > 0) - self.assertTrue(0 <= mem.percent <= 100) - - def test_virtmem_usage(self): - mem = psutil.virtmem_usage() - self.assertTrue(mem.total > 0) - self.assertTrue(mem.used >= 0) - self.assertTrue(mem.free > 0) - self.assertTrue(0 <= mem.percent <= 100) - - @skipUnless(LINUX) - def test_phymem_buffers(self): - x = psutil.phymem_buffers() - self.assertTrue(isinstance(x, (int, long))) - self.assertTrue(x >= 0) - - def test_pid_exists(self): - sproc = get_test_subprocess() - wait_for_pid(sproc.pid) - self.assertTrue(psutil.pid_exists(sproc.pid)) - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - self.assertFalse(psutil.pid_exists(sproc.pid)) - self.assertFalse(psutil.pid_exists(-1)) - - def test_pid_exists_2(self): - reap_children() - pids = psutil.get_pid_list() - for pid in pids: - try: - self.assertTrue(psutil.pid_exists(pid)) - except AssertionError: - # in case the process disappeared in meantime fail only - # if it is no longer in get_pid_list() - time.sleep(.1) - if pid in psutil.get_pid_list(): - self.fail(pid) - pids = range(max(pids) + 5000, max(pids) + 6000) - for pid in pids: - self.assertFalse(psutil.pid_exists(pid)) - - def test_get_pid_list(self): - plist = [x.pid for x in psutil.get_process_list()] - pidlist = psutil.get_pid_list() - self.assertEqual(plist.sort(), pidlist.sort()) - # make sure every pid is unique - self.assertEqual(len(pidlist), len(set(pidlist))) - - def test_test(self): - # test for psutil.test() function - stdout = sys.stdout - sys.stdout = DEVNULL - try: - psutil.test() - finally: - sys.stdout = stdout - - def test_sys_cpu_times(self): - total = 0 - times = psutil.cpu_times() - sum(times) - for cp_time in times: - self.assertTrue(isinstance(cp_time, float)) - self.assertTrue(cp_time >= 0.0) - total += cp_time - self.assertEqual(total, sum(times)) - str(times) - - def test_sys_cpu_times2(self): - t1 = sum(psutil.cpu_times()) - time.sleep(0.1) - t2 = sum(psutil.cpu_times()) - difference = t2 - t1 - if not difference >= 0.05: - self.fail("difference %s" % difference) - - def test_sys_per_cpu_times(self): - for times in psutil.cpu_times(percpu=True): - total = 0 - sum(times) - for cp_time in times: - self.assertTrue(isinstance(cp_time, float)) - self.assertTrue(cp_time >= 0.0) - total += cp_time - self.assertEqual(total, sum(times)) - str(times) - - def test_sys_per_cpu_times2(self): - tot1 = psutil.cpu_times(percpu=True) - stop_at = time.time() + 0.1 - while 1: - if time.time() >= stop_at: - break - tot2 = psutil.cpu_times(percpu=True) - for t1, t2 in zip(tot1, tot2): - t1, t2 = sum(t1), sum(t2) - difference = t2 - t1 - if difference >= 0.05: - return - self.fail() - - def test_sys_cpu_percent(self): - psutil.cpu_percent(interval=0.001) - psutil.cpu_percent(interval=0.001) - for x in range(1000): - percent = psutil.cpu_percent(interval=None) - self.assertTrue(isinstance(percent, float)) - self.assertTrue(percent >= 0.0) - self.assertTrue(percent <= 100.0) - - def test_sys_per_cpu_percent(self): - psutil.cpu_percent(interval=0.001, percpu=True) - psutil.cpu_percent(interval=0.001, percpu=True) - for x in range(1000): - percents = psutil.cpu_percent(interval=None, percpu=True) - for percent in percents: - self.assertTrue(isinstance(percent, float)) - self.assertTrue(percent >= 0.0) - self.assertTrue(percent <= 100.0) - - def test_sys_cpu_percent_compare(self): - psutil.cpu_percent(interval=0) - psutil.cpu_percent(interval=0, percpu=True) - time.sleep(.1) - t1 = psutil.cpu_percent(interval=0) - t2 = psutil.cpu_percent(interval=0, percpu=True) - # calculate total average - t2 = sum(t2) / len(t2) - if abs(t1 - t2) > 5: - self.assertEqual(t1, t2) - - def test_disk_usage(self): - usage = psutil.disk_usage(os.getcwd()) - self.assertTrue(usage.total > 0) - self.assertTrue(usage.used > 0) - self.assertTrue(usage.free > 0) - self.assertTrue(usage.total > usage.used) - self.assertTrue(usage.total > usage.free) - self.assertTrue(0 <= usage.percent <= 100) - - # if path does not exist OSError ENOENT is expected across - # all platforms - fname = tempfile.mktemp() - try: - psutil.disk_usage(fname) - except OSError: - err = sys.exc_info()[1] - if err.args[0] != errno.ENOENT: - raise - else: - self.fail("OSError not raised") - - def test_disk_partitions(self): - for disk in psutil.disk_partitions(all=False): - self.assertTrue(os.path.exists(disk.device)) - self.assertTrue(os.path.isdir(disk.mountpoint)) - self.assertTrue(disk.fstype) - for disk in psutil.disk_partitions(all=True): - if not WINDOWS: - self.assertTrue(os.path.isdir(disk.mountpoint)) - self.assertTrue(disk.fstype) - - def find_mount_point(path): - path = os.path.abspath(path) - while not os.path.ismount(path): - path = os.path.dirname(path) - return path - - mount = find_mount_point(__file__) - mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] - self.assertTrue(mount in mounts) - psutil.disk_usage(mount) - - # XXX - @skipUnless(hasattr(psutil, "network_io_counters")) - def test_anetwork_io_counters(self): - def check_ntuple(nt): - self.assertEqual(nt[0], nt.bytes_sent) - self.assertEqual(nt[1], nt.bytes_recv) - self.assertEqual(nt[2], nt.packets_sent) - self.assertEqual(nt[3], nt.packets_recv) - self.assertTrue(nt.bytes_sent >= 0) - self.assertTrue(nt.bytes_recv >= 0) - self.assertTrue(nt.packets_sent >= 0) - self.assertTrue(nt.packets_recv >= 0) - - ret = psutil.network_io_counters(pernic=False) - check_ntuple(ret) - ret = psutil.network_io_counters(pernic=True) - for name, ntuple in ret.iteritems(): - self.assertTrue(name) - check_ntuple(ntuple) - # XXX - @skipUnless(hasattr(psutil, "disk_io_counters")) - def test_disk_io_counters(self): - def check_ntuple(nt): - self.assertEqual(nt[0], nt.read_count) - self.assertEqual(nt[1], nt.write_count) - self.assertEqual(nt[2], nt.read_bytes) - self.assertEqual(nt[3], nt.write_bytes) - self.assertEqual(nt[4], nt.read_time) - self.assertEqual(nt[5], nt.write_time) - self.assertTrue(nt.read_count >= 0) - self.assertTrue(nt.write_count >= 0) - self.assertTrue(nt.read_bytes >= 0) - self.assertTrue(nt.write_bytes >= 0) - self.assertTrue(nt.read_time >= 0) - self.assertTrue(nt.write_time >= 0) - - ret = psutil.disk_io_counters(perdisk=False) - check_ntuple(ret) - ret = psutil.disk_io_counters(perdisk=True) - for name, ntuple in ret.iteritems(): - self.assertTrue(name) - check_ntuple(ntuple) - - # ==================== - # Process object tests - # ==================== - - def test_kill(self): - sproc = get_test_subprocess() - test_pid = sproc.pid - wait_for_pid(test_pid) - p = psutil.Process(test_pid) - name = p.name - p.kill() - p.wait() - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) - - def test_terminate(self): - sproc = get_test_subprocess() - test_pid = sproc.pid - wait_for_pid(test_pid) - p = psutil.Process(test_pid) - name = p.name - p.terminate() - p.wait() - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) - - def test_send_signal(self): - if POSIX: - sig = signal.SIGKILL - else: - sig = signal.SIGTERM - sproc = get_test_subprocess() - test_pid = sproc.pid - p = psutil.Process(test_pid) - name = p.name - p.send_signal(sig) - p.wait() - self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON) - - def test_wait(self): - # check exit code signal - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.kill() - code = p.wait() - if os.name == 'posix': - self.assertEqual(code, signal.SIGKILL) - else: - self.assertEqual(code, 0) - self.assertFalse(p.is_running()) - - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.terminate() - code = p.wait() - if os.name == 'posix': - self.assertEqual(code, signal.SIGTERM) - else: - self.assertEqual(code, 0) - self.assertFalse(p.is_running()) - - # check sys.exit() code - code = "import time, sys; time.sleep(0.01); sys.exit(5);" - sproc = get_test_subprocess([PYTHON, "-c", code]) - p = psutil.Process(sproc.pid) - self.assertEqual(p.wait(), 5) - self.assertFalse(p.is_running()) - - # Test wait() issued twice. - # It is not supposed to raise NSP when the process is gone. - # On UNIX this should return None, on Windows it should keep - # returning the exit code. - sproc = get_test_subprocess([PYTHON, "-c", code]) - p = psutil.Process(sproc.pid) - self.assertEqual(p.wait(), 5) - self.assertTrue(p.wait() in (5, None)) - - # test timeout - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.name - self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) - - # timeout < 0 not allowed - self.assertRaises(ValueError, p.wait, -1) - - @skipUnless(POSIX) - def test_wait_non_children(self): - # test wait() against processes which are not our children - code = "import sys;" - code += "from subprocess import Popen, PIPE;" - code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON - code += "sp = Popen(cmd, stdout=PIPE);" - code += "sys.stdout.write(str(sp.pid));" - sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE) - - grandson_pid = int(sproc.stdout.read()) - grandson_proc = psutil.Process(grandson_pid) - try: - self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) - grandson_proc.kill() - ret = grandson_proc.wait() - self.assertEqual(ret, None) - finally: - if grandson_proc.is_running(): - grandson_proc.kill() - grandson_proc.wait() - - def test_wait_timeout_0(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - self.assertRaises(psutil.TimeoutExpired, p.wait, 0) - p.kill() - stop_at = time.time() + 2 - while 1: - try: - code = p.wait(0) - except psutil.TimeoutExpired: - if time.time() >= stop_at: - raise - else: - break - if os.name == 'posix': - self.assertEqual(code, signal.SIGKILL) - else: - self.assertEqual(code, 0) - self.assertFalse(p.is_running()) - - def test_cpu_percent(self): - p = psutil.Process(os.getpid()) - p.get_cpu_percent(interval=0.001) - p.get_cpu_percent(interval=0.001) - for x in range(100): - percent = p.get_cpu_percent(interval=None) - self.assertTrue(isinstance(percent, float)) - self.assertTrue(percent >= 0.0) - self.assertTrue(percent <= 100.0) - - def test_cpu_times(self): - times = psutil.Process(os.getpid()).get_cpu_times() - self.assertTrue((times.user > 0.0) or (times.system > 0.0)) - # make sure returned values can be pretty printed with strftime - time.strftime("%H:%M:%S", time.localtime(times.user)) - time.strftime("%H:%M:%S", time.localtime(times.system)) - - # Test Process.cpu_times() against os.times() - # os.times() is broken on Python 2.6 - # http://bugs.python.org/issue1040026 - # XXX fails on OSX: not sure if it's for os.times(). We should - # try this with Python 2.7 and re-enable the test. - - @skipUnless(sys.version_info > (2, 6, 1) and not OSX) - def test_cpu_times2(self): - user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times() - utime, ktime = os.times()[:2] - - # Use os.times()[:2] as base values to compare our results - # using a tolerance of +/- 0.1 seconds. - # It will fail if the difference between the values is > 0.1s. - if (max([user_time, utime]) - min([user_time, utime])) > 0.1: - self.fail("expected: %s, found: %s" %(utime, user_time)) - - if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: - self.fail("expected: %s, found: %s" %(ktime, kernel_time)) - - def test_create_time(self): - sproc = get_test_subprocess() - now = time.time() - wait_for_pid(sproc.pid) - p = psutil.Process(sproc.pid) - create_time = p.create_time - - # Use time.time() as base value to compare our result using a - # tolerance of +/- 1 second. - # It will fail if the difference between the values is > 2s. - difference = abs(create_time - now) - if difference > 2: - self.fail("expected: %s, found: %s, difference: %s" - % (now, create_time, difference)) - - # make sure returned value can be pretty printed with strftime - time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time)) - - @skipIf(WINDOWS) - def test_terminal(self): - tty = sh('tty') - p = psutil.Process(os.getpid()) - self.assertEqual(p.terminal, tty) - - @skipIf(OSX, warn=False) - def test_get_io_counters(self): - p = psutil.Process(os.getpid()) - # test reads - io1 = p.get_io_counters() - f = open(PYTHON, 'rb') - f.read() - f.close() - io2 = p.get_io_counters() - if not BSD: - self.assertTrue(io2.read_count > io1.read_count) - self.assertTrue(io2.write_count == io1.write_count) - self.assertTrue(io2.read_bytes >= io1.read_bytes) - self.assertTrue(io2.write_bytes >= io1.write_bytes) - # test writes - io1 = p.get_io_counters() - f = tempfile.TemporaryFile() - if sys.version_info >= (3,): - f.write(bytes("x" * 1000000, 'ascii')) - else: - f.write("x" * 1000000) - f.close() - io2 = p.get_io_counters() - if not BSD: - self.assertTrue(io2.write_count > io1.write_count) - self.assertTrue(io2.write_bytes > io1.write_bytes) - self.assertTrue(io2.read_count >= io1.read_count) - self.assertTrue(io2.read_bytes >= io1.read_bytes) - - @skipUnless(LINUX) - def test_get_set_ionice(self): - from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE, - IOPRIO_CLASS_IDLE) - self.assertEqual(IOPRIO_CLASS_NONE, 0) - self.assertEqual(IOPRIO_CLASS_RT, 1) - self.assertEqual(IOPRIO_CLASS_BE, 2) - self.assertEqual(IOPRIO_CLASS_IDLE, 3) - p = psutil.Process(os.getpid()) - try: - p.set_ionice(2) - ioclass, value = p.get_ionice() - self.assertEqual(ioclass, 2) - self.assertEqual(value, 4) - # - p.set_ionice(3) - ioclass, value = p.get_ionice() - self.assertEqual(ioclass, 3) - self.assertEqual(value, 0) - # - p.set_ionice(2, 0) - ioclass, value = p.get_ionice() - self.assertEqual(ioclass, 2) - self.assertEqual(value, 0) - p.set_ionice(2, 7) - ioclass, value = p.get_ionice() - self.assertEqual(ioclass, 2) - self.assertEqual(value, 7) - self.assertRaises(ValueError, p.set_ionice, 2, 10) - finally: - p.set_ionice(IOPRIO_CLASS_NONE) - - def test_get_num_threads(self): - # on certain platforms such as Linux we might test for exact - # thread number, since we always have with 1 thread per process, - # but this does not apply across all platforms (OSX, Windows) - p = psutil.Process(os.getpid()) - step1 = p.get_num_threads() - - thread = ThreadTask() - thread.start() - try: - step2 = p.get_num_threads() - self.assertEqual(step2, step1 + 1) - thread.stop() - finally: - if thread._running: - thread.stop() - - def test_get_threads(self): - p = psutil.Process(os.getpid()) - step1 = p.get_threads() - - thread = ThreadTask() - thread.start() - - try: - step2 = p.get_threads() - self.assertEqual(len(step2), len(step1) + 1) - # on Linux, first thread id is supposed to be this process - if LINUX: - self.assertEqual(step2[0].id, os.getpid()) - athread = step2[0] - # test named tuple - self.assertEqual(athread.id, athread[0]) - self.assertEqual(athread.user_time, athread[1]) - self.assertEqual(athread.system_time, athread[2]) - # test num threads - thread.stop() - finally: - if thread._running: - thread.stop() - - def test_get_memory_info(self): - p = psutil.Process(os.getpid()) - - # step 1 - get a base value to compare our results - rss1, vms1 = p.get_memory_info() - percent1 = p.get_memory_percent() - self.assertTrue(rss1 > 0) - self.assertTrue(vms1 > 0) - - # step 2 - allocate some memory - memarr = [None] * 1500000 - - rss2, vms2 = p.get_memory_info() - percent2 = p.get_memory_percent() - # make sure that the memory usage bumped up - self.assertTrue(rss2 > rss1) - self.assertTrue(vms2 >= vms1) # vms might be equal - self.assertTrue(percent2 > percent1) - del memarr - - def test_get_memory_percent(self): - p = psutil.Process(os.getpid()) - self.assertTrue(p.get_memory_percent() > 0.0) - - def test_pid(self): - sproc = get_test_subprocess() - self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) - - def test_is_running(self): - sproc = get_test_subprocess() - wait_for_pid(sproc.pid) - p = psutil.Process(sproc.pid) - self.assertTrue(p.is_running()) - p.kill() - p.wait() - self.assertFalse(p.is_running()) - - def test_exe(self): - sproc = get_test_subprocess() - wait_for_pid(sproc.pid) - try: - self.assertEqual(psutil.Process(sproc.pid).exe, PYTHON) - except AssertionError: - # certain platforms such as BSD are more accurate returning: - # "/usr/local/bin/python2.7" - # ...instead of: - # "/usr/local/bin/python" - # We do not want to consider this difference in accuracy - # an error. - name = psutil.Process(sproc.pid).exe - adjusted_name = PYTHON[:len(name)] - self.assertEqual(name, adjusted_name) - for p in psutil.process_iter(): - try: - exe = p.exe - except psutil.Error: - continue - if not exe: - continue - if not os.path.exists(exe): - self.fail("%s does not exist (pid=%s, name=%s, cmdline=%s)" \ - % (repr(exe), p.pid, p.name, p.cmdline)) - if hasattr(os, 'access') and hasattr(os, "X_OK"): - if not os.access(p.exe, os.X_OK): - self.fail("%s is not executable (pid=%s, name=%s, cmdline=%s)" \ - % (repr(p.exe), p.pid, p.name, p.cmdline)) - - def test_cmdline(self): - sproc = get_test_subprocess([PYTHON, "-E"]) - wait_for_pid(sproc.pid) - self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"]) - - def test_name(self): - sproc = get_test_subprocess(PYTHON) - wait_for_pid(sproc.pid) - self.assertEqual(psutil.Process(sproc.pid).name.lower(), - os.path.basename(sys.executable).lower()) - - if os.name == 'posix': - - def test_uids(self): - p = psutil.Process(os.getpid()) - real, effective, saved = p.uids - # os.getuid() refers to "real" uid - self.assertEqual(real, os.getuid()) - # os.geteuid() refers to "effective" uid - self.assertEqual(effective, os.geteuid()) - # no such thing as os.getsuid() ("saved" uid), but starting - # from python 2.7 we have os.getresuid()[2] - if hasattr(os, "getresuid"): - self.assertEqual(saved, os.getresuid()[2]) - - def test_gids(self): - p = psutil.Process(os.getpid()) - real, effective, saved = p.gids - # os.getuid() refers to "real" uid - self.assertEqual(real, os.getgid()) - # os.geteuid() refers to "effective" uid - self.assertEqual(effective, os.getegid()) - # no such thing as os.getsuid() ("saved" uid), but starting - # from python 2.7 we have os.getresgid()[2] - if hasattr(os, "getresuid"): - self.assertEqual(saved, os.getresgid()[2]) - - def test_nice(self): - p = psutil.Process(os.getpid()) - self.assertRaises(TypeError, setattr, p, "nice", "str") - try: - try: - first_nice = p.nice - p.nice = 1 - self.assertEqual(p.nice, 1) - # going back to previous nice value raises AccessDenied on OSX - if not OSX: - p.nice = 0 - self.assertEqual(p.nice, 0) - except psutil.AccessDenied: - pass - finally: - # going back to previous nice value raises AccessDenied on OSX - if not OSX: - p.nice = first_nice - - if os.name == 'nt': - - def test_nice(self): - p = psutil.Process(os.getpid()) - self.assertRaises(TypeError, setattr, p, "nice", "str") - try: - self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) - p.nice = psutil.HIGH_PRIORITY_CLASS - self.assertEqual(p.nice, psutil.HIGH_PRIORITY_CLASS) - p.nice = psutil.NORMAL_PRIORITY_CLASS - self.assertEqual(p.nice, psutil.NORMAL_PRIORITY_CLASS) - finally: - p.nice = psutil.NORMAL_PRIORITY_CLASS - - def test_status(self): - p = psutil.Process(os.getpid()) - self.assertEqual(p.status, psutil.STATUS_RUNNING) - self.assertEqual(str(p.status), "running") - for p in psutil.process_iter(): - if str(p.status) == '?': - self.fail("invalid status for pid %d" % p.pid) - - def test_username(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - if POSIX: - import pwd - self.assertEqual(p.username, pwd.getpwuid(os.getuid()).pw_name) - elif WINDOWS: - expected_username = os.environ['USERNAME'] - expected_domain = os.environ['USERDOMAIN'] - domain, username = p.username.split('\\') - self.assertEqual(domain, expected_domain) - self.assertEqual(username, expected_username) - else: - p.username - - @skipUnless(WINDOWS or LINUX) - def test_getcwd(self): - sproc = get_test_subprocess() - wait_for_pid(sproc.pid) - p = psutil.Process(sproc.pid) - self.assertEqual(p.getcwd(), os.getcwd()) - - @skipUnless(WINDOWS or LINUX) - def test_getcwd_2(self): - cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(10)"] - sproc = get_test_subprocess(cmd) - wait_for_pid(sproc.pid) - p = psutil.Process(sproc.pid) - time.sleep(0.1) - expected_dir = os.path.dirname(os.getcwd()) - self.assertEqual(p.getcwd(), expected_dir) - - def test_get_open_files(self): - # current process - p = psutil.Process(os.getpid()) - files = p.get_open_files() - self.assertFalse(TESTFN in files) - f = open(TESTFN, 'r') - time.sleep(.1) - filenames = [x.path for x in p.get_open_files()] - self.assertTrue(TESTFN in filenames) - f.close() - for file in filenames: - self.assertTrue(os.path.isfile(file)) - - # another process - cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN - sproc = get_test_subprocess([PYTHON, "-c", cmdline]) - wait_for_pid(sproc.pid) - time.sleep(0.1) - p = psutil.Process(sproc.pid) - for x in range(100): - filenames = [x.path for x in p.get_open_files()] - if TESTFN in filenames: - break - time.sleep(.01) - else: - self.assertTrue(TESTFN in filenames) - for file in filenames: - self.assertTrue(os.path.isfile(file)) - # all processes - for proc in psutil.process_iter(): - try: - files = proc.get_open_files() - except psutil.Error: - pass - else: - for file in filenames: - self.assertTrue(os.path.isfile(file)) - - def test_get_open_files2(self): - # test fd and path fields - fileobj = open(TESTFN, 'r') - p = psutil.Process(os.getpid()) - for path, fd in p.get_open_files(): - if path == fileobj.name or fd == fileobj.fileno(): - break - else: - self.fail("no file found; files=%s" % repr(p.get_open_files())) - self.assertEqual(path, fileobj.name) - if WINDOWS: - self.assertEqual(fd, -1) - else: - self.assertEqual(fd, fileobj.fileno()) - # test positions - ntuple = p.get_open_files()[0] - self.assertEqual(ntuple[0], ntuple.path) - self.assertEqual(ntuple[1], ntuple.fd) - # test file is gone - fileobj.close() - self.assertTrue(fileobj.name not in p.get_open_files()) - - @skipUnless(SUPPORT_CONNECTIONS, warn=1) - def test_get_connections(self): - arg = "import socket, time;" \ - "s = socket.socket();" \ - "s.bind(('127.0.0.1', 0));" \ - "s.listen(1);" \ - "conn, addr = s.accept();" \ - "time.sleep(100);" - sproc = get_test_subprocess([PYTHON, "-c", arg]) - p = psutil.Process(sproc.pid) - for x in range(100): - cons = p.get_connections() - if cons: - break - time.sleep(.01) - self.assertEqual(len(cons), 1) - con = cons[0] - self.assertEqual(con.family, socket.AF_INET) - self.assertEqual(con.type, socket.SOCK_STREAM) - self.assertEqual(con.status, "LISTEN") - ip, port = con.local_address - self.assertEqual(ip, '127.0.0.1') - self.assertEqual(con.remote_address, ()) - if WINDOWS: - self.assertEqual(con.fd, -1) - else: - self.assertTrue(con.fd > 0) - # test positions - self.assertEqual(con[0], con.fd) - self.assertEqual(con[1], con.family) - self.assertEqual(con[2], con.type) - self.assertEqual(con[3], con.local_address) - self.assertEqual(con[4], con.remote_address) - self.assertEqual(con[5], con.status) - - @skipUnless(supports_ipv6()) - def test_get_connections_ipv6(self): - s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - s.bind(('::1', 0)) - s.listen(1) - cons = psutil.Process(os.getpid()).get_connections() - s.close() - self.assertEqual(len(cons), 1) - self.assertEqual(cons[0].local_address[0], '::1') - - @skipUnless(hasattr(socket, "fromfd") and not WINDOWS) - def test_connection_fromfd(self): - sock = socket.socket() - sock.bind(('localhost', 0)) - sock.listen(1) - p = psutil.Process(os.getpid()) - for conn in p.get_connections(): - if conn.fd == sock.fileno(): - break - else: - sock.close() - self.fail("couldn't find socket fd") - dupsock = socket.fromfd(conn.fd, conn.family, conn.type) - try: - self.assertEqual(dupsock.getsockname(), conn.local_address) - self.assertNotEqual(sock.fileno(), dupsock.fileno()) - finally: - sock.close() - dupsock.close() - - @skipUnless(SUPPORT_CONNECTIONS, warn=1) - def test_get_connections_all(self): - - def check_address(addr, family): - if not addr: - return - ip, port = addr - self.assertTrue(isinstance(port, int)) - if family == socket.AF_INET: - ip = list(map(int, ip.split('.'))) - self.assertTrue(len(ip) == 4) - for num in ip: - self.assertTrue(0 <= num <= 255) - self.assertTrue(0 <= port <= 65535) - - # all values are supposed to match Linux's tcp_states.h states - # table across all platforms. - valid_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1", - "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT", - "LAST_ACK", "LISTEN", "CLOSING", ""] - - tcp_template = "import socket;" \ - "s = socket.socket($family, socket.SOCK_STREAM);" \ - "s.bind(('$addr', 0));" \ - "s.listen(1);" \ - "conn, addr = s.accept();" - - udp_template = "import socket, time;" \ - "s = socket.socket($family, socket.SOCK_DGRAM);" \ - "s.bind(('$addr', 0));" \ - "time.sleep(100);" - - from string import Template - tcp4_template = Template(tcp_template).substitute(family=socket.AF_INET, - addr="127.0.0.1") - udp4_template = Template(udp_template).substitute(family=socket.AF_INET, - addr="127.0.0.1") - tcp6_template = Template(tcp_template).substitute(family=socket.AF_INET6, - addr="::1") - udp6_template = Template(udp_template).substitute(family=socket.AF_INET6, - addr="::1") - - # launch various subprocess instantiating a socket of various - # families and tupes to enrich psutil results - tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template]) - udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template]) - if supports_ipv6(): - tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template]) - udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template]) - else: - tcp6_proc = None - udp6_proc = None - - # --- check connections of all processes - - time.sleep(0.1) - for p in psutil.process_iter(): - try: - cons = p.get_connections() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - else: - for conn in cons: - self.assertTrue(conn.type in (socket.SOCK_STREAM, - socket.SOCK_DGRAM)) - self.assertTrue(conn.family in (socket.AF_INET, - socket.AF_INET6)) - check_address(conn.local_address, conn.family) - check_address(conn.remote_address, conn.family) - if conn.status not in valid_states: - self.fail("%s is not a valid status" %conn.status) - # actually try to bind the local socket; ignore IPv6 - # sockets as their address might be represented as - # an IPv4-mapped-address (e.g. "::127.0.0.1") - # and that's rejected by bind() - if conn.family == socket.AF_INET: - s = socket.socket(conn.family, conn.type) - s.bind((conn.local_address[0], 0)) - s.close() - - if not WINDOWS and hasattr(socket, 'fromfd'): - dupsock = None - try: - try: - dupsock = socket.fromfd(conn.fd, conn.family, - conn.type) - except (socket.error, OSError): - err = sys.exc_info()[1] - if err.args[0] == errno.EBADF: - continue - else: - raise - # python >= 2.5 - if hasattr(dupsock, "family"): - self.assertEqual(dupsock.family, conn.family) - self.assertEqual(dupsock.type, conn.type) - finally: - if dupsock is not None: - dupsock.close() - - - # --- check matches against subprocesses - - for p in psutil.Process(os.getpid()).get_children(): - for conn in p.get_connections(): - # TCP v4 - if p.pid == tcp4_proc.pid: - self.assertEqual(conn.family, socket.AF_INET) - self.assertEqual(conn.type, socket.SOCK_STREAM) - self.assertEqual(conn.local_address[0], "127.0.0.1") - self.assertEqual(conn.remote_address, ()) - self.assertEqual(conn.status, "LISTEN") - # UDP v4 - elif p.pid == udp4_proc.pid: - self.assertEqual(conn.family, socket.AF_INET) - self.assertEqual(conn.type, socket.SOCK_DGRAM) - self.assertEqual(conn.local_address[0], "127.0.0.1") - self.assertEqual(conn.remote_address, ()) - self.assertEqual(conn.status, "") - # TCP v6 - elif p.pid == getattr(tcp6_proc, "pid", None): - self.assertEqual(conn.family, socket.AF_INET6) - self.assertEqual(conn.type, socket.SOCK_STREAM) - self.assertTrue(conn.local_address[0] in ("::", "::1")) - self.assertEqual(conn.remote_address, ()) - self.assertEqual(conn.status, "LISTEN") - # UDP v6 - elif p.pid == getattr(udp6_proc, "pid", None): - self.assertEqual(conn.family, socket.AF_INET6) - self.assertEqual(conn.type, socket.SOCK_DGRAM) - self.assertTrue(conn.local_address[0] in ("::", "::1")) - self.assertEqual(conn.remote_address, ()) - self.assertEqual(conn.status, "") - - def test_parent_ppid(self): - this_parent = os.getpid() - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - self.assertEqual(p.ppid, this_parent) - self.assertEqual(p.parent.pid, this_parent) - # no other process is supposed to have us as parent - for p in psutil.process_iter(): - if p.pid == sproc.pid: - continue - self.assertTrue(p.ppid != this_parent) - - def test_get_children(self): - p = psutil.Process(os.getpid()) - self.assertEqual(p.get_children(), []) - sproc = get_test_subprocess() - children = p.get_children() - self.assertEqual(len(children), 1) - self.assertEqual(children[0].pid, sproc.pid) - self.assertEqual(children[0].ppid, os.getpid()) - - def test_suspend_resume(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.suspend() - time.sleep(0.1) - self.assertEqual(p.status, psutil.STATUS_STOPPED) - self.assertEqual(str(p.status), "stopped") - p.resume() - self.assertTrue(p.status != psutil.STATUS_STOPPED) - - def test_invalid_pid(self): - self.assertRaises(ValueError, psutil.Process, "1") - self.assertRaises(ValueError, psutil.Process, None) - # Refers to Issue #12 - self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1) - - def test_zombie_process(self): - # Test that NoSuchProcess exception gets raised in case the - # process dies after we create the Process object. - # Example: - # >>> proc = Process(1234) - # >>> time.sleep(5) # time-consuming task, process dies in meantime - # >>> proc.name - # Refers to Issue #15 - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - - for name in dir(p): - if name.startswith('_')\ - or name in ('pid', 'send_signal', 'is_running', 'set_ionice', - 'wait'): - continue - try: - meth = getattr(p, name) - if callable(meth): - meth() - except psutil.NoSuchProcess: - pass - else: - self.fail("NoSuchProcess exception not raised for %r" % name) - - # other methods - try: - if os.name == 'posix': - p.nice = 1 - else: - p.nice = psutil.NORMAL_PRIORITY_CLASS - except psutil.NoSuchProcess: - pass - else: - self.fail("exception not raised") - if hasattr(p, 'set_ionice'): - self.assertRaises(psutil.NoSuchProcess, p.set_ionice, 2) - self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.SIGTERM) - self.assertFalse(p.is_running()) - - def test__str__(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - self.assertTrue(str(sproc.pid) in str(p)) - # python shows up as 'Python' in cmdline on OS X so test fails on OS X - if not OSX: - self.assertTrue(os.path.basename(PYTHON) in str(p)) - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - self.assertTrue(str(sproc.pid) in str(p)) - self.assertTrue("terminated" in str(p)) - - def test_fetch_all(self): - valid_procs = 0 - excluded_names = ['send_signal', 'suspend', 'resume', 'terminate', - 'kill', 'wait'] - excluded_names += ['get_cpu_percent', 'get_children'] - # XXX - skip slow lsof implementation; - if BSD: - excluded_names += ['get_open_files', 'get_connections'] - if OSX: - excluded_names += ['get_connections'] - attrs = [] - for name in dir(psutil.Process): - if name.startswith("_"): - continue - if name.startswith("set_"): - continue - if name in excluded_names: - continue - attrs.append(name) - - for p in psutil.process_iter(): - for name in attrs: - try: - try: - attr = getattr(p, name, None) - if attr is not None and callable(attr): - ret = attr() - else: - ret = attr - valid_procs += 1 - except (psutil.NoSuchProcess, psutil.AccessDenied): - err = sys.exc_info()[1] - self.assertEqual(err.pid, p.pid) - if err.name: - self.assertEqual(err.name, p.name) - self.assertTrue(str(err)) - self.assertTrue(err.msg) - else: - if name == 'parent' or ret in (0, 0.0, [], None): - continue - self.assertTrue(ret) - if name == "exe": - self.assertTrue(os.path.isfile(ret)) - elif name == "getcwd": - # XXX - temporary fix; on my Linux box - # chrome process cws is errnously reported - # as /proc/4144/fdinfo whichd doesn't exist - if 'chrome' in p.name: - continue - self.assertTrue(os.path.isdir(ret)) - except Exception: - err = sys.exc_info()[1] - trace = traceback.format_exc() - self.fail('%s\nmethod=%s, pid=%s, retvalue=%s' - %(trace, name, p.pid, repr(ret))) - - # we should always have a non-empty list, not including PID 0 etc. - # special cases. - self.assertTrue(valid_procs > 0) - - @skipIf(LINUX) - def test_pid_0(self): - # Process(0) is supposed to work on all platforms except Linux - p = psutil.Process(0) - self.assertTrue(p.name) - - if os.name == 'posix': - self.assertEqual(p.uids.real, 0) - self.assertEqual(p.gids.real, 0) - - self.assertTrue(p.ppid in (0, 1)) - #self.assertEqual(p.exe, "") - self.assertEqual(p.cmdline, []) - try: - p.get_num_threads() - except psutil.AccessDenied: - pass - - if OSX : #and os.geteuid() != 0: - self.assertRaises(psutil.AccessDenied, p.get_memory_info) - self.assertRaises(psutil.AccessDenied, p.get_cpu_times) - else: - p.get_memory_info() - - # username property - if POSIX: - self.assertEqual(p.username, 'root') - elif WINDOWS: - self.assertEqual(p.username, 'NT AUTHORITY\\SYSTEM') - else: - p.username - - self.assertTrue(0 in psutil.get_pid_list()) - self.assertTrue(psutil.pid_exists(0)) - - def test_Popen(self): - # Popen class test - cmd = [PYTHON, "-c", "import time; time.sleep(3600);"] - proc = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - proc.name - proc.stdin - self.assertTrue(hasattr(proc, 'name')) - self.assertTrue(hasattr(proc, 'stdin')) - self.assertRaises(AttributeError, getattr, proc, 'foo') - finally: - proc.kill() - proc.wait() - - -if hasattr(os, 'getuid'): - class LimitedUserTestCase(TestCase): - """Repeat the previous tests by using a limited user. - Executed only on UNIX and only if the user who run the test script - is root. - """ - # the uid/gid the test suite runs under - PROCESS_UID = os.getuid() - PROCESS_GID = os.getgid() - - def __init__(self, *args, **kwargs): - TestCase.__init__(self, *args, **kwargs) - # re-define all existent test methods in order to - # ignore AccessDenied exceptions - for attr in [x for x in dir(self) if x.startswith('test')]: - meth = getattr(self, attr) - def test_(self): - try: - meth() - except psutil.AccessDenied: - pass - setattr(self, attr, types.MethodType(test_, self)) - - def setUp(self): - os.setegid(1000) - os.seteuid(1000) - TestCase.setUp(self) - - def tearDown(self): - os.setegid(self.PROCESS_UID) - os.seteuid(self.PROCESS_GID) - TestCase.tearDown(self) - - def test_nice(self): - try: - psutil.Process(os.getpid()).nice = -1 - except psutil.AccessDenied: - pass - else: - self.fail("exception not raised") - - -def test_main(): - tests = [] - test_suite = unittest.TestSuite() - tests.append(TestCase) - - if POSIX: - from _posix import PosixSpecificTestCase - tests.append(PosixSpecificTestCase) - - # import the specific platform test suite - if LINUX: - from _linux import LinuxSpecificTestCase as stc - elif WINDOWS: - from _windows import WindowsSpecificTestCase as stc - elif OSX: - from _osx import OSXSpecificTestCase as stc - elif BSD: - from _bsd import BSDSpecificTestCase as stc - tests.append(stc) - - if hasattr(os, 'getuid'): - if os.getuid() == 0: - tests.append(LimitedUserTestCase) - else: - atexit.register(warnings.warn, "Couldn't run limited user tests (" - "super-user privileges are required)", RuntimeWarning) - - for test_class in tests: - test_suite.addTest(unittest.makeSuite(test_class)) - - f = open(TESTFN, 'w') - f.close() - atexit.register(lambda: os.remove(TESTFN)) - - unittest.TextTestRunner(verbosity=2).run(test_suite) - DEVNULL.close() - -if __name__ == '__main__': - test_main() |