| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- #!/usr/bin/env python3
- """
- archivebox snapshot <action> [args...] [--filters]
- Manage Snapshot records.
- Actions:
- create - Create Snapshots from URLs or Crawl JSONL
- list - List Snapshots as JSONL (with optional filters)
- update - Update Snapshots from stdin JSONL
- delete - Delete Snapshots from stdin JSONL
- Examples:
- # Create
- archivebox snapshot create https://example.com --tag=news
- archivebox crawl create https://example.com | archivebox snapshot create
- # List with filters
- archivebox snapshot list --status=queued
- archivebox snapshot list --url__icontains=example.com
- # Update
- archivebox snapshot list --tag=old | archivebox snapshot update --tag=new
- # Delete
- archivebox snapshot list --url__icontains=spam.com | archivebox snapshot delete --yes
- """
# Pin the package name explicitly; Python consults __package__ when
# resolving relative imports for this module.
__package__ = 'archivebox.cli'
# Human-readable name of the command implemented by this module.
# NOTE(review): presumably consumed by the CLI loader / help output — confirm.
__command__ = 'archivebox snapshot'
- import sys
- from typing import Optional, Iterable
- import rich_click as click
- from rich import print as rprint
- from archivebox.cli.cli_utils import apply_filters
- # =============================================================================
- # CREATE
- # =============================================================================
def create_snapshots(
    urls: Iterable[str],
    tag: str = '',
    status: str = 'queued',
    depth: int = 0,
    created_by_id: Optional[int] = None,
) -> int:
    """
    Build Snapshot rows from CLI URLs or from JSONL records read via stdin.

    Every input record is dispatched on its shape:
      * Crawl record: echoed downstream, fetched by id (or built with
        Crawl.from_json when it has no id / is not found), then one Snapshot
        is created per URL returned by crawl.get_urls_list().
      * Snapshot record, or any record with a 'url': turned into a Snapshot.
      * Anything else: echoed downstream unchanged (pass-through).

    Records are written to stdout only when stdout is not a TTY; all
    human-readable messages go to stderr. An exception raised while handling
    one record is reported and the loop moves on to the next record.

    Args:
        urls: URL arguments from the command line (given to read_args_or_stdin).
        tag: Appended to a Crawl's tags; for Snapshot/URL records it is used
            only when the record has no 'tags' of its own.
        status: Status placed on crawl-derived Snapshots, and (when non-empty)
            written over the 'status' of Snapshot/URL records.
        depth: Depth for crawl-derived Snapshots and default for records
            that have no 'depth' key.
        created_by_id: Owner pk; defaults to get_or_create_system_user_pk().

    Returns:
        0 when at least one Snapshot was created, or when nothing was created
        but some records were passed through; 1 when there was no input or
        nothing was created and nothing was passed through.
    """
    from archivebox.misc.jsonl import (
        read_args_or_stdin, write_record,
        TYPE_SNAPSHOT, TYPE_CRAWL
    )
    from archivebox.base_models.models import get_or_create_system_user_pk
    from archivebox.core.models import Snapshot
    from archivebox.crawls.models import Crawl

    created_by_id = created_by_id or get_or_create_system_user_pk()
    piped = not sys.stdout.isatty()

    # Materialise the input up front so "no input" can be detected.
    records = list(read_args_or_stdin(urls))
    if not records:
        rprint('[yellow]No URLs or Crawls provided. Pass URLs as arguments or via stdin.[/yellow]', file=sys.stderr)
        return 1

    made = []
    skipped = 0

    def keep(candidate) -> None:
        """Track a newly built Snapshot and echo it downstream when piped."""
        if not candidate:
            return
        made.append(candidate)
        if piped:
            write_record(candidate.to_json())

    for record in records:
        kind = record.get('type', '')

        try:
            if kind == TYPE_CRAWL:
                # The Crawl record itself goes downstream before its Snapshots.
                if piped:
                    write_record(record)

                # Reuse the stored Crawl when the id resolves, otherwise build one.
                crawl = None
                crawl_id = record.get('id')
                if crawl_id:
                    try:
                        crawl = Crawl.objects.get(id=crawl_id)
                    except Crawl.DoesNotExist:
                        pass
                if crawl is None:
                    crawl = Crawl.from_json(record, overrides={'created_by_id': created_by_id})

                if not crawl:
                    continue

                # One Snapshot per URL listed on the crawl.
                for url in crawl.get_urls_list():
                    crawl_tags = crawl.tags_str
                    if not tag:
                        combined = crawl_tags
                    elif crawl_tags:
                        combined = f"{crawl_tags},{tag}"
                    else:
                        combined = tag
                    keep(Snapshot.from_json(
                        {
                            'url': url,
                            'tags': combined,
                            'crawl_id': str(crawl.id),
                            'depth': depth,
                            'status': status,
                        },
                        overrides={'created_by_id': created_by_id},
                    ))

            elif kind == TYPE_SNAPSHOT or record.get('url'):
                # Snapshot record or bare URL: fill in CLI-provided values.
                if tag and not record.get('tags'):
                    record['tags'] = tag
                if status:
                    record['status'] = status
                record.setdefault('depth', depth)

                keep(Snapshot.from_json(record, overrides={'created_by_id': created_by_id}))

            else:
                # Unknown record type: forward it untouched.
                if piped:
                    write_record(record)
                skipped += 1

        except Exception as e:
            rprint(f'[red]Error creating snapshot: {e}[/red]', file=sys.stderr)
            continue

    if made:
        rprint(f'[green]Created {len(made)} snapshots[/green]', file=sys.stderr)
        if not piped:
            # Interactive run: show a short summary line per Snapshot.
            for snapshot in made:
                rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
        return 0

    if skipped > 0:
        rprint(f'[dim]Passed through {skipped} records, no new snapshots[/dim]', file=sys.stderr)
        return 0

    rprint('[red]No snapshots created[/red]', file=sys.stderr)
    return 1
- # =============================================================================
- # LIST
- # =============================================================================
def list_snapshots(
    status: Optional[str] = None,
    url__icontains: Optional[str] = None,
    url__istartswith: Optional[str] = None,
    tag: Optional[str] = None,
    crawl_id: Optional[str] = None,
    limit: Optional[int] = None,
) -> int:
    """
    List Snapshots, newest first, with optional filters.

    When stdout is a TTY each Snapshot is printed as a coloured one-line
    summary; otherwise each Snapshot is written as a JSONL record. The total
    is always reported on stderr.

    Args:
        status: Exact status to match.
        url__icontains: Case-insensitive URL substring filter.
        url__istartswith: Case-insensitive URL prefix filter.
        tag: Tag name to match (case-insensitive), via the tags relation.
        crawl_id: Crawl id to match.
        limit: Maximum number of results (handed to apply_filters).

    Returns:
        0 always (an empty result is still a success).
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.core.models import Snapshot

    is_tty = sys.stdout.isatty()

    queryset = Snapshot.objects.all().order_by('-created_at')

    # The tag filter goes through the M2M relation, so it is applied by hand.
    # It must run BEFORE apply_filters(): that helper also receives `limit`,
    # and Django refuses any further .filter() on a queryset that has already
    # been sliced. Applying the tag filter afterwards would make
    # `--tag X --limit N` fail instead of returning N tagged snapshots.
    # NOTE(review): assumes apply_filters applies `limit` by slicing — confirm.
    if tag:
        queryset = queryset.filter(tags__name__iexact=tag)

    # Remaining field filters (and the limit) are delegated to the shared helper.
    filter_kwargs = {
        'status': status,
        'url__icontains': url__icontains,
        'url__istartswith': url__istartswith,
        'crawl_id': crawl_id,
    }
    queryset = apply_filters(queryset, filter_kwargs, limit=limit)

    # Built once rather than once per row.
    status_colors = {
        'queued': 'yellow',
        'started': 'blue',
        'sealed': 'green',
    }

    count = 0
    for snapshot in queryset:
        if is_tty:
            status_color = status_colors.get(snapshot.status, 'dim')
            rprint(f'[{status_color}]{snapshot.status:8}[/{status_color}] [dim]{snapshot.id}[/dim] {snapshot.url[:60]}')
        else:
            write_record(snapshot.to_json())
        count += 1

    rprint(f'[dim]Listed {count} snapshots[/dim]', file=sys.stderr)
    return 0
- # =============================================================================
- # UPDATE
- # =============================================================================
def update_snapshots(
    status: Optional[str] = None,
    tag: Optional[str] = None,
) -> int:
    """
    Update Snapshots identified by JSONL records on stdin.

    Only the 'id' of each stdin record is used, to look the Snapshot up.
    The changes themselves come from the CLI flags (PATCH semantics: a flag
    that is not given leaves that field alone). Records without an 'id',
    with an id that cannot be looked up, or with an id that matches no
    Snapshot are skipped without aborting the rest of the batch.

    Each updated Snapshot is written to stdout as JSONL when stdout is not a
    TTY; a summary count is printed to stderr.

    Args:
        status: New status. Setting it also resets retry_at to the current time.
        tag: Name of a tag to add to each Snapshot (created if missing).

    Returns:
        0 once the batch has been processed (even if some or all records were
        skipped); 1 when stdin supplied no records.
    """
    from django.core.exceptions import ValidationError
    from django.utils import timezone

    from archivebox.misc.jsonl import read_stdin, write_record
    from archivebox.core.models import Snapshot, Tag

    is_tty = sys.stdout.isatty()

    records = list(read_stdin())
    if not records:
        rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
        return 1

    # Resolved lazily on first use so no Tag row is created when nothing is
    # updated, and so the lookup happens once instead of once per record.
    tag_obj = None

    updated_count = 0
    for record in records:
        snapshot_id = record.get('id')
        if not snapshot_id:
            continue

        # Keep the try body down to the lookup, so one bad id cannot take
        # down the whole batch and unrelated errors are not masked.
        try:
            snapshot = Snapshot.objects.get(id=snapshot_id)
        except Snapshot.DoesNotExist:
            rprint(f'[yellow]Snapshot not found: {snapshot_id}[/yellow]', file=sys.stderr)
            continue
        except (TypeError, ValueError, ValidationError):
            # The id is not a value the primary-key field can look up.
            rprint(f'[yellow]Invalid snapshot id: {snapshot_id}[/yellow]', file=sys.stderr)
            continue

        # Apply the changes requested through CLI flags.
        if status:
            snapshot.status = status
            snapshot.retry_at = timezone.now()

        if tag:
            # The Snapshot was just loaded from the database, so it already
            # has a primary key and needs no save before touching the M2M.
            if tag_obj is None:
                tag_obj, _ = Tag.objects.get_or_create(name=tag)
            snapshot.tags.add(tag_obj)

        # Single save per record, after any tag has been attached.
        snapshot.save()
        updated_count += 1

        if not is_tty:
            write_record(snapshot.to_json())

    rprint(f'[green]Updated {updated_count} snapshots[/green]', file=sys.stderr)
    return 0
- # =============================================================================
- # DELETE
- # =============================================================================
def delete_snapshots(yes: bool = False, dry_run: bool = False) -> int:
    """
    Delete the Snapshots whose ids appear in JSONL records on stdin.

    Only the 'id' of each record is used. With dry_run the matching Snapshots
    are listed on stderr and nothing is deleted; otherwise deletion only
    happens when `yes` is set.

    Args:
        yes: Confirmation flag; required for an actual deletion.
        dry_run: Show what would be deleted instead of deleting. Checked
            before `yes`, so it wins when both are given.

    Returns:
        0 on success (including "nothing matched" and dry runs);
        1 when there is no input, no record carries an id, or `yes` is missing.
    """
    from archivebox.misc.jsonl import read_stdin
    from archivebox.core.models import Snapshot

    records = list(read_stdin())
    if not records:
        rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
        return 1

    snapshot_ids = [r.get('id') for r in records if r.get('id')]
    if not snapshot_ids:
        rprint('[yellow]No valid snapshot IDs in input[/yellow]', file=sys.stderr)
        return 1

    snapshots = Snapshot.objects.filter(id__in=snapshot_ids)
    count = snapshots.count()

    if count == 0:
        rprint('[yellow]No matching snapshots found[/yellow]', file=sys.stderr)
        return 0

    if dry_run:
        rprint(f'[yellow]Would delete {count} snapshots (dry run)[/yellow]', file=sys.stderr)
        for snapshot in snapshots:
            rprint(f' [dim]{snapshot.id}[/dim] {snapshot.url[:60]}', file=sys.stderr)
        return 0

    if not yes:
        rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
        return 1

    # QuerySet.delete() returns (total, {model_label: n}). The total also
    # counts rows removed by cascade (related objects, M2M link rows), so
    # reporting it as "snapshots" would over-count. Report only the
    # Snapshot entry from the per-model breakdown.
    _, deleted_per_model = snapshots.delete()
    deleted_count = deleted_per_model.get(Snapshot._meta.label, 0)
    rprint(f'[green]Deleted {deleted_count} snapshots[/green]', file=sys.stderr)
    return 0
- # =============================================================================
- # CLI Commands
- # =============================================================================
@click.group()
def main():
    """Manage Snapshot records."""
    # Group callback has nothing to do: each action is a registered
    # subcommand, and the docstring above is the group's help text.
@main.command('create')
@click.argument('urls', nargs=-1)
@click.option('--tag', '-t', default='', help='Comma-separated tags to add')
@click.option('--status', '-s', default='queued', help='Initial status (default: queued)')
@click.option('--depth', '-d', type=int, default=0, help='Crawl depth (default: 0)')
def create_cmd(urls: tuple, tag: str, status: str, depth: int):
    """Create Snapshots from URLs or stdin JSONL."""
    # Run the library function and use its return value as the exit status.
    exit_code = create_snapshots(urls, tag=tag, status=status, depth=depth)
    sys.exit(exit_code)
@main.command('list')
@click.option('--status', '-s', help='Filter by status (queued, started, sealed)')
@click.option('--url__icontains', help='Filter by URL contains')
@click.option('--url__istartswith', help='Filter by URL starts with')
@click.option('--tag', '-t', help='Filter by tag name')
@click.option('--crawl-id', help='Filter by crawl ID')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(status: Optional[str], url__icontains: Optional[str], url__istartswith: Optional[str],
             tag: Optional[str], crawl_id: Optional[str], limit: Optional[int]):
    """List Snapshots as JSONL."""
    # Forward every CLI option under the same keyword name.
    options = {
        'status': status,
        'url__icontains': url__icontains,
        'url__istartswith': url__istartswith,
        'tag': tag,
        'crawl_id': crawl_id,
        'limit': limit,
    }
    exit_code = list_snapshots(**options)
    sys.exit(exit_code)
@main.command('update')
@click.option('--status', '-s', help='Set status')
@click.option('--tag', '-t', help='Add tag')
def update_cmd(status: Optional[str], tag: Optional[str]):
    """Update Snapshots from stdin JSONL."""
    # Run the library function and use its return value as the exit status.
    exit_code = update_snapshots(status=status, tag=tag)
    sys.exit(exit_code)
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
    """Delete Snapshots from stdin JSONL."""
    # Run the library function and use its return value as the exit status.
    exit_code = delete_snapshots(yes=yes, dry_run=dry_run)
    sys.exit(exit_code)
# Script entry point: invoke the click group when this file is executed
# directly rather than imported.
if __name__ == '__main__':
    main()
|