Jelajahi Sumber

force kill forked child processes after each extractor command

Nick Sweeting 4 tahun lalu
induk
melakukan
e7c7a8f71c
1 mengubah file dengan 45 tambahan dan 5 penghapusan
  1. 45 5
      archivebox/system.py

+ 45 - 5
archivebox/system.py

@@ -2,12 +2,13 @@ __package__ = 'archivebox'
 
 
 
 
 import os
 import os
+import signal
 import shutil
 import shutil
 
 
 from json import dump
 from json import dump
 from pathlib import Path
 from pathlib import Path
 from typing import Optional, Union, Set, Tuple
 from typing import Optional, Union, Set, Tuple
-from subprocess import run as subprocess_run
+from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
 
 
 from crontab import CronTab
 from crontab import CronTab
 from .vendor.atomicwrites import atomic_write as lib_atomic_write
 from .vendor.atomicwrites import atomic_write as lib_atomic_write
@@ -17,19 +18,58 @@ from .config import OUTPUT_PERMISSIONS
 
 
 
 
 
 
def run(*args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs):
    """Run a command like ``subprocess.run`` and signal leftover children.

    This is a patched re-implementation of ``subprocess.run``: the command is
    started through ``Popen`` (by default in a new session, and therefore in
    its own process group), and once it has finished, timed out, or failed,
    SIGINT is sent to that process group so that subprocesses forked by the
    command do not linger.

    Args:
        *args: Positional arguments forwarded to ``Popen`` (the command).
        input: Data to send to the child's stdin; mutually exclusive with an
            explicit ``stdin`` keyword argument.
        capture_output: When true, stdout and stderr are captured via pipes;
            mutually exclusive with explicit ``stdout``/``stderr`` arguments.
        timeout: Seconds to wait in ``communicate`` before the child is killed
            and ``TimeoutExpired`` is re-raised.
        check: When true, a non-zero exit status raises ``CalledProcessError``.
        text: When true, the pipes are opened in text mode (forwarded to
            ``Popen``); otherwise output is returned as bytes.
        start_new_session: Forwarded to ``Popen``. When false the child shares
            the caller's process group, so no group signal is sent afterwards.
        **kwargs: Any other keyword arguments accepted by ``Popen``.

    Returns:
        A ``CompletedProcess`` holding the args, return code, stdout and stderr.

    Raises:
        ValueError: If ``input`` is combined with ``stdin``, or
            ``capture_output`` with ``stdout``/``stderr``.
        TimeoutExpired: If the command does not finish within ``timeout``.
        CalledProcessError: If ``check`` is true and the exit status is non-zero.
    """
    if input is not None:
        if kwargs.get('stdin') is not None:
            raise ValueError('stdin and input arguments may not both be used.')
        kwargs['stdin'] = PIPE

    if capture_output:
        if ('stdout' in kwargs) or ('stderr' in kwargs):
            raise ValueError('stdout and stderr arguments may not be used '
                             'with capture_output.')
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE

    # Forward text mode only when requested, so a caller that passes
    # universal_newlines/encoding in kwargs does not hit a conflicting
    # text=False inside Popen.
    if text:
        kwargs['text'] = True

    pgid = None
    try:
        with Popen(*args, start_new_session=start_new_session, **kwargs) as process:
            # os.getpgid is not available on every platform (e.g. Windows);
            # without it there is no process group to clean up afterwards.
            if hasattr(os, 'getpgid'):
                try:
                    pgid = os.getpgid(process.pid)
                except OSError:
                    pgid = None

            try:
                stdout, stderr = process.communicate(input, timeout=timeout)
            except TimeoutExpired as exc:
                process.kill()
                if _mswindows:
                    # Windows accumulates the output in a single blocking
                    # read() call run on child threads, with the timeout
                    # being done in a join() on those threads.  communicate()
                    # _after_ kill() is required to collect that and add it
                    # to the exception.
                    exc.stdout, exc.stderr = process.communicate()
                else:
                    # POSIX _communicate already populated the output so
                    # far into the TimeoutExpired exception.
                    process.wait()
                raise
            except BaseException:  # Including KeyboardInterrupt, communicate handled that.
                process.kill()
                # We don't call process.wait() as .__exit__ does that for us.
                raise

            retcode = process.poll()
            if check and retcode:
                raise CalledProcessError(retcode, process.args,
                                         output=stdout, stderr=stderr)
    finally:
        # Signal any straggler subprocesses that were forked from the main
        # proc.  Never signal our own process group: when the child was not
        # given a new session it shares ours, and SIGINT would interrupt the
        # caller itself.
        own_pgid = os.getpgrp() if hasattr(os, 'getpgrp') else None
        if pgid is not None and pgid != own_pgid:
            try:
                os.killpg(pgid, signal.SIGINT)
            except OSError:
                # The group is already gone (or not ours to signal): nothing
                # left to clean up.
                pass

    return CompletedProcess(process.args, retcode, stdout, stderr)
 
 
 
 
 @enforce_types
 @enforce_types