| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- #!/usr/bin/env python3
- """
- archivebox extract [snapshot_ids...] [--plugins=NAMES]
- Run plugins on Snapshots. Accepts snapshot IDs as arguments, from stdin, or via JSONL.
- Input formats:
- - Snapshot UUIDs (one per line)
- - JSONL: {"type": "Snapshot", "id": "...", "url": "..."}
- - JSONL: {"type": "ArchiveResult", "snapshot_id": "...", "plugin": "..."}
- Output (JSONL):
- {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", "status": "..."}
- Examples:
- # Extract specific snapshot
- archivebox extract 01234567-89ab-cdef-0123-456789abcdef
- # Pipe from snapshot command
- archivebox snapshot https://example.com | archivebox extract
- # Run specific plugins only
- archivebox extract --plugins=screenshot,singlefile 01234567-89ab-cdef-0123-456789abcdef
- # Chain commands
- archivebox crawl https://example.com | archivebox snapshot | archivebox extract
- """
- __package__ = 'archivebox.cli'
- __command__ = 'archivebox extract'
- import sys
- from typing import Optional, List
- import rich_click as click
def process_archiveresult_by_id(archiveresult_id: str) -> int:
    """
    Run extraction for a single ArchiveResult by ID (used by workers).

    Looks the ArchiveResult up, triggers its state machine tick() to run the
    extractor plugin, reloads the row and reports the resulting status.

    Args:
        archiveresult_id: Primary key of the ArchiveResult to process.

    Returns:
        0 when the result succeeded or is in any other non-failed status,
        1 when the ArchiveResult does not exist, ended up failed, or the
        tick raised an exception.
    """
    from rich import print as rprint
    from rich.markup import escape

    from archivebox.core.models import ArchiveResult

    def plain(value: object) -> str:
        # Dynamic text (URLs, plugin output, exception messages) may contain
        # square brackets; escape it so Rich does not parse it as markup.
        return escape(str(value))

    try:
        archiveresult = ArchiveResult.objects.get(id=archiveresult_id)
    except ArchiveResult.DoesNotExist:
        rprint(f'[red]ArchiveResult {plain(archiveresult_id)} not found[/red]', file=sys.stderr)
        return 1

    rprint(
        f'[blue]Extracting {plain(archiveresult.plugin)} for {plain(archiveresult.snapshot.url)}[/blue]',
        file=sys.stderr,
    )

    try:
        # Trigger state machine tick - this runs the actual extraction
        archiveresult.sm.tick()
        archiveresult.refresh_from_db()
    except Exception as e:
        # Worker boundary: report any extractor failure as exit code 1
        # instead of letting the traceback escape.
        rprint(f'[red]Extraction error: {type(e).__name__}: {plain(e)}[/red]', file=sys.stderr)
        return 1

    # Use rprint (not the builtin print) so the colour markup is rendered
    # rather than emitted as literal "[green]...[/green]" text.
    if archiveresult.status == ArchiveResult.StatusChoices.SUCCEEDED:
        rprint(f'[green]Extraction succeeded: {plain(archiveresult.output_str)}[/green]')
        return 0

    if archiveresult.status == ArchiveResult.StatusChoices.FAILED:
        rprint(f'[red]Extraction failed: {plain(archiveresult.output_str)}[/red]', file=sys.stderr)
        return 1

    # Still in progress or backoff - not a failure
    rprint(f'[yellow]Extraction status: {plain(archiveresult.status)}[/yellow]')
    return 0
def run_plugins(
    args: tuple,
    plugins: str = '',
    wait: bool = True,
) -> int:
    """
    Run plugins on Snapshots from input.

    Reads Snapshot IDs or JSONL from args/stdin, queues the requested plugins
    for every Snapshot found, optionally runs the orchestrator, then reports
    the matching ArchiveResults: JSONL records when stdout is piped, or
    human-readable lines on stderr when stdout is a TTY.

    Args:
        args: CLI arguments forwarded to read_args_or_stdin().
        plugins: Comma-separated plugin names to restrict the run to. Empty
            means all pending plugins of each Snapshot are created.
        wait: When True, run the Orchestrator loop before reporting results.

    Exit codes:
        0: Success
        1: Failure (no input, no usable snapshot IDs, or no Snapshot found)
    """
    from rich import print as rprint
    from django.core.exceptions import ValidationError
    from django.utils import timezone

    from archivebox.misc.jsonl import (
        read_args_or_stdin, write_record,
        TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
    )
    from archivebox.core.models import Snapshot, ArchiveResult
    from archivebox.workers.orchestrator import Orchestrator

    is_tty = sys.stdout.isatty()

    # Parse comma-separated plugins list once (reused in creation and filtering)
    plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] if plugins else []

    # Collect all input records
    records = list(read_args_or_stdin(args))
    if not records:
        rprint('[yellow]No snapshots provided. Pass snapshot IDs as arguments or via stdin.[/yellow]', file=sys.stderr)
        return 1

    # Gather snapshot IDs to process. A dict is used as an insertion-ordered
    # set so snapshots are queued and reported in input order (a plain set
    # would make the order vary between runs).
    snapshot_ids: dict = {}
    for record in records:
        record_type = record.get('type')

        if record_type == TYPE_SNAPSHOT:
            snapshot_id = record.get('id')
            if snapshot_id:
                snapshot_ids[str(snapshot_id)] = None
            elif record.get('url'):
                # Look up by URL (get most recent if multiple exist)
                snap = Snapshot.objects.filter(url=record['url']).order_by('-created_at').first()
                if snap:
                    snapshot_ids[str(snap.id)] = None
                else:
                    rprint(f'[yellow]Snapshot not found for URL: {record["url"]}[/yellow]', file=sys.stderr)

        elif record_type == TYPE_ARCHIVERESULT:
            snapshot_id = record.get('snapshot_id')
            if snapshot_id:
                snapshot_ids[str(snapshot_id)] = None

        elif 'id' in record:
            # Assume it's a snapshot ID
            snapshot_ids[str(record['id'])] = None

    if not snapshot_ids:
        rprint('[red]No valid snapshot IDs found in input[/red]', file=sys.stderr)
        return 1

    # Get snapshots and ensure they have pending ArchiveResults
    retryable_statuses = (
        ArchiveResult.StatusChoices.FAILED,
        ArchiveResult.StatusChoices.SKIPPED,
    )
    queued_ids = []
    for snapshot_id in snapshot_ids:
        try:
            snapshot = Snapshot.objects.get(id=snapshot_id)
        except Snapshot.DoesNotExist:
            rprint(f'[yellow]Snapshot {snapshot_id} not found[/yellow]', file=sys.stderr)
            continue
        except (ValidationError, ValueError):
            # A malformed ID (e.g. not a valid UUID for a UUID primary key)
            # is rejected by the ORM before the query runs; treat it like a
            # missing snapshot instead of crashing the whole command.
            rprint(f'[yellow]Snapshot {snapshot_id} not found[/yellow]', file=sys.stderr)
            continue

        # Create pending ArchiveResults if needed
        if plugins_list:
            # Only create for specific plugins
            for plugin_name in plugins_list:
                result, created = ArchiveResult.objects.get_or_create(
                    snapshot=snapshot,
                    plugin=plugin_name,
                    defaults={
                        'status': ArchiveResult.StatusChoices.QUEUED,
                        'retry_at': timezone.now(),
                    },
                )
                if not created and result.status in retryable_statuses:
                    # Reset for retry
                    result.status = ArchiveResult.StatusChoices.QUEUED
                    result.retry_at = timezone.now()
                    result.save()
        else:
            # Create all pending plugins
            snapshot.create_pending_archiveresults()

        # Reset snapshot status to allow processing
        if snapshot.status == Snapshot.StatusChoices.SEALED:
            snapshot.status = Snapshot.StatusChoices.STARTED
            snapshot.retry_at = timezone.now()
            snapshot.save()

        queued_ids.append(snapshot_id)

    processed_count = len(queued_ids)
    if processed_count == 0:
        rprint('[red]No snapshots to process[/red]', file=sys.stderr)
        return 1

    rprint(f'[blue]Queued {processed_count} snapshots for extraction[/blue]', file=sys.stderr)

    # Run orchestrator if --wait (default)
    if wait:
        rprint('[blue]Running plugins...[/blue]', file=sys.stderr)
        orchestrator = Orchestrator(exit_on_idle=True)
        orchestrator.runloop()

    # Output results as JSONL (when piped) or human-readable (when TTY).
    # Only snapshots that were actually queued above are reported.
    status_colors = {
        'succeeded': 'green',
        'failed': 'red',
        'skipped': 'yellow',
    }
    for snapshot_id in queued_ids:
        try:
            snapshot = Snapshot.objects.get(id=snapshot_id)
        except Snapshot.DoesNotExist:
            continue

        results = snapshot.archiveresult_set.all()
        if plugins_list:
            results = results.filter(plugin__in=plugins_list)

        for result in results:
            if is_tty:
                status_color = status_colors.get(result.status, 'dim')
                rprint(f'  [{status_color}]{result.status}[/{status_color}] {result.plugin} → {result.output_str or ""}', file=sys.stderr)
            else:
                write_record(result.to_jsonl())

    return 0
def is_archiveresult_id(value: str) -> bool:
    """
    Check whether value is the UUID of an existing ArchiveResult.

    Args:
        value: Candidate ID. Anything that is not a string in canonical
            8-4-4-4-12 hexadecimal UUID form (case-insensitive) is rejected
            without touching the database.

    Returns:
        True only if value is UUID-shaped and an ArchiveResult with that ID
        exists; False otherwise.
    """
    import re

    # Records parsed from JSON may carry null or numeric values; those can
    # never be an ArchiveResult ID and would make the regex call raise.
    if not isinstance(value, str):
        return False

    # fullmatch() rather than match() with a trailing '$': '$' also matches
    # just before a final newline, which would let "<uuid>\n" through.
    uuid_shape = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
    if re.fullmatch(uuid_shape, value, re.I) is None:
        return False

    # Verify it's actually an ArchiveResult (not a Snapshot or other object)
    from archivebox.core.models import ArchiveResult
    return ArchiveResult.objects.filter(id=value).exists()
@click.command()
@click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run (e.g., screenshot,singlefile)')
@click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
@click.argument('args', nargs=-1)
def main(plugins: str, wait: bool, args: tuple):
    """Run plugins on Snapshots, or process existing ArchiveResults by ID"""
    # The docstring above doubles as the click help text, so implementation
    # notes live in comments instead.
    #
    # Dispatch: if every input record identifies an existing ArchiveResult,
    # each one is processed directly via process_archiveresult_by_id();
    # otherwise the input is handed to run_plugins(). The process always
    # terminates through sys.exit() with 0 on success and non-zero on failure.
    from archivebox.misc.jsonl import read_args_or_stdin

    # Read all input
    records = list(read_args_or_stdin(args))

    if not records:
        from rich import print as rprint
        rprint('[yellow]No Snapshot IDs or ArchiveResult IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
        sys.exit(1)

    # Candidate ID per record: prefer 'id', fall back to 'url'. Coerce to a
    # string so a record with a null/missing value yields '' (never None),
    # which the ID check can reject cleanly.
    candidate_ids = [str(r.get('id') or r.get('url') or '') for r in records]

    # Check if input looks like existing ArchiveResult IDs to process
    if all(is_archiveresult_id(candidate) for candidate in candidate_ids):
        # Process existing ArchiveResults by ID.
        # Note: --plugins and --wait are not consulted in this mode.
        exit_code = 0
        for archiveresult_id in candidate_ids:
            result = process_archiveresult_by_id(archiveresult_id)
            if result != 0:
                exit_code = result
        sys.exit(exit_code)

    # Default behavior: run plugins on Snapshots from input.
    # NOTE(review): run_plugins() calls read_args_or_stdin(args) again. If the
    # records above were read from stdin, confirm the second read still sees
    # them (a consumed pipe would look like empty input) - TODO verify.
    sys.exit(run_plugins(args, plugins=plugins, wait=wait))
# Script entry point: invoke the click command when this file is run directly.
if __name__ == '__main__':
    main()
|