Browse source code

rename system_tasks queue to commands queue

Nick Sweeting 1 year ago
parent
commit
b61f6ff8d8

+ 2 - 2
archivebox/core/settings.py

@@ -267,7 +267,7 @@ if not IS_GETTING_VERSION_OR_HELP:             # dont create queue.sqlite3 file
     HUEY = {
     HUEY = {
         "huey_class": "huey.SqliteHuey",
         "huey_class": "huey.SqliteHuey",
         "filename": CONSTANTS.QUEUE_DATABASE_FILENAME,
         "filename": CONSTANTS.QUEUE_DATABASE_FILENAME,
-        "name": "system_tasks",
+        "name": "commands",
         "results": True,
         "results": True,
         "store_none": True,
         "store_none": True,
         "immediate": False,
         "immediate": False,
@@ -288,7 +288,7 @@ if not IS_GETTING_VERSION_OR_HELP:             # dont create queue.sqlite3 file
     # https://huey.readthedocs.io/en/latest/contrib.html#setting-things-up
     # https://huey.readthedocs.io/en/latest/contrib.html#setting-things-up
     # https://github.com/gaiacoop/django-huey
     # https://github.com/gaiacoop/django-huey
     DJANGO_HUEY = {
     DJANGO_HUEY = {
-        "default": "system_tasks",
+        "default": "commands",
         "queues": {
         "queues": {
             HUEY["name"]: HUEY.copy(),
             HUEY["name"]: HUEY.copy(),
             # more registered here at plugin import-time by BaseQueue.register()
             # more registered here at plugin import-time by BaseQueue.register()

+ 73 - 67
archivebox/queues/supervisor_util.py

@@ -26,6 +26,23 @@ CONFIG_FILE_NAME = "supervisord.conf"
 PID_FILE_NAME = "supervisord.pid"
 PID_FILE_NAME = "supervisord.pid"
 WORKERS_DIR_NAME = "workers"
 WORKERS_DIR_NAME = "workers"
 
 
+SCHEDULER_WORKER = {
+    "name": "worker_scheduler",
+    "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --disable-health-check --flush-locks",
+    "autostart": "true",
+    "autorestart": "true",
+    "stdout_logfile": "logs/worker_scheduler.log",
+    "redirect_stderr": "true",
+}
+COMMAND_WORKER = {
+    "name": "worker_commands",
+    "command": "archivebox manage djangohuey --queue commands -w 4 -k thread --no-periodic --disable-health-check",
+    "autostart": "true",
+    "autorestart": "true",
+    "stdout_logfile": "logs/worker_commands.log",
+    "redirect_stderr": "true",
+}
+
 @cache
 @cache
 def get_sock_file():
 def get_sock_file():
     """Get the path to the supervisord socket file, symlinking to a shorter path if needed due to unix path length limits"""
     """Get the path to the supervisord socket file, symlinking to a shorter path if needed due to unix path length limits"""
@@ -84,33 +101,35 @@ files = {WORKERS_DIR}/*.conf
 
 
 """
 """
     CONFIG_FILE.write_text(config_content)
     CONFIG_FILE.write_text(config_content)
-    Path.mkdir(WORKERS_DIR, exist_ok=True)
+    Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True)
+    
     (WORKERS_DIR / 'initial_startup.conf').write_text('')   # hides error about "no files found to include" when supervisord starts
     (WORKERS_DIR / 'initial_startup.conf').write_text('')   # hides error about "no files found to include" when supervisord starts
 
 
 def create_worker_config(daemon):
 def create_worker_config(daemon):
+    """Create a supervisord worker config file for a given daemon"""
     SOCK_FILE = get_sock_file()
     SOCK_FILE = get_sock_file()
     WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
     WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME
     
     
-    Path.mkdir(WORKERS_DIR, exist_ok=True)
+    Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True)
     
     
     name = daemon['name']
     name = daemon['name']
-    configfile = WORKERS_DIR / f"{name}.conf"
+    worker_conf = WORKERS_DIR / f"{name}.conf"
 
 
-    config_content = f"[program:{name}]\n"
+    worker_str = f"[program:{name}]\n"
     for key, value in daemon.items():
     for key, value in daemon.items():
         if key == 'name':
         if key == 'name':
             continue
             continue
-        config_content += f"{key}={value}\n"
-    config_content += "\n"
+        worker_str += f"{key}={value}\n"
+    worker_str += "\n"
 
 
-    configfile.write_text(config_content)
+    worker_conf.write_text(worker_str)
 
 
 
 
 def get_existing_supervisord_process():
 def get_existing_supervisord_process():
     SOCK_FILE = get_sock_file()
     SOCK_FILE = get_sock_file()
     try:
     try:
         transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
         transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
-        server = ServerProxy("http://localhost", transport=transport)
+        server = ServerProxy("http://localhost", transport=transport)       # user:pass@localhost doesn't work for some reason with unix://.sock, cant seem to silence CRIT no-auth warning
         current_state = cast(Dict[str, int | str], server.supervisor.getState())
         current_state = cast(Dict[str, int | str], server.supervisor.getState())
         if current_state["statename"] == "RUNNING":
         if current_state["statename"] == "RUNNING":
             pid = server.supervisor.getPID()
             pid = server.supervisor.getPID()
@@ -127,6 +146,7 @@ def stop_existing_supervisord_process():
     PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
     PID_FILE = SOCK_FILE.parent / PID_FILE_NAME
     
     
     try:
     try:
+        # if pid file exists, load PID int
         try:
         try:
             pid = int(PID_FILE.read_text())
             pid = int(PID_FILE.read_text())
         except (FileNotFoundError, ValueError):
         except (FileNotFoundError, ValueError):
@@ -136,15 +156,15 @@ def stop_existing_supervisord_process():
             print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
             print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
             proc = psutil.Process(pid)
             proc = psutil.Process(pid)
             proc.terminate()
             proc.terminate()
-            proc.wait()
-        except (Exception, BrokenPipeError, IOError):
+            proc.wait(timeout=5)
+        except (BaseException, BrokenPipeError, IOError, KeyboardInterrupt):
             pass
             pass
     finally:
     finally:
         try:
         try:
             # clear PID file and socket file
             # clear PID file and socket file
             PID_FILE.unlink(missing_ok=True)
             PID_FILE.unlink(missing_ok=True)
             get_sock_file().unlink(missing_ok=True)
             get_sock_file().unlink(missing_ok=True)
-        except Exception:
+        except BaseException:
             pass
             pass
 
 
 def start_new_supervisord_process(daemonize=False):
 def start_new_supervisord_process(daemonize=False):
@@ -278,24 +298,36 @@ def start_worker(supervisor, daemon, lazy=False):
     raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")
     raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")
 
 
 
 
-def watch_worker(supervisor, daemon_name, interval=5):
-    """loop continuously and monitor worker's health"""
-    while True:
-        proc = get_worker(supervisor, daemon_name)
-        if not proc:
-            raise Exception("Worker dissapeared while running! " + daemon_name)
+def get_worker(supervisor, daemon_name):
+    try:
+        return supervisor.getProcessInfo(daemon_name)
+    except Exception:
+        pass
+    return None
+
+def stop_worker(supervisor, daemon_name):
+    proc = get_worker(supervisor, daemon_name)
 
 
+    for _ in range(10):
+        if not proc:
+            # worker does not exist (was never running or configured in the first place)
+            return True
+        
+        # See process state diagram here: http://supervisord.org/subprocess.html
         if proc['statename'] == 'STOPPED':
         if proc['statename'] == 'STOPPED':
-            return proc
+            # worker was configured but has already stopped for some reason
+            supervisor.removeProcessGroup(daemon_name)
+            return True
+        else:
+            # worker was configured and is running, stop it now
+            supervisor.stopProcessGroup(daemon_name)
 
 
-        if proc['statename'] == 'RUNNING':
-            time.sleep(1)
-            continue
+        # wait 500ms and then re-check to make sure it's really stopped
+        time.sleep(0.5)
+        proc = get_worker(supervisor, daemon_name)
+
+    raise Exception(f"Failed to stop worker {daemon_name}!")
 
 
-        if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'):
-            print(f'[🦸‍♂️] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}')
-            time.sleep(interval)
-            continue
 
 
 def tail_worker_logs(log_path: str):
 def tail_worker_logs(log_path: str):
     get_or_create_supervisord_process(daemonize=False)
     get_or_create_supervisord_process(daemonize=False)
@@ -319,36 +351,24 @@ def tail_worker_logs(log_path: str):
     except SystemExit:
     except SystemExit:
         pass
         pass
 
 
-def get_worker(supervisor, daemon_name):
-    try:
-        return supervisor.getProcessInfo(daemon_name)
-    except Exception:
-        pass
-    return None
-
-def stop_worker(supervisor, daemon_name):
-    proc = get_worker(supervisor, daemon_name)
-
-    for _ in range(10):
+def watch_worker(supervisor, daemon_name, interval=5):
+    """loop continuously and monitor worker's health"""
+    while True:
+        proc = get_worker(supervisor, daemon_name)
         if not proc:
         if not proc:
-            # worker does not exist (was never running or configured in the first place)
-            return True
-        
-        # See process state diagram here: http://supervisord.org/subprocess.html
-        if proc['statename'] == 'STOPPED':
-            # worker was configured but has already stopped for some reason
-            supervisor.removeProcessGroup(daemon_name)
-            return True
-        else:
-            # worker was configured and is running, stop it now
-            supervisor.stopProcessGroup(daemon_name)
+            raise Exception("Worker dissapeared while running! " + daemon_name)
 
 
-        # wait 500ms and then re-check to make sure it's really stopped
-        time.sleep(0.5)
-        proc = get_worker(supervisor, daemon_name)
+        if proc['statename'] == 'STOPPED':
+            return proc
 
 
-    raise Exception(f"Failed to stop worker {daemon_name}!")
+        if proc['statename'] == 'RUNNING':
+            time.sleep(1)
+            continue
 
 
+        if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'):
+            print(f'[🦸‍♂️] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}')
+            time.sleep(interval)
+            continue
 
 
 
 
 
 
@@ -356,22 +376,8 @@ def start_server_workers(host='0.0.0.0', port='8000', daemonize=False):
     supervisor = get_or_create_supervisord_process(daemonize=daemonize)
     supervisor = get_or_create_supervisord_process(daemonize=daemonize)
     
     
     bg_workers = [
     bg_workers = [
-        {
-            "name": "worker_scheduler",
-            "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --disable-health-check --flush-locks",
-            "autostart": "true",
-            "autorestart": "true",
-            "stdout_logfile": "logs/worker_scheduler.log",
-            "redirect_stderr": "true",
-        },
-        {
-            "name": "worker_system_tasks",
-            "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --no-periodic --disable-health-check",
-            "autostart": "true",
-            "autorestart": "true",
-            "stdout_logfile": "logs/worker_system_tasks.log",
-            "redirect_stderr": "true",
-        },
+        SCHEDULER_WORKER,
+        COMMAND_WORKER,
     ]
     ]
     fg_worker = {
     fg_worker = {
         "name": "worker_daphne",
         "name": "worker_daphne",

+ 6 - 6
archivebox/queues/tasks.py

@@ -1,7 +1,7 @@
 __package__ = 'archivebox.queues'
 __package__ = 'archivebox.queues'
 
 
 from functools import wraps
 from functools import wraps
-from django.utils import timezone
+# from django.utils import timezone
 
 
 from django_huey import db_task, task
 from django_huey import db_task, task
 
 
@@ -10,7 +10,7 @@ from huey_monitor.tqdm import ProcessInfo
 
 
 from .supervisor_util import get_or_create_supervisord_process
 from .supervisor_util import get_or_create_supervisord_process
 
 
-# @db_task(queue="system_tasks", context=True, schedule=1)
+# @db_task(queue="commands", context=True, schedule=1)
 # def scheduler_tick():
 # def scheduler_tick():
 #     print('SCHEDULER TICK', timezone.now().isoformat())
 #     print('SCHEDULER TICK', timezone.now().isoformat())
 #     # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
 #     # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine())
@@ -43,7 +43,7 @@ def db_task_with_parent(func):
     
     
     return wrapper
     return wrapper
 
 
-@db_task(queue="system_tasks", context=True)
+@db_task(queue="commands", context=True)
 def bg_add(add_kwargs, task=None, parent_task_id=None):
 def bg_add(add_kwargs, task=None, parent_task_id=None):
     get_or_create_supervisord_process(daemonize=False)
     get_or_create_supervisord_process(daemonize=False)
     
     
@@ -62,7 +62,7 @@ def bg_add(add_kwargs, task=None, parent_task_id=None):
     return result
     return result
 
 
 
 
-@task(queue="system_tasks", context=True)
+@task(queue="commands", context=True)
 def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
 def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
     get_or_create_supervisord_process(daemonize=False)
     get_or_create_supervisord_process(daemonize=False)
     
     
@@ -83,7 +83,7 @@ def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
     return result
     return result
 
 
 
 
-@task(queue="system_tasks", context=True)
+@task(queue="commands", context=True)
 def bg_archive_link(args, kwargs=None,task=None, parent_task_id=None):
 def bg_archive_link(args, kwargs=None,task=None, parent_task_id=None):
     get_or_create_supervisord_process(daemonize=False)
     get_or_create_supervisord_process(daemonize=False)
     
     
@@ -104,7 +104,7 @@ def bg_archive_link(args, kwargs=None,task=None, parent_task_id=None):
     return result
     return result
 
 
 
 
-@task(queue="system_tasks", context=True)
+@task(queue="commands", context=True)
 def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None):
 def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None):
     # get_or_create_supervisord_process(daemonize=False)
     # get_or_create_supervisord_process(daemonize=False)