aboutsummaryrefslogtreecommitdiffstats
path: root/popenasync.py
diff options
context:
space:
mode:
Diffstat (limited to 'popenasync.py')
-rw-r--r--popenasync.py360
1 files changed, 0 insertions, 360 deletions
diff --git a/popenasync.py b/popenasync.py
deleted file mode 100644
index 77faf9e0c..000000000
--- a/popenasync.py
+++ /dev/null
@@ -1,360 +0,0 @@
-#############################################################################
-##
-## Copyright (C) 2017 The Qt Company Ltd.
-## Contact: https://www.qt.io/licensing/
-##
-## This file is part of Qt for Python.
-##
-## $QT_BEGIN_LICENSE:LGPL$
-## Commercial License Usage
-## Licensees holding valid commercial Qt licenses may use this file in
-## accordance with the commercial license agreement provided with the
-## Software or, alternatively, in accordance with the terms contained in
-## a written agreement between you and The Qt Company. For licensing terms
-## and conditions see https://www.qt.io/terms-conditions. For further
-## information use the contact form at https://www.qt.io/contact-us.
-##
-## GNU Lesser General Public License Usage
-## Alternatively, this file may be used under the terms of the GNU Lesser
-## General Public License version 3 as published by the Free Software
-## Foundation and appearing in the file LICENSE.LGPL3 included in the
-## packaging of this file. Please review the following information to
-## ensure the GNU Lesser General Public License version 3 requirements
-## will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
-##
-## GNU General Public License Usage
-## Alternatively, this file may be used under the terms of the GNU
-## General Public License version 2.0 or (at your option) the GNU General
-## Public license version 3 or any later version approved by the KDE Free
-## Qt Foundation. The licenses are as published by the Free Software
-## Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
-## included in the packaging of this file. Please review the following
-## information to ensure the GNU General Public License requirements will
-## be met: https://www.gnu.org/licenses/gpl-2.0.html and
-## https://www.gnu.org/licenses/gpl-3.0.html.
-##
-## $QT_END_LICENSE$
-##
-#############################################################################
-
-################################################################################
-"""
-
-Modification of http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
-
-"""
-
-#################################### IMPORTS ###################################
-
-import os
-import subprocess
-import errno
-import time
-import sys
-import unittest
-import tempfile
-
-def geterror ():
- return sys.exc_info()[1]
-
-if sys.version_info >= (3,):
- null_byte = '\x00'.encode('ascii')
-else:
- null_byte = '\x00'
-
-mswindows = (sys.platform == "win32")
-
-if mswindows:
- if sys.version_info >= (3,):
- # Test date should be in ascii.
- def encode(s):
- return s.encode('ascii', 'ignore')
-
- def decode(b):
- return b.decode('ascii', 'ignore')
- else:
- # Strings only; do nothing
- def encode(s):
- return s
-
- def decode(b):
- return b
-
- try:
- import ctypes
- from ctypes.wintypes import DWORD
- kernel32 = ctypes.windll.kernel32
- TerminateProcess = ctypes.windll.kernel32.TerminateProcess
- def WriteFile(handle, data, ol = None):
- c_written = DWORD()
- success = ctypes.windll.kernel32.WriteFile(handle,
- ctypes.create_string_buffer(encode(data)), len(data),
- ctypes.byref(c_written), ol)
- return ctypes.windll.kernel32.GetLastError(), c_written.value
- def ReadFile(handle, desired_bytes, ol = None):
- c_read = DWORD()
- buffer = ctypes.create_string_buffer(desired_bytes+1)
- success = ctypes.windll.kernel32.ReadFile(handle, buffer,
- desired_bytes, ctypes.byref(c_read), ol)
- buffer[c_read.value] = null_byte
- return ctypes.windll.kernel32.GetLastError(), decode(buffer.value)
- def PeekNamedPipe(handle, desired_bytes):
- c_avail = DWORD()
- c_message = DWORD()
- if desired_bytes > 0:
- c_read = DWORD()
- buffer = ctypes.create_string_buffer(desired_bytes+1)
- success = ctypes.windll.kernel32.PeekNamedPipe(handle, buffer,
- desired_bytes, ctypes.byref(c_read), ctypes.byref(c_avail),
- ctypes.byref(c_message))
- buffer[c_read.value] = null_byte
- return decode(buffer.value), c_avail.value, c_message.value
- else:
- success = ctypes.windll.kernel32.PeekNamedPipe(handle, None,
- desired_bytes, None, ctypes.byref(c_avail),
- ctypes.byref(c_message))
- return "", c_avail.value, c_message.value
-
- except ImportError:
- from win32file import ReadFile, WriteFile
- from win32pipe import PeekNamedPipe
- from win32api import TerminateProcess
- import msvcrt
-
-else:
- from signal import SIGINT, SIGTERM, SIGKILL
- import select
- import fcntl
-
-################################### CONSTANTS ##################################
-
-PIPE = subprocess.PIPE
-
-################################################################################
-
-class Popen(subprocess.Popen):
- def __init__(self, *args, **kwargs):
- subprocess.Popen.__init__(self, *args, **kwargs)
-
- def recv(self, maxsize=None):
- return self._recv('stdout', maxsize)
-
- def recv_err(self, maxsize=None):
- return self._recv('stderr', maxsize)
-
- def send_recv(self, input='', maxsize=None):
- return self.send(input), self.recv(maxsize), self.recv_err(maxsize)
-
- def read_async(self, wait=.1, e=1, tr=5, stderr=0):
- if tr < 1:
- tr = 1
- x = time.time()+ wait
- y = []
- r = ''
- pr = self.recv
- if stderr:
- pr = self.recv_err
- while time.time() < x or r:
- r = pr()
- if r is None:
- if e:
- raise Exception("Other end disconnected!")
- else:
- break
- elif r:
- y.append(r)
- else:
- time.sleep(max((x-time.time())/tr, 0))
- return ''.join(y)
-
- def send_all(self, data):
- while len(data):
- sent = self.send(data)
- if sent is None:
- raise Exception("Other end disconnected!")
- data = buffer(data, sent)
-
- def get_conn_maxsize(self, which, maxsize):
- if maxsize is None:
- maxsize = 1024
- elif maxsize < 1:
- maxsize = 1
- return getattr(self, which), maxsize
-
- def _close(self, which):
- conn = getattr(self, which)
- flags = fcntl.fcntl(conn, fcntl.F_GETFL)
- if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
- assert conn.read() == ''
- getattr(self, which).close()
- setattr(self, which, None)
-
- if mswindows:
- def kill(self):
- # Recipes
- #http://me.in-berlin.de/doc/python/faq/windows.html#how-do-i-emulate-os-kill-in-windows
- #http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
-
- """kill function for Win32"""
- TerminateProcess(int(self._handle), 0) # returns None
-
- def send(self, input):
- if not self.stdin:
- return None
-
- try:
- x = msvcrt.get_osfhandle(self.stdin.fileno())
- (errCode, written) = WriteFile(x, input)
- except ValueError:
- return self._close('stdin')
- except (subprocess.pywintypes.error, Exception):
- if geterror()[0] in (109, errno.ESHUTDOWN):
- return self._close('stdin')
- raise
-
- return written
-
- def _recv(self, which, maxsize):
- conn, maxsize = self.get_conn_maxsize(which, maxsize)
- if conn is None:
- return None
-
- try:
- x = msvcrt.get_osfhandle(conn.fileno())
- (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
- if maxsize < nAvail:
- nAvail = maxsize
- if nAvail > 0:
- (errCode, read) = ReadFile(x, nAvail, None)
- except ValueError:
- return self._close(which)
- except (subprocess.pywintypes.error, Exception):
- if geterror()[0] in (109, errno.ESHUTDOWN):
- return self._close(which)
- raise
-
- if self.universal_newlines:
- # Translate newlines. For Python 3.x assume read is text.
- # If bytes then another solution is needed.
- read = read.replace("\r\n", "\n").replace("\r", "\n")
- return read
-
- else:
- def kill(self):
- for i, sig in enumerate([SIGTERM, SIGKILL] * 2):
- if i % 2 == 0: os.kill(self.pid, sig)
- time.sleep((i * (i % 2) / 5.0) + 0.01)
-
- killed_pid, stat = os.waitpid(self.pid, os.WNOHANG)
- if killed_pid != 0: return
-
- def send(self, input):
- if not self.stdin:
- return None
-
- if not select.select([], [self.stdin], [], 0)[1]:
- return 0
-
- try:
- written = os.write(self.stdin.fileno(), input)
- except OSError:
- if geterror()[0] == errno.EPIPE: #broken pipe
- return self._close('stdin')
- raise
-
- return written
-
- def _recv(self, which, maxsize):
- conn, maxsize = self.get_conn_maxsize(which, maxsize)
- if conn is None:
- return None
-
- flags = fcntl.fcntl(conn, fcntl.F_GETFL)
- if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags| os.O_NONBLOCK)
-
- try:
- if not select.select([conn], [], [], 0)[0]:
- return ''
-
- try:
- r = conn.read(maxsize)
- except IOError as e:
- if e.errno == errno.EAGAIN:
- return ''
- raise
- if not r:
- return self._close(which)
-
- if self.universal_newlines:
- r = r.replace("\r\n", "\n").replace("\r", "\n")
- return r
- finally:
- if not conn.closed:
- fcntl.fcntl(conn, fcntl.F_SETFL, flags)
-
-################################################################################
-
-def proc_in_time_or_kill(cmd, time_out, wd = None, env = None):
- proc = Popen (
- cmd, cwd = wd, env = env,
- stdin = subprocess.PIPE, stdout = subprocess.PIPE,
- stderr = subprocess.STDOUT, universal_newlines = 1
- )
-
- ret_code = None
- response = []
-
- t = time.time()
- while ret_code is None and ((time.time() -t) < time_out):
- ret_code = proc.poll()
- response += [proc.read_async(wait=0.1, e=0)]
-
- if ret_code is None:
- ret_code = '"Process timed out (time_out = {} secs) '.format(time_out)
- try:
- proc.kill()
- ret_code += 'and was successfully terminated"'
- except Exception:
- ret_code += ("and termination failed "
- "(exception: {})".format(geterror(),))
-
- return ret_code, ''.join(response)
-
-################################################################################
-
-class AsyncTest(unittest.TestCase):
- def test_proc_in_time_or_kill(self):
- ret_code, response = proc_in_time_or_kill(
- [sys.executable, '-c', 'while 1: pass'], time_out = 1
- )
-
- self.assert_( 'rocess timed out' in ret_code )
- self.assert_( 'successfully terminated' in ret_code )
-
-################################################################################
-
-def _example():
- if sys.platform == 'win32':
- shell, commands, tail = ('cmd', ('echo "hello"', 'echo "HELLO WORLD"'),
- '\r\n')
- else:
- shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')
-
- a = Popen(shell, stdin=PIPE, stdout=PIPE)
- sys.stdout.write(a.read_async())
- sys.stdout.write(" ")
- for cmd in commands:
- a.send_all(cmd + tail)
- sys.stdout.write(a.read_async())
- sys.stdout.write(" ")
- a.send_all('exit' + tail)
- print (a.read_async(e=0))
- a.wait()
-
-################################################################################
-
-if __name__ == '__main__':
- if 1: unittest.main()
- else: _example()