summaryrefslogtreecommitdiffstats
path: root/tools/parallel_launcher/parallel_launcher.py
blob: 469d44404e5e5089163e7fb2cbe8fd08587f00ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/python
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This tool launches several shards of a gtest-based binary
in parallel on a local machine.

Example usage:

parallel_launcher.py path/to/base_unittests
"""

import optparse
import os
import subprocess
import sys
import threading
import time


def StreamCopyWindows(stream_from, stream_to):
  """Pumps everything readable from stream_from into stream_to.

  Data is moved in chunks of at most 1024 bytes using blocking reads, and
  stream_to is flushed after every chunk. The function returns once
  stream_from.read() yields an empty result.
  """

  chunk = stream_from.read(1024)
  while chunk:
    stream_to.write(chunk)
    stream_to.flush()
    chunk = stream_from.read(1024)

def StreamCopyPosix(stream_from, stream_to, child_exited):
  """
  Copies stream_from to stream_to, and exits if child_exited
  is signaled.

  The descriptor behind stream_from is switched to non-blocking mode and
  read directly with os.read() in chunks of at most 1024 bytes; stream_to
  is flushed after every chunk. The loop ends on end-of-file, or when a
  read reports that no data is currently available and child_exited is set.

  Args:
    stream_from: object exposing fileno() for the descriptor to read.
    stream_to: object exposing write() and flush().
    child_exited: event object polled with is_set() whenever the source
      has no data ready.

  Raises:
    OSError: for any read failure other than "no data available yet".
  """

  import errno
  import fcntl

  # Put the source stream in a non-blocking mode, so we can check
  # child_exited when there is no data.
  fd = stream_from.fileno()
  fl = fcntl.fcntl(fd, fcntl.F_GETFL)
  fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

  # The numeric value of EAGAIN differs between platforms (11 on Linux,
  # 35 on Mac OS X / BSD), so compare against the symbolic constants.
  would_block = (errno.EAGAIN, errno.EWOULDBLOCK)

  while True:
    try:
      buf = os.read(fd, 1024)
    except OSError as e:
      if e.errno in would_block:
        if child_exited.is_set():
          break
        # Nothing to read yet; back off briefly instead of busy-looping.
        time.sleep(0.1)
        continue
      raise
    if not buf:
      # Empty read means end-of-file: every writer closed the pipe.
      break
    stream_to.write(buf)
    stream_to.flush()

class TestLauncher(object):
  """Runs a single shard of a gtest-based binary as a child process.

  launch() starts the child with the gtest sharding variables set in its
  environment; wait() forwards the child's combined stdout/stderr to
  sys.stdout and returns the child's exit code.
  """

  def __init__(self, args, executable, num_shards, shard):
    """Stores the launch parameters; no process is started here.

    Args:
      args: argument list for the child (a per-shard flag is appended at
        launch time; this list itself is not modified).
      executable: path of the program to execute.
      num_shards: number of shards this launcher set is split into.
      shard: zero-based index of this shard within num_shards.
    """
    self._args = args
    self._executable = executable
    self._num_shards = num_shards
    self._shard = shard
    # subprocess.Popen object once launch() has been called.
    self._test = None

  def launch(self):
    """Starts the child process for this shard without waiting for it."""
    env = os.environ.copy()

    env['CHROME_LOG_FILE'] = 'chrome_log_%d' % self._shard

    total_shards = self._num_shards
    shard_index = self._shard
    if 'GTEST_TOTAL_SHARDS' in env:
      # Handle the requested sharding transparently: we were ourselves asked
      # to run one shard, so the child runs a shard of that shard.
      outer_shards = int(env['GTEST_TOTAL_SHARDS'])
      outer_index = int(env['GTEST_SHARD_INDEX'])
      total_shards = self._num_shards * outer_shards
      shard_index = (self._num_shards * outer_index) + self._shard

    env['GTEST_TOTAL_SHARDS'] = str(total_shards)
    env['GTEST_SHARD_INDEX'] = str(shard_index)

    args = self._args + ['--test-server-shard=' + str(self._shard)]

    # stderr is folded into stdout so wait() has a single pipe to drain.
    self._test = subprocess.Popen(args=args,
                                  executable=self._executable,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  env=env)

  def wait(self):
    """Forwards the child's output to sys.stdout until it finishes.

    Must be called after launch().

    Returns:
      The exit code of the child process.
    """
    copy_args = [self._test.stdout, sys.stdout]
    child_exited = None
    # sys.platform is the documented way to detect Windows; the
    # subprocess.mswindows attribute is undocumented and absent from newer
    # Python versions.
    if sys.platform == 'win32':
      copy_target = StreamCopyWindows
    else:
      # The POSIX copier polls this event to know when to stop reading.
      child_exited = threading.Event()
      copy_target = StreamCopyPosix
      copy_args.append(child_exited)

    stdout_thread = threading.Thread(target=copy_target, args=copy_args)
    stdout_thread.start()
    code = self._test.wait()
    if child_exited is not None:
      child_exited.set()
    stdout_thread.join()
    return code

def main(argv):
  """Launches all shards of the test binary and waits for them.

  Args:
    argv: command-line arguments without the program name: options for
      this launcher first, then the path to the test binary, then any
      arguments for the binary itself.

  Returns:
    0 if every shard exited with code 0; 1 if any shard failed or the
    command line / environment is invalid.
  """
  parser = optparse.OptionParser()
  parser.add_option("--shards", type="int", dest="shards", default=10)

  # Make it possible to pass options to the launched process.
  # Options for parallel_launcher should be first, then the binary path,
  # and finally - optional arguments for the launched binary.
  parser.disable_interspersed_args()

  options, args = parser.parse_args(argv)

  if not args:
    print('You must provide path to the test binary')
    return 1

  if options.shards < 1:
    # With zero (or fewer) shards nothing would be launched and the run
    # would report success without executing a single test.
    print('--shards must be a positive integer')
    return 1

  env = os.environ
  if ('GTEST_TOTAL_SHARDS' in env) != ('GTEST_SHARD_INDEX' in env):
    print('Inconsistent environment. GTEST_TOTAL_SHARDS and GTEST_SHARD_INDEX')
    print('should either be both defined, or both undefined.')
    return 1

  # Start every shard before waiting on any of them so they run in parallel.
  launchers = []
  for shard in range(options.shards):
    launcher = TestLauncher(args, args[0], options.shards, shard)
    launcher.launch()
    launchers.append(launcher)

  # Wait for all shards even after a failure, so none is left unreaped.
  return_code = 0
  for launcher in launchers:
    if launcher.wait() != 0:
      return_code = 1

  return return_code

# Script entry point: run main() on the command-line arguments (program name
# stripped) and use its return value as the process exit status.
if __name__ == "__main__":
  exit_status = main(sys.argv[1:])
  sys.exit(exit_status)