diff options
author | mmeade <mmeade@chromium.org> | 2015-11-18 13:35:50 -0800 |
---|---|---|
committer | Commit bot <commit-bot@chromium.org> | 2015-11-18 21:37:36 +0000 |
commit | b257d2f6ef4103a883937cdf8740533a224bc9a3 (patch) | |
tree | 8ced8197d85782bae951b16f5cd0d2cc955c3736 /testing/legion | |
parent | bd325ae36bddc7d929a3562a44c5061d7015841b (diff) | |
download | chromium_src-b257d2f6ef4103a883937cdf8740533a224bc9a3.zip chromium_src-b257d2f6ef4103a883937cdf8740533a224bc9a3.tar.gz chromium_src-b257d2f6ef4103a883937cdf8740533a224bc9a3.tar.bz2 |
Enabling the servers to run on an available port rather than a hard coded one.
BUG=557878
Review URL: https://codereview.chromium.org/1460803002
Cr-Commit-Position: refs/heads/master@{#360418}
Diffstat (limited to 'testing/legion')
-rw-r--r-- | testing/legion/OWNERS | 3 | ||||
-rw-r--r-- | testing/legion/common_lib.py | 13 | ||||
-rw-r--r-- | testing/legion/legion_test_case.py | 3 | ||||
-rw-r--r-- | testing/legion/rpc_server.py | 6 | ||||
-rwxr-xr-x | testing/legion/run_task.py | 14 | ||||
-rw-r--r-- | testing/legion/task_controller.py | 21 | ||||
-rw-r--r-- | testing/legion/task_registration_server.py | 12 |
7 files changed, 49 insertions, 23 deletions
diff --git a/testing/legion/OWNERS b/testing/legion/OWNERS new file mode 100644 index 0000000..f44af62 --- /dev/null +++ b/testing/legion/OWNERS @@ -0,0 +1,3 @@ +mmeade@chromium.org +bgoldman@chromium.org +chaitali@chromium.org diff --git a/testing/legion/common_lib.py b/testing/legion/common_lib.py index 863f887..2d97ef8 100644 --- a/testing/legion/common_lib.py +++ b/testing/legion/common_lib.py @@ -11,9 +11,7 @@ import socket LOGGING_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'WARN', 'ERROR'] MY_IP = socket.gethostbyname(socket.gethostname()) -SERVER_ADDRESS = '' -SERVER_PORT = 31710 -DEFAULT_TIMEOUT_SECS = 20 * 60 # 30 minutes +DEFAULT_TIMEOUT_SECS = 30 * 60 # 30 minutes THIS_DIR = os.path.dirname(os.path.abspath(__file__)) SWARMING_DIR = os.path.join(THIS_DIR, '..', '..', 'tools', 'swarming_client') @@ -41,3 +39,12 @@ def GetOutputDir(): parser.add_argument('--output-dir') args, _ = parser.parse_known_args() return args.output_dir + + +def GetUnusedPort(): + """Finds and returns an unused port.""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('localhost', 0)) + _, port = s.getsockname() + s.close() + return port diff --git a/testing/legion/legion_test_case.py b/testing/legion/legion_test_case.py index c70d154..dfd1e27 100644 --- a/testing/legion/legion_test_case.py +++ b/testing/legion/legion_test_case.py @@ -86,7 +86,8 @@ class TestCase(unittest.TestCase): @classmethod def CreateTask(cls, *args, **kwargs): """Convenience method to create a new task.""" - task = task_controller.TaskController(*args, **kwargs) + task = task_controller.TaskController( + reg_server_port=cls._registration_server.port, *args, **kwargs) cls._registration_server.RegisterTaskCallback( task.otp, task.OnConnect) return task diff --git a/testing/legion/rpc_server.py b/testing/legion/rpc_server.py index 4455eec..da9d88b 100644 --- a/testing/legion/rpc_server.py +++ b/testing/legion/rpc_server.py @@ -51,10 +51,10 @@ class RpcServer(SimpleJSONRPCServer.SimpleJSONRPCServer, SocketServer.ThreadingMixIn): """Restricts all endpoints to only specified IP addresses.""" - def __init__(self, authorized_address, + def __init__(self, authorized_address, port, idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS): SimpleJSONRPCServer.SimpleJSONRPCServer.__init__( - self, (common_lib.SERVER_ADDRESS, common_lib.SERVER_PORT), + self, ('', port), allow_none=True, logRequests=False, requestHandler=RequestHandler) self.authorized_address = authorized_address @@ -128,7 +128,7 @@ class RpcServer(SimpleJSONRPCServer.SimpleJSONRPCServer, self.shutdown() @staticmethod - def Connect(server, port=common_lib.SERVER_PORT): + def Connect(server, port): """Creates and returns a connection to an RPC server.""" addr = 'http://%s:%d' % (server, port) logging.debug('Connecting to RPC server at %s', addr) diff --git a/testing/legion/run_task.py b/testing/legion/run_task.py index 953d90d..0fa2044 100755 --- a/testing/legion/run_task.py +++ b/testing/legion/run_task.py @@ -26,18 +26,22 @@ def main(): help='One time token used to authenticate with the host') parser.add_argument('--controller', help='The ip address of the controller machine') + parser.add_argument('--controller-port', type=int, + help='The port the controllers registration server is on') parser.add_argument('--idle-timeout', type=int, default=common_lib.DEFAULT_TIMEOUT_SECS, help='The idle timeout for the rpc server in seconds') args, _ = parser.parse_known_args() + my_port = common_lib.GetUnusedPort() logging.info( - 'Registering with registration server at %s using OTP "%s"', - args.controller, args.otp) - rpc_server.RpcServer.Connect(args.controller).RegisterTask(args.otp, - common_lib.MY_IP) + 'Registering with registration server at %s:%d using OTP "%s"', + args.controller, args.controller_port, args.otp) + rpc_server.RpcServer.Connect( + args.controller, args.controller_port).RegisterTask( + args.otp, common_lib.MY_IP, my_port) - server = rpc_server.RpcServer(args.controller, args.idle_timeout) + server = rpc_server.RpcServer(args.controller, my_port, args.idle_timeout) server.serve_forever() logging.info('Server shutdown complete') diff --git a/testing/legion/task_controller.py b/testing/legion/task_controller.py index 1725f8f..a667d18 100644 --- a/testing/legion/task_controller.py +++ b/testing/legion/task_controller.py @@ -51,7 +51,7 @@ class TaskController(object): _task_count = 0 _tasks = [] - def __init__(self, isolated_hash, dimensions, priority=100, + def __init__(self, isolated_hash, dimensions, reg_server_port, priority=100, idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS, connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS, verbosity='ERROR', name=None, run_id=None): @@ -67,11 +67,13 @@ class TaskController(object): self._connect_event = threading.Event() self._connected = False self._ip_address = None + self._reg_server_port = reg_server_port self._otp = self._CreateOTP() self._rpc = None self._output_dir = None self._platform = None self._executable = None + self._task_rpc_port = None run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') self._task_name = '%s/%s/%s' % ( @@ -162,8 +164,9 @@ class TaskController(object): controller_name = socket.gethostname() test_name = os.path.basename(sys.argv[0]) creation_time = datetime.datetime.utcnow() - otp = 'task:%s controller:%s test:%s creation:%s' % ( - self._name, controller_name, test_name, creation_time) + otp = 'task:%s controller:%s port: %d test:%s creation:%s' % ( + self._name, controller_name, self._reg_server_port, test_name, + creation_time) return otp def Create(self): @@ -218,6 +221,7 @@ class TaskController(object): cmd.extend([ '--', '--controller', common_lib.MY_IP, + '--controller-port', str(self._reg_server_port), '--otp', self._otp, '--verbosity', self._verbosity, '--idle-timeout', str(self._idle_timeout_secs), @@ -234,12 +238,15 @@ class TaskController(object): if p.returncode != 0: raise Error(stderr) - def OnConnect(self, ip_address): - """Receives task ip address on connection.""" + def OnConnect(self, ip_address, rpc_port): + """Receives task ip address and port on connection.""" self._ip_address = ip_address + self._task_rpc_port = rpc_port self._connected = True - self._rpc = rpc_server.RpcServer.Connect(self._ip_address) - logging.info('%s connected from %s', self._name, ip_address) + self._rpc = rpc_server.RpcServer.Connect(self._ip_address, + self._task_rpc_port) + logging.info('%s connected from %s:%s', self._name, ip_address, + self._task_rpc_port) self._connect_event.set() def RetrieveOutputFiles(self): diff --git a/testing/legion/task_registration_server.py b/testing/legion/task_registration_server.py index 7b9f09f..a4ac4cb 100644 --- a/testing/legion/task_registration_server.py +++ b/testing/legion/task_registration_server.py @@ -24,12 +24,17 @@ class TaskRegistrationServer(object): self._expected_tasks = {} self._rpc_server = None self._thread = None + self._port = common_lib.GetUnusedPort() - def _RegisterTaskRPC(self, otp, ip): + @property + def port(self): + return self._port + + def _RegisterTaskRPC(self, otp, ip, port): """The RPC used by a task to register with the registration server.""" assert otp in self._expected_tasks cb = self._expected_tasks.pop(otp) - cb(ip) + cb(ip, port) def RegisterTaskCallback(self, otp, callback): """Registers a callback associated with an OTP.""" @@ -40,8 +45,7 @@ class TaskRegistrationServer(object): """Starts the registration server.""" logging.info('Starting task registration server') self._rpc_server = SimpleJSONRPCServer.SimpleJSONRPCServer( - (common_lib.SERVER_ADDRESS, common_lib.SERVER_PORT), - allow_none=True, logRequests=False) + ('', self._port), allow_none=True, logRequests=False) self._rpc_server.register_function( self._RegisterTaskRPC, 'RegisterTask') self._thread = threading.Thread(target=self._rpc_server.serve_forever) |