diff --git a/pytools/prefork.py b/pytools/prefork.py index a81c987d67e0b556cd7479251be48b334567ffd6..93a56d7e03aad6eea44c8a9046e3beecb51c9cad 100644 --- a/pytools/prefork.py +++ b/pytools/prefork.py @@ -8,16 +8,11 @@ Since none of this is MPI-specific, it got parked in pytools. from __future__ import absolute_import - - - class ExecError(OSError): pass - - -class DirectForker: +class DirectForker(object): @staticmethod def call(cmdline, cwd=None): from subprocess import call @@ -25,34 +20,25 @@ class DirectForker: return call(cmdline, cwd=cwd) except OSError as e: raise ExecError("error invoking '%s': %s" - % ( " ".join(cmdline), e)) - - @staticmethod - def call_capture_stdout(cmdline, cwd=None): - from subprocess import Popen, PIPE - try: - return Popen(cmdline, cwd=cwd, stdin=PIPE, stdout=PIPE, stderr=PIPE).communicate()[0] - except OSError as e: - raise ExecError("error invoking '%s': %s" - % ( " ".join(cmdline), e)) + % (" ".join(cmdline), e)) @staticmethod def call_capture_output(cmdline, cwd=None, error_on_nonzero=True): - """ - :returns: a tuple (return code, stdout_data, stderr_data). - """ from subprocess import Popen, PIPE try: - popen = Popen(cmdline, cwd=cwd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + popen = Popen(cmdline, cwd=cwd, stdin=PIPE, stdout=PIPE, + stderr=PIPE) stdout_data, stderr_data = popen.communicate() + if error_on_nonzero and popen.returncode: raise ExecError("status %d invoking '%s': %s" - % (popen.returncode, " ".join(cmdline), stderr_data)) + % (popen.returncode, " ".join(cmdline), + stderr_data)) + return popen.returncode, stdout_data, stderr_data except OSError as e: raise ExecError("error invoking '%s': %s" - % ( " ".join(cmdline), e)) - + % (" ".join(cmdline), e)) def _send_packet(sock, data): @@ -64,6 +50,7 @@ def _send_packet(sock, data): sock.sendall(pack("I", len(packet))) sock.sendall(packet) + def _recv_packet(sock, who="Process", partner="other end"): from struct import calcsize, unpack size_bytes_size = calcsize("I") @@ -85,8 +72,6 @@ def _recv_packet(sock, who="Process", partner="other end"): return loads(packet) - - def _fork_server(sock): import signal # ignore keyboard interrupts, we'll get notified by the parent. @@ -98,16 +83,16 @@ def _fork_server(sock): quitflag[0] = True funcs = { - "quit": quit, - "call": DirectForker.call, - "call_capture_stdout": DirectForker.call_capture_stdout, - "call_capture_output": DirectForker.call_capture_output, + "quit": quit, + "call": DirectForker.call, + "call_capture_output": DirectForker.call_capture_output, } try: while not quitflag[0]: - func_name, args, kwargs = _recv_packet(sock, - who="Prefork server", partner="parent") + func_name, args, kwargs = _recv_packet( + sock, who="Prefork server", partner="parent" + ) try: result = funcs[func_name](*args, **kwargs) @@ -122,18 +107,16 @@ def _fork_server(sock): os._exit(0) - - - -class IndirectForker: +class IndirectForker(object): def __init__(self, server_pid, sock): self.server_pid = server_pid self.socket = sock def _remote_invoke(self, name, *args, **kwargs): _send_packet(self.socket, (name, args, kwargs)) - status, result = _recv_packet(self.socket, - who="Prefork client", partner="prefork server") + status, result = _recv_packet( + self.socket, who="Prefork client", partner="prefork server" + ) if status == "exception": raise result @@ -142,24 +125,22 @@ class IndirectForker: def _quit(self): self._remote_invoke("quit") + from os import waitpid waitpid(self.server_pid, 0) def call(self, cmdline, cwd=None): return self._remote_invoke("call", cmdline, cwd) - def call_capture_stdout(self, cmdline, cwd=None): - return self._remote_invoke("call_capture_stdout", cmdline, cwd) - def call_capture_output(self, cmdline, cwd=None, error_on_nonzero=True): - return self._remote_invoke("call_capture_output", cmdline, cwd, - error_on_nonzero) - - + return self._remote_invoke("call_capture_output", cmdline, cwd, + error_on_nonzero) def enable_prefork(): - if isinstance(forker[0], IndirectForker): + global forker + + if isinstance(forker, IndirectForker): return from socket import socketpair @@ -168,30 +149,25 @@ def enable_prefork(): from os import fork fork_res = fork() + # Child if fork_res == 0: - # child s_parent.close() _fork_server(s_child) + # Parent else: s_child.close() - forker[0] = IndirectForker(fork_res, s_parent) + forker = IndirectForker(fork_res, s_parent) import atexit - atexit.register(forker[0]._quit) - + atexit.register(forker._quit) +forker = DirectForker() -forker = [DirectForker()] def call(cmdline, cwd=None): - return forker[0].call(cmdline, cwd) + return forker.call(cmdline, cwd) -def call_capture_stdout(cmdline, cwd=None): - from warnings import warn - warn("call_capture_stdout is deprecated: use call_capture_output instead", - stacklevel=2) - return forker[0].call_capture_stdout(cmdline, cwd) def call_capture_output(cmdline, cwd=None, error_on_nonzero=True): - return forker[0].call_capture_output(cmdline, cwd, error_on_nonzero) + return forker.call_capture_output(cmdline, cwd, error_on_nonzero)