archivebox_tag.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. #!/usr/bin/env python3
  2. """
  3. archivebox tag <action> [args...] [--filters]
  4. Manage Tag records.
  5. Actions:
  6. create - Create Tags
  7. list - List Tags as JSONL (with optional filters)
  8. update - Update Tags from stdin JSONL
  9. delete - Delete Tags from stdin JSONL
  10. Examples:
  11. # Create
  12. archivebox tag create news tech science
  13. archivebox tag create "important stuff"
  14. # List
  15. archivebox tag list
  16. archivebox tag list --name__icontains=news
  17. # Update (rename tags)
  18. archivebox tag list --name=oldname | archivebox tag update --name=newname
  19. # Delete
  20. archivebox tag list --name=unused | archivebox tag delete --yes
  21. """
# Explicitly pin the package this module belongs to.
__package__ = 'archivebox.cli'
# Full command string for this module
# (presumably read by the CLI loader / help output — TODO confirm).
__command__ = 'archivebox tag'
  24. import sys
  25. from typing import Optional, Iterable
  26. import rich_click as click
  27. from rich import print as rprint
  28. from archivebox.cli.cli_utils import apply_filters
  29. # =============================================================================
  30. # CREATE
  31. # =============================================================================
  32. def create_tags(names: Iterable[str]) -> int:
  33. """
  34. Create Tags from names.
  35. Exit codes:
  36. 0: Success
  37. 1: Failure
  38. """
  39. from archivebox.misc.jsonl import write_record
  40. from archivebox.core.models import Tag
  41. is_tty = sys.stdout.isatty()
  42. # Convert to list if needed
  43. name_list = list(names) if names else []
  44. if not name_list:
  45. rprint('[yellow]No tag names provided. Pass names as arguments.[/yellow]', file=sys.stderr)
  46. return 1
  47. created_count = 0
  48. for name in name_list:
  49. name = name.strip()
  50. if not name:
  51. continue
  52. tag, created = Tag.objects.get_or_create(name=name)
  53. if not is_tty:
  54. write_record(tag.to_json())
  55. if created:
  56. created_count += 1
  57. rprint(f'[green]Created tag: {name}[/green]', file=sys.stderr)
  58. else:
  59. rprint(f'[dim]Tag already exists: {name}[/dim]', file=sys.stderr)
  60. rprint(f'[green]Created {created_count} new tags[/green]', file=sys.stderr)
  61. return 0
  62. # =============================================================================
  63. # LIST
  64. # =============================================================================
  65. def list_tags(
  66. name: Optional[str] = None,
  67. name__icontains: Optional[str] = None,
  68. limit: Optional[int] = None,
  69. ) -> int:
  70. """
  71. List Tags as JSONL with optional filters.
  72. Exit codes:
  73. 0: Success (even if no results)
  74. """
  75. from archivebox.misc.jsonl import write_record
  76. from archivebox.core.models import Tag
  77. is_tty = sys.stdout.isatty()
  78. queryset = Tag.objects.all().order_by('name')
  79. # Apply filters
  80. filter_kwargs = {
  81. 'name': name,
  82. 'name__icontains': name__icontains,
  83. }
  84. queryset = apply_filters(queryset, filter_kwargs, limit=limit)
  85. count = 0
  86. for tag in queryset:
  87. snapshot_count = tag.snapshot_set.count()
  88. if is_tty:
  89. rprint(f'[cyan]{tag.name:30}[/cyan] [dim]({snapshot_count} snapshots)[/dim]')
  90. else:
  91. write_record(tag.to_json())
  92. count += 1
  93. rprint(f'[dim]Listed {count} tags[/dim]', file=sys.stderr)
  94. return 0
  95. # =============================================================================
  96. # UPDATE
  97. # =============================================================================
  98. def update_tags(name: Optional[str] = None) -> int:
  99. """
  100. Update Tags from stdin JSONL.
  101. Reads Tag records from stdin and applies updates.
  102. Uses PATCH semantics - only specified fields are updated.
  103. Exit codes:
  104. 0: Success
  105. 1: No input or error
  106. """
  107. from archivebox.misc.jsonl import read_stdin, write_record
  108. from archivebox.core.models import Tag
  109. is_tty = sys.stdout.isatty()
  110. records = list(read_stdin())
  111. if not records:
  112. rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
  113. return 1
  114. updated_count = 0
  115. for record in records:
  116. tag_id = record.get('id')
  117. old_name = record.get('name')
  118. if not tag_id and not old_name:
  119. continue
  120. try:
  121. if tag_id:
  122. tag = Tag.objects.get(id=tag_id)
  123. else:
  124. tag = Tag.objects.get(name=old_name)
  125. # Apply updates from CLI flags
  126. if name:
  127. tag.name = name
  128. tag.save()
  129. updated_count += 1
  130. if not is_tty:
  131. write_record(tag.to_json())
  132. except Tag.DoesNotExist:
  133. rprint(f'[yellow]Tag not found: {tag_id or old_name}[/yellow]', file=sys.stderr)
  134. continue
  135. rprint(f'[green]Updated {updated_count} tags[/green]', file=sys.stderr)
  136. return 0
  137. # =============================================================================
  138. # DELETE
  139. # =============================================================================
  140. def delete_tags(yes: bool = False, dry_run: bool = False) -> int:
  141. """
  142. Delete Tags from stdin JSONL.
  143. Requires --yes flag to confirm deletion.
  144. Exit codes:
  145. 0: Success
  146. 1: No input or missing --yes flag
  147. """
  148. from archivebox.misc.jsonl import read_stdin
  149. from archivebox.core.models import Tag
  150. records = list(read_stdin())
  151. if not records:
  152. rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
  153. return 1
  154. # Collect tag IDs or names
  155. tag_ids = []
  156. tag_names = []
  157. for r in records:
  158. if r.get('id'):
  159. tag_ids.append(r['id'])
  160. elif r.get('name'):
  161. tag_names.append(r['name'])
  162. if not tag_ids and not tag_names:
  163. rprint('[yellow]No valid tag IDs or names in input[/yellow]', file=sys.stderr)
  164. return 1
  165. from django.db.models import Q
  166. query = Q()
  167. if tag_ids:
  168. query |= Q(id__in=tag_ids)
  169. if tag_names:
  170. query |= Q(name__in=tag_names)
  171. tags = Tag.objects.filter(query)
  172. count = tags.count()
  173. if count == 0:
  174. rprint('[yellow]No matching tags found[/yellow]', file=sys.stderr)
  175. return 0
  176. if dry_run:
  177. rprint(f'[yellow]Would delete {count} tags (dry run)[/yellow]', file=sys.stderr)
  178. for tag in tags:
  179. rprint(f' {tag.name}', file=sys.stderr)
  180. return 0
  181. if not yes:
  182. rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
  183. return 1
  184. # Perform deletion
  185. deleted_count, _ = tags.delete()
  186. rprint(f'[green]Deleted {deleted_count} tags[/green]', file=sys.stderr)
  187. return 0
  188. # =============================================================================
  189. # CLI Commands
  190. # =============================================================================
  191. @click.group()
  192. def main():
  193. """Manage Tag records."""
  194. pass
  195. @main.command('create')
  196. @click.argument('names', nargs=-1)
  197. def create_cmd(names: tuple):
  198. """Create Tags from names."""
  199. sys.exit(create_tags(names))
  200. @main.command('list')
  201. @click.option('--name', help='Filter by exact name')
  202. @click.option('--name__icontains', help='Filter by name contains')
  203. @click.option('--limit', '-n', type=int, help='Limit number of results')
  204. def list_cmd(name: Optional[str], name__icontains: Optional[str], limit: Optional[int]):
  205. """List Tags as JSONL."""
  206. sys.exit(list_tags(name=name, name__icontains=name__icontains, limit=limit))
  207. @main.command('update')
  208. @click.option('--name', '-n', help='Set new name')
  209. def update_cmd(name: Optional[str]):
  210. """Update Tags from stdin JSONL."""
  211. sys.exit(update_tags(name=name))
  212. @main.command('delete')
  213. @click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
  214. @click.option('--dry-run', is_flag=True, help='Show what would be deleted')
  215. def delete_cmd(yes: bool, dry_run: bool):
  216. """Delete Tags from stdin JSONL."""
  217. sys.exit(delete_tags(yes=yes, dry_run=dry_run))
# Run the click group when this file is executed directly as a script.
if __name__ == '__main__':
    main()