summaryrefslogtreecommitdiffstats
path: root/testing/legion
diff options
context:
space:
mode:
authormmeade <mmeade@chromium.org>2015-11-18 13:35:50 -0800
committerCommit bot <commit-bot@chromium.org>2015-11-18 21:37:36 +0000
commitb257d2f6ef4103a883937cdf8740533a224bc9a3 (patch)
tree8ced8197d85782bae951b16f5cd0d2cc955c3736 /testing/legion
parentbd325ae36bddc7d929a3562a44c5061d7015841b (diff)
downloadchromium_src-b257d2f6ef4103a883937cdf8740533a224bc9a3.zip
chromium_src-b257d2f6ef4103a883937cdf8740533a224bc9a3.tar.gz
chromium_src-b257d2f6ef4103a883937cdf8740533a224bc9a3.tar.bz2
Enabling the servers to run on an available port rather than a hard coded one.
BUG=557878 Review URL: https://codereview.chromium.org/1460803002 Cr-Commit-Position: refs/heads/master@{#360418}
Diffstat (limited to 'testing/legion')
-rw-r--r--testing/legion/OWNERS3
-rw-r--r--testing/legion/common_lib.py13
-rw-r--r--testing/legion/legion_test_case.py3
-rw-r--r--testing/legion/rpc_server.py6
-rwxr-xr-xtesting/legion/run_task.py14
-rw-r--r--testing/legion/task_controller.py21
-rw-r--r--testing/legion/task_registration_server.py12
7 files changed, 49 insertions, 23 deletions
diff --git a/testing/legion/OWNERS b/testing/legion/OWNERS
new file mode 100644
index 0000000..f44af62
--- /dev/null
+++ b/testing/legion/OWNERS
@@ -0,0 +1,3 @@
+mmeade@chromium.org
+bgoldman@chromium.org
+chaitali@chromium.org
diff --git a/testing/legion/common_lib.py b/testing/legion/common_lib.py
index 863f887..2d97ef8 100644
--- a/testing/legion/common_lib.py
+++ b/testing/legion/common_lib.py
@@ -11,9 +11,7 @@ import socket
LOGGING_LEVELS = ['DEBUG', 'INFO', 'WARNING', 'WARN', 'ERROR']
MY_IP = socket.gethostbyname(socket.gethostname())
-SERVER_ADDRESS = ''
-SERVER_PORT = 31710
-DEFAULT_TIMEOUT_SECS = 20 * 60 # 30 minutes
+DEFAULT_TIMEOUT_SECS = 30 * 60 # 30 minutes
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
SWARMING_DIR = os.path.join(THIS_DIR, '..', '..', 'tools', 'swarming_client')
@@ -41,3 +39,12 @@ def GetOutputDir():
parser.add_argument('--output-dir')
args, _ = parser.parse_known_args()
return args.output_dir
+
+
+def GetUnusedPort():
+ """Finds and returns an unused port."""
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('localhost', 0))
+ _, port = s.getsockname()
+ s.close()
+ return port
diff --git a/testing/legion/legion_test_case.py b/testing/legion/legion_test_case.py
index c70d154..dfd1e27 100644
--- a/testing/legion/legion_test_case.py
+++ b/testing/legion/legion_test_case.py
@@ -86,7 +86,8 @@ class TestCase(unittest.TestCase):
@classmethod
def CreateTask(cls, *args, **kwargs):
"""Convenience method to create a new task."""
- task = task_controller.TaskController(*args, **kwargs)
+ task = task_controller.TaskController(
+ reg_server_port=cls._registration_server.port, *args, **kwargs)
cls._registration_server.RegisterTaskCallback(
task.otp, task.OnConnect)
return task
diff --git a/testing/legion/rpc_server.py b/testing/legion/rpc_server.py
index 4455eec..da9d88b 100644
--- a/testing/legion/rpc_server.py
+++ b/testing/legion/rpc_server.py
@@ -51,10 +51,10 @@ class RpcServer(SimpleJSONRPCServer.SimpleJSONRPCServer,
SocketServer.ThreadingMixIn):
"""Restricts all endpoints to only specified IP addresses."""
- def __init__(self, authorized_address,
+ def __init__(self, authorized_address, port,
idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS):
SimpleJSONRPCServer.SimpleJSONRPCServer.__init__(
- self, (common_lib.SERVER_ADDRESS, common_lib.SERVER_PORT),
+ self, ('', port),
allow_none=True, logRequests=False,
requestHandler=RequestHandler)
self.authorized_address = authorized_address
@@ -128,7 +128,7 @@ class RpcServer(SimpleJSONRPCServer.SimpleJSONRPCServer,
self.shutdown()
@staticmethod
- def Connect(server, port=common_lib.SERVER_PORT):
+ def Connect(server, port):
"""Creates and returns a connection to an RPC server."""
addr = 'http://%s:%d' % (server, port)
logging.debug('Connecting to RPC server at %s', addr)
diff --git a/testing/legion/run_task.py b/testing/legion/run_task.py
index 953d90d..0fa2044 100755
--- a/testing/legion/run_task.py
+++ b/testing/legion/run_task.py
@@ -26,18 +26,22 @@ def main():
help='One time token used to authenticate with the host')
parser.add_argument('--controller',
help='The ip address of the controller machine')
+ parser.add_argument('--controller-port', type=int,
+ help='The port the controllers registration server is on')
parser.add_argument('--idle-timeout', type=int,
default=common_lib.DEFAULT_TIMEOUT_SECS,
help='The idle timeout for the rpc server in seconds')
args, _ = parser.parse_known_args()
+ my_port = common_lib.GetUnusedPort()
logging.info(
- 'Registering with registration server at %s using OTP "%s"',
- args.controller, args.otp)
- rpc_server.RpcServer.Connect(args.controller).RegisterTask(args.otp,
- common_lib.MY_IP)
+ 'Registering with registration server at %s:%d using OTP "%s"',
+ args.controller, args.controller_port, args.otp)
+ rpc_server.RpcServer.Connect(
+ args.controller, args.controller_port).RegisterTask(
+ args.otp, common_lib.MY_IP, my_port)
- server = rpc_server.RpcServer(args.controller, args.idle_timeout)
+ server = rpc_server.RpcServer(args.controller, my_port, args.idle_timeout)
server.serve_forever()
logging.info('Server shutdown complete')
diff --git a/testing/legion/task_controller.py b/testing/legion/task_controller.py
index 1725f8f..a667d18 100644
--- a/testing/legion/task_controller.py
+++ b/testing/legion/task_controller.py
@@ -51,7 +51,7 @@ class TaskController(object):
_task_count = 0
_tasks = []
- def __init__(self, isolated_hash, dimensions, priority=100,
+ def __init__(self, isolated_hash, dimensions, reg_server_port, priority=100,
idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
verbosity='ERROR', name=None, run_id=None):
@@ -67,11 +67,13 @@ class TaskController(object):
self._connect_event = threading.Event()
self._connected = False
self._ip_address = None
+ self._reg_server_port = reg_server_port
self._otp = self._CreateOTP()
self._rpc = None
self._output_dir = None
self._platform = None
self._executable = None
+ self._task_rpc_port = None
run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
self._task_name = '%s/%s/%s' % (
@@ -162,8 +164,9 @@ class TaskController(object):
controller_name = socket.gethostname()
test_name = os.path.basename(sys.argv[0])
creation_time = datetime.datetime.utcnow()
- otp = 'task:%s controller:%s test:%s creation:%s' % (
- self._name, controller_name, test_name, creation_time)
+ otp = 'task:%s controller:%s port: %d test:%s creation:%s' % (
+ self._name, controller_name, self._reg_server_port, test_name,
+ creation_time)
return otp
def Create(self):
@@ -218,6 +221,7 @@ class TaskController(object):
cmd.extend([
'--',
'--controller', common_lib.MY_IP,
+ '--controller-port', str(self._reg_server_port),
'--otp', self._otp,
'--verbosity', self._verbosity,
'--idle-timeout', str(self._idle_timeout_secs),
@@ -234,12 +238,15 @@ class TaskController(object):
if p.returncode != 0:
raise Error(stderr)
- def OnConnect(self, ip_address):
- """Receives task ip address on connection."""
+ def OnConnect(self, ip_address, rpc_port):
+ """Receives task ip address and port on connection."""
self._ip_address = ip_address
+ self._task_rpc_port = rpc_port
self._connected = True
- self._rpc = rpc_server.RpcServer.Connect(self._ip_address)
- logging.info('%s connected from %s', self._name, ip_address)
+ self._rpc = rpc_server.RpcServer.Connect(self._ip_address,
+ self._task_rpc_port)
+ logging.info('%s connected from %s:%s', self._name, ip_address,
+ self._task_rpc_port)
self._connect_event.set()
def RetrieveOutputFiles(self):
diff --git a/testing/legion/task_registration_server.py b/testing/legion/task_registration_server.py
index 7b9f09f..a4ac4cb 100644
--- a/testing/legion/task_registration_server.py
+++ b/testing/legion/task_registration_server.py
@@ -24,12 +24,17 @@ class TaskRegistrationServer(object):
self._expected_tasks = {}
self._rpc_server = None
self._thread = None
+ self._port = common_lib.GetUnusedPort()
- def _RegisterTaskRPC(self, otp, ip):
+ @property
+ def port(self):
+ return self._port
+
+ def _RegisterTaskRPC(self, otp, ip, port):
"""The RPC used by a task to register with the registration server."""
assert otp in self._expected_tasks
cb = self._expected_tasks.pop(otp)
- cb(ip)
+ cb(ip, port)
def RegisterTaskCallback(self, otp, callback):
"""Registers a callback associated with an OTP."""
@@ -40,8 +45,7 @@ class TaskRegistrationServer(object):
"""Starts the registration server."""
logging.info('Starting task registration server')
self._rpc_server = SimpleJSONRPCServer.SimpleJSONRPCServer(
- (common_lib.SERVER_ADDRESS, common_lib.SERVER_PORT),
- allow_none=True, logRequests=False)
+ ('', self._port), allow_none=True, logRequests=False)
self._rpc_server.register_function(
self._RegisterTaskRPC, 'RegisterTask')
self._thread = threading.Thread(target=self._rpc_server.serve_forever)