archivebox_run.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
#!/usr/bin/env python3

"""
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...] [--binary-id=...] [--worker-type=...]

Unified command for processing queued work.

Modes:
    - With stdin JSONL: Process piped records, exit when complete
    - Without stdin (TTY): Run orchestrator in foreground (exits when idle unless --daemon)
    - --crawl-id: Run worker for specific crawl only
    - --snapshot-id: Run worker for specific snapshot only (internal use)
    - --binary-id: Run worker for specific binary only
    - --worker-type: Run worker of a specific type (only 'binary' is recognized)

Examples:
    # Run orchestrator in foreground
    archivebox run

    # Run as daemon (don't exit on idle)
    archivebox run --daemon

    # Process specific records (pipe any JSONL type, exits when done)
    archivebox snapshot list --status=queued | archivebox run
    archivebox archiveresult list --status=failed | archivebox run
    archivebox crawl list --status=queued | archivebox run

    # Mixed types work too
    cat mixed_records.jsonl | archivebox run

    # Run worker for specific crawl (shows live progress for that crawl)
    archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e

    # Run worker for specific snapshot (internal use by orchestrator)
    archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""

# Module metadata; NOTE(review): presumably read by the archivebox CLI loader - confirm.
__package__ = 'archivebox.cli'
__command__ = 'archivebox run'
  28. import sys
  29. import rich_click as click
  30. from rich import print as rprint
  31. def process_stdin_records() -> int:
  32. """
  33. Process JSONL records from stdin.
  34. Create-or-update behavior:
  35. - Records WITHOUT id: Create via Model.from_json(), then queue
  36. - Records WITH id: Lookup existing, re-queue for processing
  37. Outputs JSONL of all processed records (for chaining).
  38. Handles any record type: Crawl, Snapshot, ArchiveResult.
  39. Auto-cascades: Crawl → Snapshots → ArchiveResults.
  40. Returns exit code (0 = success, 1 = error).
  41. """
  42. from django.utils import timezone
  43. from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY
  44. from archivebox.base_models.models import get_or_create_system_user_pk
  45. from archivebox.core.models import Snapshot, ArchiveResult
  46. from archivebox.crawls.models import Crawl
  47. from archivebox.machine.models import Binary
  48. from archivebox.workers.orchestrator import Orchestrator
  49. records = list(read_stdin())
  50. is_tty = sys.stdout.isatty()
  51. if not records:
  52. return 0 # Nothing to process
  53. created_by_id = get_or_create_system_user_pk()
  54. queued_count = 0
  55. output_records = []
  56. for record in records:
  57. record_type = record.get('type', '')
  58. record_id = record.get('id')
  59. try:
  60. if record_type == TYPE_CRAWL:
  61. if record_id:
  62. # Existing crawl - re-queue
  63. try:
  64. crawl = Crawl.objects.get(id=record_id)
  65. except Crawl.DoesNotExist:
  66. crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
  67. else:
  68. # New crawl - create it
  69. crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
  70. if crawl:
  71. crawl.retry_at = timezone.now()
  72. if crawl.status not in [Crawl.StatusChoices.SEALED]:
  73. crawl.status = Crawl.StatusChoices.QUEUED
  74. crawl.save()
  75. output_records.append(crawl.to_json())
  76. queued_count += 1
  77. elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
  78. if record_id:
  79. # Existing snapshot - re-queue
  80. try:
  81. snapshot = Snapshot.objects.get(id=record_id)
  82. except Snapshot.DoesNotExist:
  83. snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
  84. else:
  85. # New snapshot - create it
  86. snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
  87. if snapshot:
  88. snapshot.retry_at = timezone.now()
  89. if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
  90. snapshot.status = Snapshot.StatusChoices.QUEUED
  91. snapshot.save()
  92. output_records.append(snapshot.to_json())
  93. queued_count += 1
  94. elif record_type == TYPE_ARCHIVERESULT:
  95. if record_id:
  96. # Existing archiveresult - re-queue
  97. try:
  98. archiveresult = ArchiveResult.objects.get(id=record_id)
  99. except ArchiveResult.DoesNotExist:
  100. archiveresult = ArchiveResult.from_json(record)
  101. else:
  102. # New archiveresult - create it
  103. archiveresult = ArchiveResult.from_json(record)
  104. if archiveresult:
  105. archiveresult.retry_at = timezone.now()
  106. if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
  107. archiveresult.status = ArchiveResult.StatusChoices.QUEUED
  108. archiveresult.save()
  109. output_records.append(archiveresult.to_json())
  110. queued_count += 1
  111. elif record_type == TYPE_BINARY:
  112. # Binary records - create or update and queue for installation
  113. if record_id:
  114. # Existing binary - re-queue
  115. try:
  116. binary = Binary.objects.get(id=record_id)
  117. except Binary.DoesNotExist:
  118. binary = Binary.from_json(record)
  119. else:
  120. # New binary - create it
  121. binary = Binary.from_json(record)
  122. if binary:
  123. binary.retry_at = timezone.now()
  124. if binary.status != Binary.StatusChoices.INSTALLED:
  125. binary.status = Binary.StatusChoices.QUEUED
  126. binary.save()
  127. output_records.append(binary.to_json())
  128. queued_count += 1
  129. else:
  130. # Unknown type - pass through
  131. output_records.append(record)
  132. except Exception as e:
  133. rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
  134. continue
  135. # Output all processed records (for chaining)
  136. if not is_tty:
  137. for rec in output_records:
  138. write_record(rec)
  139. if queued_count == 0:
  140. rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
  141. return 0
  142. rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)
  143. # Run orchestrator until all queued work is done
  144. orchestrator = Orchestrator(exit_on_idle=True)
  145. orchestrator.runloop()
  146. return 0
  147. def run_orchestrator(daemon: bool = False) -> int:
  148. """
  149. Run the orchestrator process.
  150. The orchestrator:
  151. 1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
  152. 2. Spawns worker processes when there is work to do
  153. 3. Monitors worker health and restarts failed workers
  154. 4. Exits when all queues are empty (unless --daemon)
  155. Args:
  156. daemon: Run forever (don't exit when idle)
  157. Returns exit code (0 = success, 1 = error).
  158. """
  159. from archivebox.workers.orchestrator import Orchestrator
  160. if Orchestrator.is_running():
  161. rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
  162. return 0
  163. try:
  164. orchestrator = Orchestrator(exit_on_idle=not daemon)
  165. orchestrator.runloop()
  166. return 0
  167. except KeyboardInterrupt:
  168. return 0
  169. except Exception as e:
  170. rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
  171. return 1
  172. def run_snapshot_worker(snapshot_id: str) -> int:
  173. """
  174. Run a SnapshotWorker for a specific snapshot.
  175. Args:
  176. snapshot_id: Snapshot UUID to process
  177. Returns exit code (0 = success, 1 = error).
  178. """
  179. from archivebox.workers.worker import _run_snapshot_worker
  180. try:
  181. _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
  182. return 0
  183. except KeyboardInterrupt:
  184. return 0
  185. except Exception as e:
  186. rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
  187. import traceback
  188. traceback.print_exc()
  189. return 1
  190. @click.command()
  191. @click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
  192. @click.option('--crawl-id', help="Run orchestrator for specific crawl only")
  193. @click.option('--snapshot-id', help="Run worker for specific snapshot only")
  194. @click.option('--binary-id', help="Run worker for specific binary only")
  195. @click.option('--worker-type', help="Run worker of specific type (binary)")
  196. def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str, worker_type: str):
  197. """
  198. Process queued work.
  199. Modes:
  200. - No args + stdin piped: Process piped JSONL records
  201. - No args + TTY: Run orchestrator for all work
  202. - --crawl-id: Run orchestrator for that crawl only
  203. - --snapshot-id: Run worker for that snapshot only
  204. - --binary-id: Run worker for that binary only
  205. """
  206. # Snapshot worker mode
  207. if snapshot_id:
  208. sys.exit(run_snapshot_worker(snapshot_id))
  209. # Binary worker mode (specific binary)
  210. if binary_id:
  211. from archivebox.workers.worker import BinaryWorker
  212. try:
  213. worker = BinaryWorker(binary_id=binary_id, worker_id=0)
  214. worker.runloop()
  215. sys.exit(0)
  216. except KeyboardInterrupt:
  217. sys.exit(0)
  218. except Exception as e:
  219. rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
  220. import traceback
  221. traceback.print_exc()
  222. sys.exit(1)
  223. # Worker type mode (daemon - processes all pending items)
  224. if worker_type:
  225. if worker_type == 'binary':
  226. from archivebox.workers.worker import BinaryWorker
  227. try:
  228. worker = BinaryWorker(worker_id=0) # No binary_id = daemon mode
  229. worker.runloop()
  230. sys.exit(0)
  231. except KeyboardInterrupt:
  232. sys.exit(0)
  233. except Exception as e:
  234. rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
  235. import traceback
  236. traceback.print_exc()
  237. sys.exit(1)
  238. else:
  239. rprint(f'[red]Unknown worker type: {worker_type}[/red]', file=sys.stderr)
  240. sys.exit(1)
  241. # Crawl worker mode
  242. if crawl_id:
  243. from archivebox.workers.worker import CrawlWorker
  244. try:
  245. worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
  246. worker.runloop()
  247. sys.exit(0)
  248. except KeyboardInterrupt:
  249. sys.exit(0)
  250. except Exception as e:
  251. rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
  252. import traceback
  253. traceback.print_exc()
  254. sys.exit(1)
  255. # Check if stdin has data (non-TTY means piped input)
  256. if not sys.stdin.isatty():
  257. sys.exit(process_stdin_records())
  258. else:
  259. sys.exit(run_orchestrator(daemon=daemon))
  260. if __name__ == '__main__':
  261. main()