| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import os
- import signal
- import sqlite3
- import subprocess
- import sys
- import time
- from pathlib import Path
- def _run(cmd, data_dir: Path, env: dict, timeout: int = 120):
- return subprocess.run(
- cmd,
- cwd=data_dir,
- env=env,
- capture_output=True,
- text=True,
- timeout=timeout,
- )
- def _make_env(data_dir: Path) -> dict:
- env = os.environ.copy()
- env["DATA_DIR"] = str(data_dir)
- env["USE_COLOR"] = "False"
- env["SHOW_PROGRESS"] = "False"
- env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
- env["PLUGINS"] = "title,favicon"
- # Keep it fast but still real hooks
- env["SAVE_TITLE"] = "True"
- env["SAVE_FAVICON"] = "True"
- env["SAVE_WGET"] = "False"
- env["SAVE_WARC"] = "False"
- env["SAVE_PDF"] = "False"
- env["SAVE_SCREENSHOT"] = "False"
- env["SAVE_DOM"] = "False"
- env["SAVE_SINGLEFILE"] = "False"
- env["SAVE_READABILITY"] = "False"
- env["SAVE_MERCURY"] = "False"
- env["SAVE_GIT"] = "False"
- env["SAVE_YTDLP"] = "False"
- env["SAVE_HEADERS"] = "False"
- env["SAVE_HTMLTOTEXT"] = "False"
- return env
- def _count_running_processes(db_path: Path, where: str) -> int:
- for _ in range(50):
- try:
- conn = sqlite3.connect(db_path, timeout=1)
- cur = conn.cursor()
- count = cur.execute(
- f"SELECT COUNT(*) FROM machine_process WHERE status = 'running' AND {where}"
- ).fetchone()[0]
- conn.close()
- return count
- except sqlite3.OperationalError:
- time.sleep(0.1)
- return 0
- def _wait_for_count(db_path: Path, where: str, target: int, timeout: int = 20) -> bool:
- start = time.time()
- while time.time() - start < timeout:
- if _count_running_processes(db_path, where) >= target:
- return True
- time.sleep(0.1)
- return False
- def test_add_parents_workers_to_orchestrator(tmp_path):
- data_dir = tmp_path / "data"
- data_dir.mkdir()
- env = _make_env(data_dir)
- init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
- assert init.returncode == 0, init.stderr
- add = _run([sys.executable, "-m", "archivebox", "add", "https://example.com"], data_dir, env, timeout=120)
- assert add.returncode == 0, add.stderr
- conn = sqlite3.connect(data_dir / "index.sqlite3")
- cur = conn.cursor()
- orchestrator = cur.execute(
- "SELECT id FROM machine_process WHERE process_type = 'orchestrator' ORDER BY created_at DESC LIMIT 1"
- ).fetchone()
- assert orchestrator is not None
- orchestrator_id = orchestrator[0]
- worker_count = cur.execute(
- "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'crawl' "
- "AND parent_id = ?",
- (orchestrator_id,),
- ).fetchone()[0]
- conn.close()
- assert worker_count >= 1, "Expected crawl worker to be parented to orchestrator"
- def test_add_interrupt_cleans_orphaned_processes(tmp_path):
- data_dir = tmp_path / "data"
- data_dir.mkdir()
- env = _make_env(data_dir)
- init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
- assert init.returncode == 0, init.stderr
- proc = subprocess.Popen(
- [sys.executable, "-m", "archivebox", "add", "https://example.com"],
- cwd=data_dir,
- env=env,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- text=True,
- )
- db_path = data_dir / "index.sqlite3"
- saw_worker = _wait_for_count(db_path, "process_type = 'worker'", 1, timeout=20)
- assert saw_worker, "Expected at least one worker to start before interrupt"
- proc.send_signal(signal.SIGINT)
- proc.wait(timeout=30)
- # Wait for workers/hooks to be cleaned up
- start = time.time()
- while time.time() - start < 30:
- running = _count_running_processes(db_path, "process_type IN ('worker','hook')")
- if running == 0:
- break
- time.sleep(0.2)
- assert _count_running_processes(db_path, "process_type IN ('worker','hook')") == 0, (
- "Expected no running worker/hook processes after interrupt"
- )
|