diff options
Diffstat (limited to 'src/3rdparty/v8/tools/testrunner/network/network_execution.py')
-rw-r--r-- | src/3rdparty/v8/tools/testrunner/network/network_execution.py | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/src/3rdparty/v8/tools/testrunner/network/network_execution.py b/src/3rdparty/v8/tools/testrunner/network/network_execution.py new file mode 100644 index 0000000..ddb59e6 --- /dev/null +++ b/src/3rdparty/v8/tools/testrunner/network/network_execution.py @@ -0,0 +1,253 @@ +# Copyright 2012 the V8 project authors. All rights reserved. +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import os +import socket +import subprocess +import threading +import time + +from . import distro +from . import perfdata +from ..local import execution +from ..objects import peer +from ..objects import workpacket +from ..server import compression +from ..server import constants +from ..server import local_handler +from ..server import signatures + + +def GetPeers(): + data = local_handler.LocalQuery([constants.REQUEST_PEERS]) + if not data: return [] + return [ peer.Peer.Unpack(p) for p in data ] + + +class NetworkedRunner(execution.Runner): + def __init__(self, suites, progress_indicator, context, peers, workspace): + self.suites = suites + num_tests = 0 + datapath = os.path.join("out", "testrunner_data") + self.perf_data_manager = perfdata.PerfDataManager(datapath) + self.perfdata = self.perf_data_manager.GetStore(context.arch, context.mode) + for s in suites: + for t in s.tests: + t.duration = self.perfdata.FetchPerfData(t) or 1.0 + num_tests += len(s.tests) + self._CommonInit(num_tests, progress_indicator, context) + self.tests = [] # Only used if we need to fall back to local execution. + self.tests_lock = threading.Lock() + self.peers = peers + self.pubkey_fingerprint = None # Fetched later. + self.base_rev = subprocess.check_output( + "cd %s; git log -1 --format=%%H --grep=git-svn-id" % workspace, + shell=True).strip() + self.base_svn_rev = subprocess.check_output( + "cd %s; git log -1 %s" # Get commit description. + " | grep -e '^\s*git-svn-id:'" # Extract "git-svn-id" line. + " | awk '{print $2}'" # Extract "repository@revision" part. + " | sed -e 's/.*@//'" % # Strip away "repository@". + (workspace, self.base_rev), shell=True).strip() + self.patch = subprocess.check_output( + "cd %s; git diff %s" % (workspace, self.base_rev), shell=True) + self.binaries = {} + self.initialization_lock = threading.Lock() + self.initialization_lock.acquire() # Released when init is done. + self._OpenLocalConnection() + self.local_receiver_thread = threading.Thread( + target=self._ListenLocalConnection) + self.local_receiver_thread.daemon = True + self.local_receiver_thread.start() + self.initialization_lock.acquire() + self.initialization_lock.release() + + def _OpenLocalConnection(self): + self.local_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + code = self.local_socket.connect_ex(("localhost", constants.CLIENT_PORT)) + if code != 0: + raise RuntimeError("Failed to connect to local server") + compression.Send([constants.REQUEST_PUBKEY_FINGERPRINT], self.local_socket) + + def _ListenLocalConnection(self): + release_lock_countdown = 1 # Pubkey. + self.local_receiver = compression.Receiver(self.local_socket) + while not self.local_receiver.IsDone(): + data = self.local_receiver.Current() + if data[0] == constants.REQUEST_PUBKEY_FINGERPRINT: + pubkey = data[1] + if not pubkey: raise RuntimeError("Received empty public key") + self.pubkey_fingerprint = pubkey + release_lock_countdown -= 1 + if release_lock_countdown == 0: + self.initialization_lock.release() + release_lock_countdown -= 1 # Prevent repeated triggering. + self.local_receiver.Advance() + + def Run(self, jobs): + self.indicator.Starting() + need_libv8 = False + for s in self.suites: + shell = s.shell() + if shell not in self.binaries: + path = os.path.join(self.context.shell_dir, shell) + # Check if this is a shared library build. + try: + ldd = subprocess.check_output("ldd %s | grep libv8\\.so" % (path), + shell=True) + ldd = ldd.strip().split(" ") + assert ldd[0] == "libv8.so" + assert ldd[1] == "=>" + need_libv8 = True + binary_needs_libv8 = True + libv8 = signatures.ReadFileAndSignature(ldd[2]) + except: + binary_needs_libv8 = False + binary = signatures.ReadFileAndSignature(path) + if binary[0] is None: + print("Error: Failed to create signature.") + assert binary[1] != 0 + return binary[1] + binary.append(binary_needs_libv8) + self.binaries[shell] = binary + if need_libv8: + self.binaries["libv8.so"] = libv8 + distro.Assign(self.suites, self.peers) + # Spawn one thread for each peer. + threads = [] + for p in self.peers: + thread = threading.Thread(target=self._TalkToPeer, args=[p]) + threads.append(thread) + thread.start() + try: + for thread in threads: + # Use a timeout so that signals (Ctrl+C) will be processed. + thread.join(timeout=10000000) + self._AnalyzePeerRuntimes() + except KeyboardInterrupt: + self.terminate = True + raise + except Exception, _e: + # If there's an exception we schedule an interruption for any + # remaining threads... + self.terminate = True + # ...and then reraise the exception to bail out. + raise + compression.Send(constants.END_OF_STREAM, self.local_socket) + self.local_socket.close() + if self.tests: + self._RunInternal(jobs) + self.indicator.Done() + return not self.failed + + def _TalkToPeer(self, peer): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(self.context.timeout + 10) + code = sock.connect_ex((peer.address, constants.PEER_PORT)) + if code == 0: + try: + peer.runtime = None + start_time = time.time() + packet = workpacket.WorkPacket(peer=peer, context=self.context, + base_revision=self.base_svn_rev, + patch=self.patch, + pubkey=self.pubkey_fingerprint) + data, test_map = packet.Pack(self.binaries) + compression.Send(data, sock) + compression.Send(constants.END_OF_STREAM, sock) + rec = compression.Receiver(sock) + while not rec.IsDone() and not self.terminate: + data_list = rec.Current() + for data in data_list: + test_id = data[0] + if test_id < 0: + # The peer is reporting an error. + with self.lock: + print("\nPeer %s reports error: %s" % (peer.address, data[1])) + continue + test = test_map.pop(test_id) + test.MergeResult(data) + try: + self.perfdata.UpdatePerfData(test) + except Exception, e: + print("UpdatePerfData exception: %s" % e) + pass # Just keep working. + with self.lock: + perf_key = self.perfdata.GetKey(test) + compression.Send( + [constants.INFORM_DURATION, perf_key, test.duration, + self.context.arch, self.context.mode], + self.local_socket) + self.indicator.AboutToRun(test) + if test.suite.HasUnexpectedOutput(test): + self.failed.append(test) + if test.output.HasCrashed(): + self.crashed += 1 + else: + self.succeeded += 1 + self.remaining -= 1 + self.indicator.HasRun(test) + rec.Advance() + peer.runtime = time.time() - start_time + except KeyboardInterrupt: + sock.close() + raise + except Exception, e: + print("Got exception: %s" % e) + pass # Fall back to local execution. + else: + compression.Send([constants.UNRESPONSIVE_PEER, peer.address], + self.local_socket) + sock.close() + if len(test_map) > 0: + # Some tests have not received any results. Run them locally. + print("\nNo results for %d tests, running them locally." % len(test_map)) + self._EnqueueLocally(test_map) + + def _EnqueueLocally(self, test_map): + with self.tests_lock: + for test in test_map: + self.tests.append(test_map[test]) + + def _AnalyzePeerRuntimes(self): + total_runtime = 0.0 + total_work = 0.0 + for p in self.peers: + if p.runtime is None: + return + total_runtime += p.runtime + total_work += p.assigned_work + for p in self.peers: + p.assigned_work /= total_work + p.runtime /= total_runtime + perf_correction = p.assigned_work / p.runtime + old_perf = p.relative_performance + p.relative_performance = (old_perf + perf_correction) / 2.0 + compression.Send([constants.UPDATE_PERF, p.address, + p.relative_performance], + self.local_socket) |