1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
#!/usr/bin/python
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
This tool launches several shards of a gtest-based binary
in parallel on a local machine.
Example usage:
parallel_launcher.py path/to/base_unittests
"""
import optparse
import os
import subprocess
import sys
import threading
import time
def StreamCopyWindows(stream_from, stream_to):
"""Copies stream_from to stream_to."""
while True:
buf = stream_from.read(1024)
if not buf:
break
stream_to.write(buf)
stream_to.flush()
def StreamCopyPosix(stream_from, stream_to, child_exited):
"""
Copies stream_from to stream_to, and exits if child_exited
is signaled.
"""
import fcntl
# Put the source stream in a non-blocking mode, so we can check
# child_exited when there is no data.
fd = stream_from.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
while True:
try:
buf = os.read(fd, 1024)
except OSError, e:
if e.errno == 11:
if child_exited.isSet():
break
time.sleep(0.1)
continue
raise
if not buf:
break
stream_to.write(buf)
stream_to.flush()
class TestLauncher(object):
def __init__(self, args, executable, num_shards, shard):
self._args = args
self._executable = executable
self._num_shards = num_shards
self._shard = shard
self._test = None
def launch(self):
env = os.environ.copy()
env['CHROME_LOG_FILE'] = 'chrome_log_%d' % self._shard
if 'GTEST_TOTAL_SHARDS' in env:
# Handle the requested sharding transparently.
outer_shards = int(env['GTEST_TOTAL_SHARDS'])
outer_index = int(env['GTEST_SHARD_INDEX'])
env['GTEST_TOTAL_SHARDS'] = str(self._num_shards * outer_shards)
# Calculate the right shard index to pass to the child. This is going
# to be a shard of a shard.
env['GTEST_SHARD_INDEX'] = str((self._num_shards * outer_index) +
self._shard)
else:
env['GTEST_TOTAL_SHARDS'] = str(self._num_shards)
env['GTEST_SHARD_INDEX'] = str(self._shard)
args = self._args + ['--test-server-shard=' + str(self._shard)]
self._test = subprocess.Popen(args=args,
executable=self._executable,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env)
def wait(self):
if subprocess.mswindows:
stdout_thread = threading.Thread(
target=StreamCopyWindows,
args=[self._test.stdout, sys.stdout])
stdout_thread.start()
code = self._test.wait()
stdout_thread.join()
return code
else:
child_exited = threading.Event()
stdout_thread = threading.Thread(
target=StreamCopyPosix,
args=[self._test.stdout, sys.stdout, child_exited])
stdout_thread.start()
code = self._test.wait()
child_exited.set()
stdout_thread.join()
return code
def main(argv):
parser = optparse.OptionParser()
parser.add_option("--shards", type="int", dest="shards", default=10)
# Make it possible to pass options to the launched process.
# Options for parallel_launcher should be first, then the binary path,
# and finally - optional arguments for the launched binary.
parser.disable_interspersed_args()
options, args = parser.parse_args(argv)
if not args:
print 'You must provide path to the test binary'
return 1
env = os.environ
if bool('GTEST_TOTAL_SHARDS' in env) != bool('GTEST_SHARD_INDEX' in env):
print 'Inconsistent environment. GTEST_TOTAL_SHARDS and GTEST_SHARD_INDEX'
print 'should either be both defined, or both undefined.'
return 1
launchers = []
for shard in range(options.shards):
launcher = TestLauncher(args, args[0], options.shards, shard)
launcher.launch()
launchers.append(launcher)
return_code = 0
for launcher in launchers:
if launcher.wait() != 0:
return_code = 1
return return_code
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
|