# test_cli_add_interrupt.py (4.0 KB)
  1. import os
  2. import signal
  3. import sqlite3
  4. import subprocess
  5. import sys
  6. import time
  7. from pathlib import Path
  8. def _run(cmd, data_dir: Path, env: dict, timeout: int = 120):
  9. return subprocess.run(
  10. cmd,
  11. cwd=data_dir,
  12. env=env,
  13. capture_output=True,
  14. text=True,
  15. timeout=timeout,
  16. )
  17. def _make_env(data_dir: Path) -> dict:
  18. env = os.environ.copy()
  19. env["DATA_DIR"] = str(data_dir)
  20. env["USE_COLOR"] = "False"
  21. env["SHOW_PROGRESS"] = "False"
  22. env["ARCHIVEBOX_ALLOW_NO_UNIX_SOCKETS"] = "true"
  23. env["PLUGINS"] = "title,favicon"
  24. # Keep it fast but still real hooks
  25. env["SAVE_TITLE"] = "True"
  26. env["SAVE_FAVICON"] = "True"
  27. env["SAVE_WGET"] = "False"
  28. env["SAVE_WARC"] = "False"
  29. env["SAVE_PDF"] = "False"
  30. env["SAVE_SCREENSHOT"] = "False"
  31. env["SAVE_DOM"] = "False"
  32. env["SAVE_SINGLEFILE"] = "False"
  33. env["SAVE_READABILITY"] = "False"
  34. env["SAVE_MERCURY"] = "False"
  35. env["SAVE_GIT"] = "False"
  36. env["SAVE_YTDLP"] = "False"
  37. env["SAVE_HEADERS"] = "False"
  38. env["SAVE_HTMLTOTEXT"] = "False"
  39. return env
  40. def _count_running_processes(db_path: Path, where: str) -> int:
  41. for _ in range(50):
  42. try:
  43. conn = sqlite3.connect(db_path, timeout=1)
  44. cur = conn.cursor()
  45. count = cur.execute(
  46. f"SELECT COUNT(*) FROM machine_process WHERE status = 'running' AND {where}"
  47. ).fetchone()[0]
  48. conn.close()
  49. return count
  50. except sqlite3.OperationalError:
  51. time.sleep(0.1)
  52. return 0
  53. def _wait_for_count(db_path: Path, where: str, target: int, timeout: int = 20) -> bool:
  54. start = time.time()
  55. while time.time() - start < timeout:
  56. if _count_running_processes(db_path, where) >= target:
  57. return True
  58. time.sleep(0.1)
  59. return False
  60. def test_add_parents_workers_to_orchestrator(tmp_path):
  61. data_dir = tmp_path / "data"
  62. data_dir.mkdir()
  63. env = _make_env(data_dir)
  64. init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
  65. assert init.returncode == 0, init.stderr
  66. add = _run([sys.executable, "-m", "archivebox", "add", "https://example.com"], data_dir, env, timeout=120)
  67. assert add.returncode == 0, add.stderr
  68. conn = sqlite3.connect(data_dir / "index.sqlite3")
  69. cur = conn.cursor()
  70. orchestrator = cur.execute(
  71. "SELECT id FROM machine_process WHERE process_type = 'orchestrator' ORDER BY created_at DESC LIMIT 1"
  72. ).fetchone()
  73. assert orchestrator is not None
  74. orchestrator_id = orchestrator[0]
  75. worker_count = cur.execute(
  76. "SELECT COUNT(*) FROM machine_process WHERE process_type = 'worker' AND worker_type = 'crawl' "
  77. "AND parent_id = ?",
  78. (orchestrator_id,),
  79. ).fetchone()[0]
  80. conn.close()
  81. assert worker_count >= 1, "Expected crawl worker to be parented to orchestrator"
  82. def test_add_interrupt_cleans_orphaned_processes(tmp_path):
  83. data_dir = tmp_path / "data"
  84. data_dir.mkdir()
  85. env = _make_env(data_dir)
  86. init = _run([sys.executable, "-m", "archivebox", "init", "--quick"], data_dir, env)
  87. assert init.returncode == 0, init.stderr
  88. proc = subprocess.Popen(
  89. [sys.executable, "-m", "archivebox", "add", "https://example.com"],
  90. cwd=data_dir,
  91. env=env,
  92. stdout=subprocess.PIPE,
  93. stderr=subprocess.PIPE,
  94. text=True,
  95. )
  96. db_path = data_dir / "index.sqlite3"
  97. saw_worker = _wait_for_count(db_path, "process_type = 'worker'", 1, timeout=20)
  98. assert saw_worker, "Expected at least one worker to start before interrupt"
  99. proc.send_signal(signal.SIGINT)
  100. proc.wait(timeout=30)
  101. # Wait for workers/hooks to be cleaned up
  102. start = time.time()
  103. while time.time() - start < 30:
  104. running = _count_running_processes(db_path, "process_type IN ('worker','hook')")
  105. if running == 0:
  106. break
  107. time.sleep(0.2)
  108. assert _count_running_processes(db_path, "process_type IN ('worker','hook')") == 0, (
  109. "Expected no running worker/hook processes after interrupt"
  110. )