Bladeren bron

wait for bg threads to finish before exiting

Nick Sweeting 1 jaar geleden
bovenliggende
commit
e1a04729b3
1 gewijzigde bestanden met toevoegingen van 40 en 2 verwijderingen
  1. 40 2
      archivebox/cli/__init__.py

+ 40 - 2
archivebox/cli/__init__.py

@@ -4,14 +4,18 @@ __command__ = 'archivebox'
 import os
 import sys
 import argparse
+import threading
+from time import sleep
 
-from typing import Optional, Dict, List, IO, Union
+from typing import Optional, Dict, List, IO, Union, Iterable
 from pathlib import Path
 
-from ..config import OUTPUT_DIR, check_data_folder, check_migrations
+from ..config import OUTPUT_DIR, check_data_folder, check_migrations, stderr
 
 from importlib import import_module
 
+BUILTIN_LIST = list
+
 CLI_DIR = Path(__file__).resolve().parent
 
 # these common commands will appear sorted before any others for ease-of-use
@@ -33,6 +37,37 @@ is_valid_cli_module = lambda module, subcommand: (
 )
 
 
+def wait_for_bg_threads_to_exit(thread_names: Iterable[str]=(), ignore_names: Iterable[str]=('MainThread', 'ThreadPoolExecutor'), timeout: int=60) -> int:
+    """
+    Block until the specified threads exit. e.g. pass thread_names=('default_hook_handler',) to wait for webhooks.
+    Useful for waiting for signal handlers, webhooks, etc. to finish running after a mgmt command completes.
+    """
+
+    wait_for_all: bool = thread_names == ()
+
+    thread_matches = lambda thread, ptns: any(ptn in repr(thread) for ptn in ptns)
+
+    should_wait = lambda thread: (
+        not thread_matches(thread, ignore_names)
+        and (wait_for_all or thread_matches(thread, thread_names)))
+
+    for tries in range(timeout):
+        all_threads = [*threading.enumerate()]
+        blocking_threads = [*filter(should_wait, all_threads)]
+        threads_summary = ', '.join(repr(t) for t in blocking_threads)
+        if blocking_threads:
+            sleep(1)
+            if tries == 5:                            # only show stderr message if we need to wait more than 5s
+                stderr(
+                    f'[…] Waiting up to {timeout}s for background jobs (e.g. webhooks) to finish...',
+                    threads_summary,
+                )
+        else:
+            return tries
+
+    raise Exception('Background threads failed to exit after {tries}s: {threads_summary}')
+
+
 def list_subcommands() -> Dict[str, str]:
     """find and import all valid archivebox_<subcommand>.py files in CLI_DIR"""
 
@@ -79,6 +114,9 @@ def run_subcommand(subcommand: str,
     module = import_module('.archivebox_{}'.format(subcommand), __package__)
     module.main(args=subcommand_args, stdin=stdin, pwd=pwd)    # type: ignore
 
+    # wait for webhooks, signals, and other background jobs to finish before exit
+    wait_for_bg_threads_to_exit(timeout=60)
+
 
 SUBCOMMANDS = list_subcommands()