#!/usr/bin/env python3

"""
archivebox run [--daemon] [--crawl-id=...] [--snapshot-id=...] [--binary-id=...] [--worker-type=...]

Unified command for processing queued work.

Modes:
- With stdin JSONL: Process piped records, exit when complete
- Without stdin (TTY): Run orchestrator in foreground until killed
- --crawl-id: Run orchestrator for a specific crawl only
- --snapshot-id: Run worker for a specific snapshot only (internal use)
- --binary-id: Run worker for a specific binary only (internal use)
- --worker-type: Run a standalone worker of the given type (currently only 'binary')

Examples:
    # Run orchestrator in foreground
    archivebox run

    # Run as a daemon (don't exit on idle)
    archivebox run --daemon

    # Process specific records (pipe any JSONL type, exits when done)
    archivebox snapshot list --status=queued | archivebox run
    archivebox archiveresult list --status=failed | archivebox run
    archivebox crawl list --status=queued | archivebox run

    # Mixed record types work too
    cat mixed_records.jsonl | archivebox run

    # Run orchestrator for a specific crawl (shows live progress for that crawl)
    archivebox run --crawl-id=019b7e90-04d0-73ed-adec-aad9cfcd863e

    # Run a worker for a specific snapshot (internal use by the orchestrator)
    archivebox run --snapshot-id=019b7e90-5a8e-712c-9877-2c70eebe80ad
"""

__package__ = 'archivebox.cli'
__command__ = 'archivebox run'

import sys

import rich_click as click
from rich import print as rprint


def process_stdin_records() -> int:
- """
- Process JSONL records from stdin.
- Create-or-update behavior:
- - Records WITHOUT id: Create via Model.from_json(), then queue
- - Records WITH id: Lookup existing, re-queue for processing
- Outputs JSONL of all processed records (for chaining).
- Handles any record type: Crawl, Snapshot, ArchiveResult.
- Auto-cascades: Crawl → Snapshots → ArchiveResults.
- Returns exit code (0 = success, 1 = error).
- """
    from django.utils import timezone

    from archivebox.misc.jsonl import read_stdin, write_record, TYPE_CRAWL, TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY
    from archivebox.base_models.models import get_or_create_system_user_pk
    from archivebox.core.models import Snapshot, ArchiveResult
    from archivebox.crawls.models import Crawl
    from archivebox.machine.models import Binary
    from archivebox.workers.orchestrator import Orchestrator

    records = list(read_stdin())
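    # Remember whether stdout is an interactive terminal: JSONL output further down
    # is only emitted when stdout is piped, so interactive runs aren't flooded with JSON.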
    is_tty = sys.stdout.isatty()

    if not records:
        return 0  # Nothing to process
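    # New records created from piped input are attributed to the built-in system user.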
    created_by_id = get_or_create_system_user_pk()
    queued_count = 0
    output_records = []

    for record in records:
        record_type = record.get('type', '')
        record_id = record.get('id')

        try:
            if record_type == TYPE_CRAWL:
                if record_id:
                    # Existing crawl - re-queue
                    try:
                        crawl = Crawl.objects.get(id=record_id)
                    except Crawl.DoesNotExist:
                        crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})
                else:
                    # New crawl - create it
                    crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})

                if crawl:
                    crawl.retry_at = timezone.now()
                    if crawl.status not in [Crawl.StatusChoices.SEALED]:
                        crawl.status = Crawl.StatusChoices.QUEUED
                    crawl.save()
                    output_records.append(crawl.to_json())
                    queued_count += 1
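            # Records that carry a 'url' but no explicit 'type' are treated as snapshots.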
            elif record_type == TYPE_SNAPSHOT or (record.get('url') and not record_type):
                if record_id:
                    # Existing snapshot - re-queue
                    try:
                        snapshot = Snapshot.objects.get(id=record_id)
                    except Snapshot.DoesNotExist:
                        snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})
                else:
                    # New snapshot - create it
                    snapshot = Snapshot.from_json(record, overrides={'created_by_id': created_by_id})

                if snapshot:
                    snapshot.retry_at = timezone.now()
                    if snapshot.status not in [Snapshot.StatusChoices.SEALED]:
                        snapshot.status = Snapshot.StatusChoices.QUEUED
                    snapshot.save()
                    output_records.append(snapshot.to_json())
                    queued_count += 1

            elif record_type == TYPE_ARCHIVERESULT:
                if record_id:
                    # Existing archiveresult - re-queue
                    try:
                        archiveresult = ArchiveResult.objects.get(id=record_id)
                    except ArchiveResult.DoesNotExist:
                        archiveresult = ArchiveResult.from_json(record)
                else:
                    # New archiveresult - create it
                    archiveresult = ArchiveResult.from_json(record)

                if archiveresult:
                    archiveresult.retry_at = timezone.now()
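                    # Only re-queue results that previously failed, were skipped, or backed off;
                    # succeeded results keep their existing status.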
                    if archiveresult.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED, ArchiveResult.StatusChoices.BACKOFF]:
                        archiveresult.status = ArchiveResult.StatusChoices.QUEUED
                    archiveresult.save()
                    output_records.append(archiveresult.to_json())
                    queued_count += 1

            elif record_type == TYPE_BINARY:
                # Binary records - create or update and queue for installation
                if record_id:
                    # Existing binary - re-queue
                    try:
                        binary = Binary.objects.get(id=record_id)
                    except Binary.DoesNotExist:
                        binary = Binary.from_json(record)
                else:
                    # New binary - create it
                    binary = Binary.from_json(record)

                if binary:
                    binary.retry_at = timezone.now()
                    if binary.status != Binary.StatusChoices.INSTALLED:
                        binary.status = Binary.StatusChoices.QUEUED
                    binary.save()
                    output_records.append(binary.to_json())
                    queued_count += 1
            else:
                # Unknown type - pass through
                output_records.append(record)
        except Exception as e:
            rprint(f'[yellow]Error processing record: {e}[/yellow]', file=sys.stderr)
            continue

    # Output all processed records (for chaining)
    if not is_tty:
        for rec in output_records:
            write_record(rec)

    if queued_count == 0:
        rprint('[yellow]No records to process[/yellow]', file=sys.stderr)
        return 0

    rprint(f'[blue]Processing {queued_count} records...[/blue]', file=sys.stderr)

    # Run orchestrator until all queued work is done
    orchestrator = Orchestrator(exit_on_idle=True)
    orchestrator.runloop()
    return 0


def run_orchestrator(daemon: bool = False) -> int:
- """
- Run the orchestrator process.
- The orchestrator:
- 1. Polls each model queue (Crawl, Snapshot, ArchiveResult)
- 2. Spawns worker processes when there is work to do
- 3. Monitors worker health and restarts failed workers
- 4. Exits when all queues are empty (unless --daemon)
- Args:
- daemon: Run forever (don't exit when idle)
- Returns exit code (0 = success, 1 = error).
- """
- from archivebox.workers.orchestrator import Orchestrator
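    # Avoid starting a second orchestrator if one is already running.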
    if Orchestrator.is_running():
        rprint('[yellow]Orchestrator is already running[/yellow]', file=sys.stderr)
        return 0

    try:
        orchestrator = Orchestrator(exit_on_idle=not daemon)
        orchestrator.runloop()
        return 0
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        rprint(f'[red]Orchestrator error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
        return 1


def run_snapshot_worker(snapshot_id: str) -> int:
- """
- Run a SnapshotWorker for a specific snapshot.
- Args:
- snapshot_id: Snapshot UUID to process
- Returns exit code (0 = success, 1 = error).
- """
- from archivebox.workers.worker import _run_snapshot_worker
- try:
- _run_snapshot_worker(snapshot_id=snapshot_id, worker_id=0)
- return 0
- except KeyboardInterrupt:
- return 0
- except Exception as e:
- rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
- import traceback
- traceback.print_exc()
- return 1
- @click.command()
@click.option('--daemon', '-d', is_flag=True, help="Run forever (don't exit on idle)")
@click.option('--crawl-id', help="Run orchestrator for specific crawl only")
@click.option('--snapshot-id', help="Run worker for specific snapshot only")
@click.option('--binary-id', help="Run worker for specific binary only")
@click.option('--worker-type', help="Run worker of specific type (binary)")
def main(daemon: bool, crawl_id: str, snapshot_id: str, binary_id: str, worker_type: str):
- """
- Process queued work.
- Modes:
- - No args + stdin piped: Process piped JSONL records
- - No args + TTY: Run orchestrator for all work
- - --crawl-id: Run orchestrator for that crawl only
- - --snapshot-id: Run worker for that snapshot only
- - --binary-id: Run worker for that binary only
- """
    # Snapshot worker mode
    if snapshot_id:
        sys.exit(run_snapshot_worker(snapshot_id))

    # Binary worker mode (specific binary)
    if binary_id:
        from archivebox.workers.worker import BinaryWorker
        try:
            worker = BinaryWorker(binary_id=binary_id, worker_id=0)
            worker.runloop()
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as e:
            rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
            import traceback
            traceback.print_exc()
            sys.exit(1)

    # Worker type mode (daemon - processes all pending items)
    if worker_type:
        if worker_type == 'binary':
            from archivebox.workers.worker import BinaryWorker
            try:
                worker = BinaryWorker(worker_id=0)  # No binary_id = daemon mode
                worker.runloop()
                sys.exit(0)
            except KeyboardInterrupt:
                sys.exit(0)
            except Exception as e:
                rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
                import traceback
                traceback.print_exc()
                sys.exit(1)
        else:
            rprint(f'[red]Unknown worker type: {worker_type}[/red]', file=sys.stderr)
            sys.exit(1)

    # Crawl worker mode
    if crawl_id:
        from archivebox.workers.worker import CrawlWorker
        try:
            worker = CrawlWorker(crawl_id=crawl_id, worker_id=0)
            worker.runloop()
            sys.exit(0)
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as e:
            rprint(f'[red]Worker error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
            import traceback
            traceback.print_exc()
            sys.exit(1)

    # Check if stdin has data (non-TTY means piped input)
    if not sys.stdin.isatty():
        sys.exit(process_stdin_records())
    else:
        sys.exit(run_orchestrator(daemon=daemon))


if __name__ == '__main__':
    main()