archivebox_extract.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. #!/usr/bin/env python3
  2. """
  3. archivebox extract [snapshot_ids...] [--plugins=NAMES]
  4. Run plugins on Snapshots. Accepts snapshot IDs as arguments, from stdin, or via JSONL.
  5. Input formats:
  6. - Snapshot UUIDs (one per line)
  7. - JSONL: {"type": "Snapshot", "id": "...", "url": "..."}
  8. - JSONL: {"type": "ArchiveResult", "snapshot_id": "...", "plugin": "..."}
  9. Output (JSONL):
  10. {"type": "ArchiveResult", "id": "...", "snapshot_id": "...", "plugin": "...", "status": "..."}
  11. Examples:
  12. # Extract specific snapshot
  13. archivebox extract 01234567-89ab-cdef-0123-456789abcdef
  14. # Pipe from snapshot command
  15. archivebox snapshot https://example.com | archivebox extract
  16. # Run specific plugins only
  17. archivebox extract --plugins=screenshot,singlefile 01234567-89ab-cdef-0123-456789abcdef
  18. # Chain commands
  19. archivebox crawl https://example.com | archivebox snapshot | archivebox extract
  20. """
# Explicitly pin the package this module belongs to.
# NOTE(review): presumably so package-relative imports resolve when the file is run directly — confirm.
__package__ = 'archivebox.cli'
# Human-readable name of the CLI command implemented by this module.
# NOTE(review): presumably read by the CLI dispatcher/help output — confirm against callers.
__command__ = 'archivebox extract'
  23. import sys
  24. from typing import Optional, List
  25. import rich_click as click
  26. def process_archiveresult_by_id(archiveresult_id: str) -> int:
  27. """
  28. Run extraction for a single ArchiveResult by ID (used by workers).
  29. Triggers the ArchiveResult's state machine tick() to run the extractor plugin.
  30. """
  31. from rich import print as rprint
  32. from archivebox.core.models import ArchiveResult
  33. try:
  34. archiveresult = ArchiveResult.objects.get(id=archiveresult_id)
  35. except ArchiveResult.DoesNotExist:
  36. rprint(f'[red]ArchiveResult {archiveresult_id} not found[/red]', file=sys.stderr)
  37. return 1
  38. rprint(f'[blue]Extracting {archiveresult.plugin} for {archiveresult.snapshot.url}[/blue]', file=sys.stderr)
  39. try:
  40. # Trigger state machine tick - this runs the actual extraction
  41. archiveresult.sm.tick()
  42. archiveresult.refresh_from_db()
  43. if archiveresult.status == ArchiveResult.StatusChoices.SUCCEEDED:
  44. print(f'[green]Extraction succeeded: {archiveresult.output_str}[/green]')
  45. return 0
  46. elif archiveresult.status == ArchiveResult.StatusChoices.FAILED:
  47. print(f'[red]Extraction failed: {archiveresult.output_str}[/red]', file=sys.stderr)
  48. return 1
  49. else:
  50. # Still in progress or backoff - not a failure
  51. print(f'[yellow]Extraction status: {archiveresult.status}[/yellow]')
  52. return 0
  53. except Exception as e:
  54. print(f'[red]Extraction error: {type(e).__name__}: {e}[/red]', file=sys.stderr)
  55. return 1
  56. def run_plugins(
  57. args: tuple,
  58. plugins: str = '',
  59. wait: bool = True,
  60. ) -> int:
  61. """
  62. Run plugins on Snapshots from input.
  63. Reads Snapshot IDs or JSONL from args/stdin, runs plugins, outputs JSONL.
  64. Exit codes:
  65. 0: Success
  66. 1: Failure
  67. """
  68. from rich import print as rprint
  69. from django.utils import timezone
  70. from archivebox.misc.jsonl import (
  71. read_args_or_stdin, write_record,
  72. TYPE_SNAPSHOT, TYPE_ARCHIVERESULT
  73. )
  74. from archivebox.core.models import Snapshot, ArchiveResult
  75. from archivebox.workers.orchestrator import Orchestrator
  76. is_tty = sys.stdout.isatty()
  77. # Parse comma-separated plugins list once (reused in creation and filtering)
  78. plugins_list = [p.strip() for p in plugins.split(',') if p.strip()] if plugins else []
  79. # Collect all input records
  80. records = list(read_args_or_stdin(args))
  81. if not records:
  82. rprint('[yellow]No snapshots provided. Pass snapshot IDs as arguments or via stdin.[/yellow]', file=sys.stderr)
  83. return 1
  84. # Gather snapshot IDs to process
  85. snapshot_ids = set()
  86. for record in records:
  87. record_type = record.get('type')
  88. if record_type == TYPE_SNAPSHOT:
  89. snapshot_id = record.get('id')
  90. if snapshot_id:
  91. snapshot_ids.add(snapshot_id)
  92. elif record.get('url'):
  93. # Look up by URL (get most recent if multiple exist)
  94. snap = Snapshot.objects.filter(url=record['url']).order_by('-created_at').first()
  95. if snap:
  96. snapshot_ids.add(str(snap.id))
  97. else:
  98. rprint(f'[yellow]Snapshot not found for URL: {record["url"]}[/yellow]', file=sys.stderr)
  99. elif record_type == TYPE_ARCHIVERESULT:
  100. snapshot_id = record.get('snapshot_id')
  101. if snapshot_id:
  102. snapshot_ids.add(snapshot_id)
  103. elif 'id' in record:
  104. # Assume it's a snapshot ID
  105. snapshot_ids.add(record['id'])
  106. if not snapshot_ids:
  107. rprint('[red]No valid snapshot IDs found in input[/red]', file=sys.stderr)
  108. return 1
  109. # Get snapshots and ensure they have pending ArchiveResults
  110. processed_count = 0
  111. for snapshot_id in snapshot_ids:
  112. try:
  113. snapshot = Snapshot.objects.get(id=snapshot_id)
  114. except Snapshot.DoesNotExist:
  115. rprint(f'[yellow]Snapshot {snapshot_id} not found[/yellow]', file=sys.stderr)
  116. continue
  117. # Create pending ArchiveResults if needed
  118. if plugins_list:
  119. # Only create for specific plugins
  120. for plugin_name in plugins_list:
  121. result, created = ArchiveResult.objects.get_or_create(
  122. snapshot=snapshot,
  123. plugin=plugin_name,
  124. defaults={
  125. 'status': ArchiveResult.StatusChoices.QUEUED,
  126. 'retry_at': timezone.now(),
  127. }
  128. )
  129. if not created and result.status in [ArchiveResult.StatusChoices.FAILED, ArchiveResult.StatusChoices.SKIPPED]:
  130. # Reset for retry
  131. result.status = ArchiveResult.StatusChoices.QUEUED
  132. result.retry_at = timezone.now()
  133. result.save()
  134. else:
  135. # Create all pending plugins
  136. snapshot.create_pending_archiveresults()
  137. # Reset snapshot status to allow processing
  138. if snapshot.status == Snapshot.StatusChoices.SEALED:
  139. snapshot.status = Snapshot.StatusChoices.STARTED
  140. snapshot.retry_at = timezone.now()
  141. snapshot.save()
  142. processed_count += 1
  143. if processed_count == 0:
  144. rprint('[red]No snapshots to process[/red]', file=sys.stderr)
  145. return 1
  146. rprint(f'[blue]Queued {processed_count} snapshots for extraction[/blue]', file=sys.stderr)
  147. # Run orchestrator if --wait (default)
  148. if wait:
  149. rprint('[blue]Running plugins...[/blue]', file=sys.stderr)
  150. orchestrator = Orchestrator(exit_on_idle=True)
  151. orchestrator.runloop()
  152. # Output results as JSONL (when piped) or human-readable (when TTY)
  153. for snapshot_id in snapshot_ids:
  154. try:
  155. snapshot = Snapshot.objects.get(id=snapshot_id)
  156. results = snapshot.archiveresult_set.all()
  157. if plugins_list:
  158. results = results.filter(plugin__in=plugins_list)
  159. for result in results:
  160. if is_tty:
  161. status_color = {
  162. 'succeeded': 'green',
  163. 'failed': 'red',
  164. 'skipped': 'yellow',
  165. }.get(result.status, 'dim')
  166. rprint(f' [{status_color}]{result.status}[/{status_color}] {result.plugin} → {result.output_str or ""}', file=sys.stderr)
  167. else:
  168. write_record(result.to_jsonl())
  169. except Snapshot.DoesNotExist:
  170. continue
  171. return 0
  172. def is_archiveresult_id(value: str) -> bool:
  173. """Check if value looks like an ArchiveResult UUID."""
  174. import re
  175. uuid_pattern = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', re.I)
  176. if not uuid_pattern.match(value):
  177. return False
  178. # Verify it's actually an ArchiveResult (not a Snapshot or other object)
  179. from archivebox.core.models import ArchiveResult
  180. return ArchiveResult.objects.filter(id=value).exists()
  181. @click.command()
  182. @click.option('--plugins', '-p', default='', help='Comma-separated list of plugins to run (e.g., screenshot,singlefile)')
  183. @click.option('--wait/--no-wait', default=True, help='Wait for plugins to complete (default: wait)')
  184. @click.argument('args', nargs=-1)
  185. def main(plugins: str, wait: bool, args: tuple):
  186. """Run plugins on Snapshots, or process existing ArchiveResults by ID"""
  187. from archivebox.misc.jsonl import read_args_or_stdin
  188. # Read all input
  189. records = list(read_args_or_stdin(args))
  190. if not records:
  191. from rich import print as rprint
  192. rprint('[yellow]No Snapshot IDs or ArchiveResult IDs provided. Pass as arguments or via stdin.[/yellow]', file=sys.stderr)
  193. sys.exit(1)
  194. # Check if input looks like existing ArchiveResult IDs to process
  195. all_are_archiveresult_ids = all(
  196. is_archiveresult_id(r.get('id') or r.get('url', ''))
  197. for r in records
  198. )
  199. if all_are_archiveresult_ids:
  200. # Process existing ArchiveResults by ID
  201. exit_code = 0
  202. for record in records:
  203. archiveresult_id = record.get('id') or record.get('url')
  204. result = process_archiveresult_by_id(archiveresult_id)
  205. if result != 0:
  206. exit_code = result
  207. sys.exit(exit_code)
  208. else:
  209. # Default behavior: run plugins on Snapshots from input
  210. sys.exit(run_plugins(args, plugins=plugins, wait=wait))
# Script entry point: invoke the click command only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    main()