summaryrefslogtreecommitdiffstats
path: root/tools/sharding_supervisor/sharding_supervisor_unittest.py
blob: 77b1f54d4c29935c72322f8aec768cbfeca203e8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Verify basic usage of sharding_supervisor."""

import difflib
import os
import subprocess
import sys
import unittest

from xml.dom import minidom

import sharding_supervisor

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
SHARDING_SUPERVISOR = os.path.join(ROOT_DIR, 'sharding_supervisor.py')
DUMMY_TEST = os.path.join(ROOT_DIR, 'dummy_test.py')
NUM_CORES = sharding_supervisor.DetectNumCores()
SHARDS_PER_CORE = sharding_supervisor.SS_DEFAULT_SHARDS_PER_CORE


def generate_expected_output(start, end, num_shards):
  """Generate the expected stdout and stderr for the dummy test."""
  stdout = ''
  stderr = ''
  for i in range(start, end):
    stdout += 'Running shard %d of %d%s' % (i, num_shards, os.linesep)
  stdout += '%sALL SHARDS PASSED!%sALL TESTS PASSED!%s' % (os.linesep,
                                                           os.linesep,
                                                           os.linesep)

  return (stdout, stderr)


class ShardingSupervisorUnittest(unittest.TestCase):
  def test_basic_run(self):
    # Default test.
    expected_shards = NUM_CORES * SHARDS_PER_CORE
    (expected_out, expected_err) = generate_expected_output(
        0, expected_shards, expected_shards)
    p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
                          DUMMY_TEST], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    (out, err) = p.communicate()
    self.assertEqual(expected_out, out)
    self.assertEqual(expected_err, err)
    self.assertEqual(0, p.returncode)

  def test_shard_per_core(self):
    """Test the --shards_per_core parameter."""
    expected_shards = NUM_CORES * 25
    (expected_out, expected_err) = generate_expected_output(
        0, expected_shards, expected_shards)
    p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
                          '--shards_per_core', '25', DUMMY_TEST],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    (out, err) = p.communicate()
    self.assertEqual(expected_out, out)
    self.assertEqual(expected_err, err)
    self.assertEqual(0, p.returncode)

  def test_slave_sharding(self):
    """Test the --total-slaves and --slave-index parameters."""
    total_shards = 6
    expected_shards = NUM_CORES * SHARDS_PER_CORE * total_shards

    # Test every single index to make sure they run correctly.
    for index in range(total_shards):
      begin = NUM_CORES * SHARDS_PER_CORE * index
      end = begin + NUM_CORES * SHARDS_PER_CORE
      (expected_out, expected_err) = generate_expected_output(
          begin, end, expected_shards)
      p = subprocess.Popen([sys.executable, SHARDING_SUPERVISOR, '--no-color',
                            '--total-slaves', str(total_shards),
                            '--slave-index', str(index),
                            DUMMY_TEST],
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)

      (out, err) = p.communicate()
      self.assertEqual(expected_out, out)
      self.assertEqual(expected_err, err)
      self.assertEqual(0, p.returncode)

  def test_append_to_xml(self):
    shard_xml_path = os.path.join(ROOT_DIR, 'data', 'gtest_results.xml')
    expected_xml_path = os.path.join(
        ROOT_DIR, 'data', 'gtest_results_expected.xml')
    merged_xml = sharding_supervisor.AppendToXML(None, shard_xml_path, 0)
    merged_xml = sharding_supervisor.AppendToXML(merged_xml, shard_xml_path, 1)

    with open(expected_xml_path) as expected_xml_file:
      expected_xml = minidom.parse(expected_xml_file)

    # Serialize XML to a list of strings that is consistently formatted
    # (ignoring whitespace between elements) so that it may be compared.
    def _serialize_xml(xml):
      def _remove_whitespace_and_comments(xml):
        children_to_remove = []
        for child in xml.childNodes:
          if (child.nodeType == minidom.Node.TEXT_NODE and
              not child.data.strip()):
            children_to_remove.append(child)
          elif child.nodeType == minidom.Node.COMMENT_NODE:
            children_to_remove.append(child)
          elif child.nodeType == minidom.Node.ELEMENT_NODE:
            _remove_whitespace_and_comments(child)

        for child in children_to_remove:
          xml.removeChild(child)

      _remove_whitespace_and_comments(xml)
      return xml.toprettyxml(indent='  ').splitlines()

    diff = list(difflib.unified_diff(
        _serialize_xml(expected_xml),
        _serialize_xml(merged_xml),
        fromfile='gtest_results_expected.xml',
        tofile='gtest_results_actual.xml'))
    if diff:
      self.fail('Did not merge results XML correctly:\n' + '\n'.join(diff))


if __name__ == '__main__':
  unittest.main()