#!/usr/bin/env python # Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import os import unittest import sys FILE_NAME = os.path.abspath(__file__) ROOT_DIR = os.path.dirname(FILE_NAME) import trace_inputs def join_norm(*args): """Joins and normalizes path in a single step.""" return unicode(os.path.normpath(os.path.join(*args))) class TraceInputs(unittest.TestCase): def test_process_quoted_arguments(self): test_cases = ( ('"foo"', ['foo']), ('"foo", "bar"', ['foo', 'bar']), ('"foo"..., "bar"', ['foo', 'bar']), ('"foo", "bar"...', ['foo', 'bar']), ( '"/browser_tests", "--type=use,comma"', ['/browser_tests', '--type=use,comma'] ), ( '"/browser_tests", "--ignored=\\" --type=renderer \\""', ['/browser_tests', '--ignored=" --type=renderer "'] ), ) for actual, expected in test_cases: self.assertEquals( expected, trace_inputs.strace_process_quoted_arguments(actual)) def test_process_escaped_arguments(self): test_cases = ( ('foo\\0', ['foo']), ('foo\\001bar\\0', ['foo', 'bar']), ('\\"foo\\"\\0', ['"foo"']), ) for actual, expected in test_cases: self.assertEquals( expected, trace_inputs.Dtrace.Context.process_escaped_arguments(actual)) def test_variable_abs(self): value = trace_inputs.Results.File(None, '/foo/bar', False, False) actual = value.replace_variables({'$FOO': '/foo'}) self.assertEquals('$FOO/bar', actual.path) self.assertEquals('$FOO/bar', actual.full_path) self.assertEquals(True, actual.tainted) def test_variable_rel(self): value = trace_inputs.Results.File('/usr', 'foo/bar', False, False) actual = value.replace_variables({'$FOO': 'foo'}) self.assertEquals('$FOO/bar', actual.path) self.assertEquals(os.path.join('/usr', '$FOO/bar'), actual.full_path) self.assertEquals(True, actual.tainted) def test_native_case_end_with_os_path_sep(self): # Make sure the trailing os.path.sep is kept. path = trace_inputs.get_native_path_case(ROOT_DIR) + os.path.sep self.assertEquals(trace_inputs.get_native_path_case(path), path) def test_native_case_non_existing(self): # Make sure it doesn't throw on non-existing files. non_existing = 'trace_input_test_this_file_should_not_exist' path = os.path.expanduser('~/' + non_existing) self.assertFalse(os.path.exists(path)) path = trace_inputs.get_native_path_case(ROOT_DIR) + os.path.sep self.assertEquals(trace_inputs.get_native_path_case(path), path) if sys.platform in ('darwin', 'win32'): def test_native_case_not_sensitive(self): # The home directory is almost guaranteed to have mixed upper/lower case # letters on both Windows and OSX. # This test also ensures that the output is independent on the input # string case. path = os.path.expanduser('~') self.assertTrue(os.path.isdir(path)) # This test assumes the variable is in the native path case on disk, this # should be the case. Verify this assumption: self.assertEquals(path, trace_inputs.get_native_path_case(path)) self.assertEquals( trace_inputs.get_native_path_case(path.lower()), trace_inputs.get_native_path_case(path.upper())) def test_native_case_not_sensitive_non_existent(self): # This test also ensures that the output is independent on the input # string case. non_existing = os.path.join( 'trace_input_test_this_dir_should_not_exist', 'really not', '') path = os.path.expanduser(os.path.join('~', non_existing)) self.assertFalse(os.path.exists(path)) lower = trace_inputs.get_native_path_case(path.lower()) upper = trace_inputs.get_native_path_case(path.upper()) # Make sure non-existing element is not modified: self.assertTrue(lower.endswith(non_existing.lower())) self.assertTrue(upper.endswith(non_existing.upper())) self.assertEquals(lower[:-len(non_existing)], upper[:-len(non_existing)]) if sys.platform != 'win32': def test_symlink(self): # This test will fail if the checkout is in a symlink. actual = trace_inputs.split_at_symlink(None, ROOT_DIR) expected = (ROOT_DIR, None, None) self.assertEquals(expected, actual) actual = trace_inputs.split_at_symlink( None, os.path.join(ROOT_DIR, 'data', 'trace_inputs')) expected = ( os.path.join(ROOT_DIR, 'data', 'trace_inputs'), None, None) self.assertEquals(expected, actual) actual = trace_inputs.split_at_symlink( None, os.path.join(ROOT_DIR, 'data', 'trace_inputs', 'files2')) expected = ( os.path.join(ROOT_DIR, 'data', 'trace_inputs'), 'files2', '') self.assertEquals(expected, actual) actual = trace_inputs.split_at_symlink( ROOT_DIR, os.path.join('data', 'trace_inputs', 'files2')) expected = ( os.path.join('data', 'trace_inputs'), 'files2', '') self.assertEquals(expected, actual) actual = trace_inputs.split_at_symlink( ROOT_DIR, os.path.join('data', 'trace_inputs', 'files2', 'bar')) expected = ( os.path.join('data', 'trace_inputs'), 'files2', '/bar') self.assertEquals(expected, actual) def test_native_case_symlink_right_case(self): actual = trace_inputs.get_native_path_case( os.path.join(ROOT_DIR, 'data', 'trace_inputs')) self.assertEquals('trace_inputs', os.path.basename(actual)) # Make sure the symlink is not resolved. actual = trace_inputs.get_native_path_case( os.path.join(ROOT_DIR, 'data', 'trace_inputs', 'files2')) self.assertEquals('files2', os.path.basename(actual)) if sys.platform == 'darwin': def test_native_case_symlink_wrong_case(self): actual = trace_inputs.get_native_path_case( os.path.join(ROOT_DIR, 'data', 'trace_inputs')) self.assertEquals('trace_inputs', os.path.basename(actual)) # Make sure the symlink is not resolved. actual = trace_inputs.get_native_path_case( os.path.join(ROOT_DIR, 'data', 'trace_inputs', 'Files2')) self.assertEquals('files2', os.path.basename(actual)) if sys.platform != 'win32': class StraceInputs(unittest.TestCase): # Represents the root process pid (an arbitrary number). _ROOT_PID = 27 _CHILD_PID = 14 _GRAND_CHILD_PID = 70 @staticmethod def _load_context(lines, initial_cwd): context = trace_inputs.Strace.Context(lambda _: False, initial_cwd) for line in lines: context.on_line(*line) return context.to_results().flatten() def _test_lines(self, lines, initial_cwd, files, command=None): filepath = join_norm(initial_cwd, '../out/unittests') command = command or ['../out/unittests'] expected = { 'root': { 'children': [], 'command': command, 'executable': filepath, 'files': files, 'initial_cwd': initial_cwd, 'pid': self._ROOT_PID, } } if not files: expected['root']['command'] = None expected['root']['executable'] = None self.assertEquals(expected, self._load_context(lines, initial_cwd)) def test_execve(self): lines = [ (self._ROOT_PID, 'execve("/home/foo_bar_user/out/unittests", ' '["/home/foo_bar_user/out/unittests", ' '"--gtest_filter=AtExitTest.Basic"], [/* 44 vars */]) = 0'), (self._ROOT_PID, 'open("out/unittests.log", O_WRONLY|O_CREAT|O_APPEND, 0666) = 8'), ] files = [ { 'path': u'/home/foo_bar_user/out/unittests', 'size': -1, }, { 'path': u'/home/foo_bar_user/src/out/unittests.log', 'size': -1, }, ] command = [ '/home/foo_bar_user/out/unittests', '--gtest_filter=AtExitTest.Basic', ] self._test_lines(lines, '/home/foo_bar_user/src', files, command) def test_empty(self): try: self._load_context([], None) self.fail() except trace_inputs.TracingFailure, e: expected = ( 'Found internal inconsitency in process lifetime detection ' 'while finding the root process', None, None, None, []) self.assertEquals(expected, e.args) def test_close(self): lines = [ (self._ROOT_PID, 'close(7) = 0'), ] self._test_lines(lines, '/home/foo_bar_user/src', []) def test_clone(self): # Grand-child with relative directory. lines = [ (self._ROOT_PID, 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % self._CHILD_PID), (self._CHILD_PID, 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % self._GRAND_CHILD_PID), (self._GRAND_CHILD_PID, 'open("%s", O_RDONLY) = 76' % os.path.basename(FILE_NAME)), ] size = os.stat(FILE_NAME).st_size expected = { 'root': { 'children': [ { 'children': [ { 'children': [], 'command': None, 'executable': None, 'files': [ { 'path': unicode(FILE_NAME), 'size': size, }, ], 'initial_cwd': ROOT_DIR, 'pid': self._GRAND_CHILD_PID, }, ], 'command': None, 'executable': None, 'files': [], 'initial_cwd': ROOT_DIR, 'pid': self._CHILD_PID, }, ], 'command': None, 'executable': None, 'files': [], 'initial_cwd': ROOT_DIR, 'pid': self._ROOT_PID, }, } self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) def test_clone_chdir(self): # Grand-child with relative directory. lines = [ (self._ROOT_PID, 'execve("../out/unittests", ' '["../out/unittests"...], [/* 44 vars */]) = 0'), (self._ROOT_PID, 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % self._CHILD_PID), (self._CHILD_PID, 'chdir("/home_foo_bar_user/path1") = 0'), (self._CHILD_PID, 'clone(child_stack=0, flags=CLONE_CHILD_CLEARTID' '|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f5350f829d0) = %d' % self._GRAND_CHILD_PID), (self._GRAND_CHILD_PID, 'execve("../out/unittests", ' '["../out/unittests"...], [/* 44 vars */]) = 0'), (self._ROOT_PID, 'chdir("/home_foo_bar_user/path2") = 0'), (self._GRAND_CHILD_PID, 'open("random.txt", O_RDONLY) = 76'), ] expected = { 'root': { 'children': [ { 'children': [ { 'children': [], 'command': ['../out/unittests'], 'executable': '/home_foo_bar_user/out/unittests', 'files': [ { 'path': u'/home_foo_bar_user/out/unittests', 'size': -1, }, { 'path': u'/home_foo_bar_user/path1/random.txt', 'size': -1, }, ], 'initial_cwd': u'/home_foo_bar_user/path1', 'pid': self._GRAND_CHILD_PID, }, ], # clone does not carry over the command and executable so it is # clear if an execve() call was done or not. 'command': None, 'executable': None, # This is important, since no execve call was done, it didn't # touch the executable file. 'files': [], 'initial_cwd': unicode(ROOT_DIR), 'pid': self._CHILD_PID, }, ], 'command': ['../out/unittests'], 'executable': join_norm(ROOT_DIR, '../out/unittests'), 'files': [ { 'path': join_norm(ROOT_DIR, '../out/unittests'), 'size': -1, }, ], 'initial_cwd': unicode(ROOT_DIR), 'pid': self._ROOT_PID, }, } self.assertEquals(expected, self._load_context(lines, ROOT_DIR)) def test_open(self): lines = [ (self._ROOT_PID, 'execve("../out/unittests", ' '["../out/unittests"...], [/* 44 vars */]) = 0'), (self._ROOT_PID, 'open("out/unittests.log", O_WRONLY|O_CREAT|O_APPEND, 0666) = 8'), ] files = [ { 'path': u'/home/foo_bar_user/out/unittests', 'size': -1, }, { 'path': u'/home/foo_bar_user/src/out/unittests.log', 'size': -1, }, ] self._test_lines(lines, '/home/foo_bar_user/src', files) def test_open_resumed(self): lines = [ (self._ROOT_PID, 'execve("../out/unittests", ' '["../out/unittests"...], [/* 44 vars */]) = 0'), (self._ROOT_PID, 'open("out/unittests.log", O_WRONLY|O_CREAT|O_APPEND ' ''), (self._ROOT_PID, '<... open resumed> ) = 3'), ] files = [ { 'path': u'/home/foo_bar_user/out/unittests', 'size': -1, }, { 'path': u'/home/foo_bar_user/src/out/unittests.log', 'size': -1, }, ] self._test_lines(lines, '/home/foo_bar_user/src', files) def test_sig_unexpected(self): lines = [ (self._ROOT_PID, 'exit_group(0) = ?'), ] self._test_lines(lines, '/home/foo_bar_user/src', []) def test_stray(self): lines = [ (self._ROOT_PID, 'execve("../out/unittests", ' '["../out/unittests"...], [/* 44 vars */]) = 0'), (self._ROOT_PID, ') = ? '), ] files = [ { 'path': u'/home/foo_bar_user/out/unittests', 'size': -1, }, ] self._test_lines(lines, '/home/foo_bar_user/src', files) if __name__ == '__main__': VERBOSE = '-v' in sys.argv logging.basicConfig(level=logging.DEBUG if VERBOSE else logging.ERROR) unittest.main()