| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- #!/usr/bin/env python3
- """
- archivebox tag <action> [args...] [--filters]
- Manage Tag records.
- Actions:
- create - Create Tags
- list - List Tags as JSONL (with optional filters)
- update - Update Tags from stdin JSONL
- delete - Delete Tags from stdin JSONL
- Examples:
- # Create
- archivebox tag create news tech science
- archivebox tag create "important stuff"
- # List
- archivebox tag list
- archivebox tag list --name__icontains=news
- # Update (rename tags)
- archivebox tag list --name=oldname | archivebox tag update --name=newname
- # Delete
- archivebox tag list --name=unused | archivebox tag delete --yes
- """
- __package__ = 'archivebox.cli'
- __command__ = 'archivebox tag'
- import sys
- from typing import Optional, Iterable
- import rich_click as click
- from rich import print as rprint
- from archivebox.cli.cli_utils import apply_filters
- # =============================================================================
- # CREATE
- # =============================================================================
- def create_tags(names: Iterable[str]) -> int:
- """
- Create Tags from names.
- Exit codes:
- 0: Success
- 1: Failure
- """
- from archivebox.misc.jsonl import write_record
- from archivebox.core.models import Tag
- is_tty = sys.stdout.isatty()
- # Convert to list if needed
- name_list = list(names) if names else []
- if not name_list:
- rprint('[yellow]No tag names provided. Pass names as arguments.[/yellow]', file=sys.stderr)
- return 1
- created_count = 0
- for name in name_list:
- name = name.strip()
- if not name:
- continue
- tag, created = Tag.objects.get_or_create(name=name)
- if not is_tty:
- write_record(tag.to_json())
- if created:
- created_count += 1
- rprint(f'[green]Created tag: {name}[/green]', file=sys.stderr)
- else:
- rprint(f'[dim]Tag already exists: {name}[/dim]', file=sys.stderr)
- rprint(f'[green]Created {created_count} new tags[/green]', file=sys.stderr)
- return 0
- # =============================================================================
- # LIST
- # =============================================================================
- def list_tags(
- name: Optional[str] = None,
- name__icontains: Optional[str] = None,
- limit: Optional[int] = None,
- ) -> int:
- """
- List Tags as JSONL with optional filters.
- Exit codes:
- 0: Success (even if no results)
- """
- from archivebox.misc.jsonl import write_record
- from archivebox.core.models import Tag
- is_tty = sys.stdout.isatty()
- queryset = Tag.objects.all().order_by('name')
- # Apply filters
- filter_kwargs = {
- 'name': name,
- 'name__icontains': name__icontains,
- }
- queryset = apply_filters(queryset, filter_kwargs, limit=limit)
- count = 0
- for tag in queryset:
- snapshot_count = tag.snapshot_set.count()
- if is_tty:
- rprint(f'[cyan]{tag.name:30}[/cyan] [dim]({snapshot_count} snapshots)[/dim]')
- else:
- write_record(tag.to_json())
- count += 1
- rprint(f'[dim]Listed {count} tags[/dim]', file=sys.stderr)
- return 0
- # =============================================================================
- # UPDATE
- # =============================================================================
- def update_tags(name: Optional[str] = None) -> int:
- """
- Update Tags from stdin JSONL.
- Reads Tag records from stdin and applies updates.
- Uses PATCH semantics - only specified fields are updated.
- Exit codes:
- 0: Success
- 1: No input or error
- """
- from archivebox.misc.jsonl import read_stdin, write_record
- from archivebox.core.models import Tag
- is_tty = sys.stdout.isatty()
- records = list(read_stdin())
- if not records:
- rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
- return 1
- updated_count = 0
- for record in records:
- tag_id = record.get('id')
- old_name = record.get('name')
- if not tag_id and not old_name:
- continue
- try:
- if tag_id:
- tag = Tag.objects.get(id=tag_id)
- else:
- tag = Tag.objects.get(name=old_name)
- # Apply updates from CLI flags
- if name:
- tag.name = name
- tag.save()
- updated_count += 1
- if not is_tty:
- write_record(tag.to_json())
- except Tag.DoesNotExist:
- rprint(f'[yellow]Tag not found: {tag_id or old_name}[/yellow]', file=sys.stderr)
- continue
- rprint(f'[green]Updated {updated_count} tags[/green]', file=sys.stderr)
- return 0
- # =============================================================================
- # DELETE
- # =============================================================================
- def delete_tags(yes: bool = False, dry_run: bool = False) -> int:
- """
- Delete Tags from stdin JSONL.
- Requires --yes flag to confirm deletion.
- Exit codes:
- 0: Success
- 1: No input or missing --yes flag
- """
- from archivebox.misc.jsonl import read_stdin
- from archivebox.core.models import Tag
- records = list(read_stdin())
- if not records:
- rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
- return 1
- # Collect tag IDs or names
- tag_ids = []
- tag_names = []
- for r in records:
- if r.get('id'):
- tag_ids.append(r['id'])
- elif r.get('name'):
- tag_names.append(r['name'])
- if not tag_ids and not tag_names:
- rprint('[yellow]No valid tag IDs or names in input[/yellow]', file=sys.stderr)
- return 1
- from django.db.models import Q
- query = Q()
- if tag_ids:
- query |= Q(id__in=tag_ids)
- if tag_names:
- query |= Q(name__in=tag_names)
- tags = Tag.objects.filter(query)
- count = tags.count()
- if count == 0:
- rprint('[yellow]No matching tags found[/yellow]', file=sys.stderr)
- return 0
- if dry_run:
- rprint(f'[yellow]Would delete {count} tags (dry run)[/yellow]', file=sys.stderr)
- for tag in tags:
- rprint(f' {tag.name}', file=sys.stderr)
- return 0
- if not yes:
- rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
- return 1
- # Perform deletion
- deleted_count, _ = tags.delete()
- rprint(f'[green]Deleted {deleted_count} tags[/green]', file=sys.stderr)
- return 0
- # =============================================================================
- # CLI Commands
- # =============================================================================
- @click.group()
- def main():
- """Manage Tag records."""
- pass
- @main.command('create')
- @click.argument('names', nargs=-1)
- def create_cmd(names: tuple):
- """Create Tags from names."""
- sys.exit(create_tags(names))
- @main.command('list')
- @click.option('--name', help='Filter by exact name')
- @click.option('--name__icontains', help='Filter by name contains')
- @click.option('--limit', '-n', type=int, help='Limit number of results')
- def list_cmd(name: Optional[str], name__icontains: Optional[str], limit: Optional[int]):
- """List Tags as JSONL."""
- sys.exit(list_tags(name=name, name__icontains=name__icontains, limit=limit))
- @main.command('update')
- @click.option('--name', '-n', help='Set new name')
- def update_cmd(name: Optional[str]):
- """Update Tags from stdin JSONL."""
- sys.exit(update_tags(name=name))
- @main.command('delete')
- @click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
- @click.option('--dry-run', is_flag=True, help='Show what would be deleted')
- def delete_cmd(yes: bool, dry_run: bool):
- """Delete Tags from stdin JSONL."""
- sys.exit(delete_tags(yes=yes, dry_run=dry_run))
- if __name__ == '__main__':
- main()
|