# archivebox_update.py
  1. #!/usr/bin/env python3
  2. __package__ = 'archivebox.cli'
  3. import os
  4. import time
  5. import rich_click as click
  6. from typing import Iterable
  7. from pathlib import Path
  8. from archivebox.misc.util import enforce_types, docstring
  9. @enforce_types
  10. def update(filter_patterns: Iterable[str] = (),
  11. filter_type: str = 'exact',
  12. before: float | None = None,
  13. after: float | None = None,
  14. resume: str | None = None,
  15. batch_size: int = 100,
  16. continuous: bool = False) -> None:
  17. """
  18. Update snapshots: migrate old dirs, reconcile DB, and re-queue for archiving.
  19. Three-phase operation (without filters):
  20. - Phase 1: Drain old archive/ dirs by moving to new fs location (0.8.x → 0.9.x)
  21. - Phase 2: O(n) scan over entire DB from most recent to least recent
  22. - No orphan scans needed (trust 1:1 mapping between DB and filesystem after phase 1)
  23. With filters: Only phase 2 (DB query), no filesystem operations.
  24. Without filters: All phases (full update).
  25. """
  26. from rich import print
  27. from archivebox.config.django import setup_django
  28. setup_django()
  29. from archivebox.core.models import Snapshot
  30. from django.utils import timezone
  31. from django.core.management import call_command
  32. # Run migrations first to ensure DB schema is up-to-date
  33. print('[*] Checking for pending migrations...')
  34. try:
  35. call_command('migrate', '--no-input', verbosity=0)
  36. except Exception as e:
  37. print(f'[!] Warning: Migration check failed: {e}')
  38. while True:
  39. if filter_patterns or before or after:
  40. # Filtered mode: query DB only
  41. print('[*] Processing filtered snapshots from database...')
  42. stats = process_filtered_snapshots(
  43. filter_patterns=filter_patterns,
  44. filter_type=filter_type,
  45. before=before,
  46. after=after,
  47. batch_size=batch_size
  48. )
  49. print_stats(stats)
  50. else:
  51. # Full mode: drain old dirs + process DB
  52. stats_combined = {'phase1': {}, 'phase2': {}}
  53. print('[*] Phase 1: Draining old archive/ directories (0.8.x → 0.9.x migration)...')
  54. stats_combined['phase1'] = drain_old_archive_dirs(
  55. resume_from=resume,
  56. batch_size=batch_size
  57. )
  58. print('[*] Phase 2: Processing all database snapshots (most recent first)...')
  59. stats_combined['phase2'] = process_all_db_snapshots(batch_size=batch_size)
  60. # Phase 3: Deduplication (disabled for now)
  61. # print('[*] Phase 3: Deduplicating...')
  62. # stats_combined['deduplicated'] = Snapshot.find_and_merge_duplicates()
  63. print_combined_stats(stats_combined)
  64. if not continuous:
  65. break
  66. print('[yellow]Sleeping 60s before next pass...[/yellow]')
  67. time.sleep(60)
  68. resume = None
  69. def drain_old_archive_dirs(resume_from: str = None, batch_size: int = 100) -> dict:
  70. """
  71. Drain old archive/ directories (0.8.x → 0.9.x migration).
  72. Only processes real directories (skips symlinks - those are already migrated).
  73. For each old dir found in archive/:
  74. 1. Load or create DB snapshot
  75. 2. Trigger fs migration on save() to move to data/users/{user}/...
  76. 3. Leave symlink in archive/ pointing to new location
  77. After this drains, archive/ should only contain symlinks and we can trust
  78. 1:1 mapping between DB and filesystem.
  79. """
  80. from archivebox.core.models import Snapshot
  81. from archivebox.config import CONSTANTS
  82. from django.db import transaction
  83. stats = {'processed': 0, 'migrated': 0, 'skipped': 0, 'invalid': 0}
  84. archive_dir = CONSTANTS.ARCHIVE_DIR
  85. if not archive_dir.exists():
  86. return stats
  87. print('[*] Scanning for old directories in archive/...')
  88. # Scan for real directories only (skip symlinks - they're already migrated)
  89. entries = [
  90. (e.stat().st_mtime, e.path)
  91. for e in os.scandir(archive_dir)
  92. if e.is_dir(follow_symlinks=False) # Skip symlinks
  93. ]
  94. entries.sort(reverse=True) # Newest first
  95. print(f'[*] Found {len(entries)} old directories to drain')
  96. for mtime, entry_path in entries:
  97. entry_path = Path(entry_path)
  98. # Resume from timestamp if specified
  99. if resume_from and entry_path.name < resume_from:
  100. continue
  101. stats['processed'] += 1
  102. # Try to load existing snapshot from DB
  103. snapshot = Snapshot.load_from_directory(entry_path)
  104. if not snapshot:
  105. # Not in DB - create new snapshot record
  106. snapshot = Snapshot.create_from_directory(entry_path)
  107. if not snapshot:
  108. # Invalid directory - move to invalid/
  109. Snapshot.move_directory_to_invalid(entry_path)
  110. stats['invalid'] += 1
  111. print(f" [{stats['processed']}] Invalid: {entry_path.name}")
  112. continue
  113. # Check if needs migration (0.8.x → 0.9.x)
  114. if snapshot.fs_migration_needed:
  115. try:
  116. snapshot.save() # Triggers migration + creates symlink
  117. stats['migrated'] += 1
  118. print(f" [{stats['processed']}] Migrated: {entry_path.name}")
  119. except Exception as e:
  120. # Snapshot already exists in DB with different crawl - skip it
  121. if 'UNIQUE constraint failed' in str(e):
  122. stats['skipped'] += 1
  123. print(f" [{stats['processed']}] Skipped (already in DB): {entry_path.name}")
  124. else:
  125. raise
  126. else:
  127. stats['skipped'] += 1
  128. if stats['processed'] % batch_size == 0:
  129. transaction.commit()
  130. transaction.commit()
  131. return stats
  132. def process_all_db_snapshots(batch_size: int = 100) -> dict:
  133. """
  134. O(n) scan over entire DB from most recent to least recent.
  135. For each snapshot:
  136. 1. Reconcile index.json with DB (merge titles, tags, archive results)
  137. 2. Queue for archiving (state machine will handle it)
  138. No orphan detection needed - we trust 1:1 mapping between DB and filesystem
  139. after Phase 1 has drained all old archive/ directories.
  140. """
  141. from archivebox.core.models import Snapshot
  142. from django.db import transaction
  143. from django.utils import timezone
  144. stats = {'processed': 0, 'reconciled': 0, 'queued': 0}
  145. total = Snapshot.objects.count()
  146. print(f'[*] Processing {total} snapshots from database (most recent first)...')
  147. # Process from most recent to least recent
  148. for snapshot in Snapshot.objects.select_related('crawl').order_by('-bookmarked_at').iterator(chunk_size=batch_size):
  149. stats['processed'] += 1
  150. # Skip snapshots with missing crawl references (orphaned by migration errors)
  151. if not snapshot.crawl_id:
  152. continue
  153. try:
  154. # Reconcile index.json with DB
  155. snapshot.reconcile_with_index_json()
  156. # Clean up invalid field values from old migrations
  157. if not isinstance(snapshot.current_step, int):
  158. snapshot.current_step = 0
  159. # Queue for archiving (state machine will handle it)
  160. snapshot.status = Snapshot.StatusChoices.QUEUED
  161. snapshot.retry_at = timezone.now()
  162. snapshot.save()
  163. stats['reconciled'] += 1
  164. stats['queued'] += 1
  165. except Exception as e:
  166. # Skip snapshots that can't be processed (e.g., missing crawl)
  167. print(f" [!] Skipping snapshot {snapshot.id}: {e}")
  168. continue
  169. if stats['processed'] % batch_size == 0:
  170. transaction.commit()
  171. print(f" [{stats['processed']}/{total}] Processed...")
  172. transaction.commit()
  173. return stats
  174. def process_filtered_snapshots(
  175. filter_patterns: Iterable[str],
  176. filter_type: str,
  177. before: float | None,
  178. after: float | None,
  179. batch_size: int
  180. ) -> dict:
  181. """Process snapshots matching filters (DB query only)."""
  182. from archivebox.core.models import Snapshot
  183. from django.db import transaction
  184. from django.utils import timezone
  185. from datetime import datetime
  186. stats = {'processed': 0, 'reconciled': 0, 'queued': 0}
  187. snapshots = Snapshot.objects.all()
  188. if filter_patterns:
  189. snapshots = Snapshot.objects.filter_by_patterns(list(filter_patterns), filter_type)
  190. if before:
  191. snapshots = snapshots.filter(bookmarked_at__lt=datetime.fromtimestamp(before))
  192. if after:
  193. snapshots = snapshots.filter(bookmarked_at__gt=datetime.fromtimestamp(after))
  194. total = snapshots.count()
  195. print(f'[*] Found {total} matching snapshots')
  196. for snapshot in snapshots.select_related('crawl').iterator(chunk_size=batch_size):
  197. stats['processed'] += 1
  198. # Skip snapshots with missing crawl references
  199. if not snapshot.crawl_id:
  200. continue
  201. try:
  202. # Reconcile index.json with DB
  203. snapshot.reconcile_with_index_json()
  204. # Clean up invalid field values from old migrations
  205. if not isinstance(snapshot.current_step, int):
  206. snapshot.current_step = 0
  207. # Queue for archiving
  208. snapshot.status = Snapshot.StatusChoices.QUEUED
  209. snapshot.retry_at = timezone.now()
  210. snapshot.save()
  211. stats['reconciled'] += 1
  212. stats['queued'] += 1
  213. except Exception as e:
  214. # Skip snapshots that can't be processed
  215. print(f" [!] Skipping snapshot {snapshot.id}: {e}")
  216. continue
  217. if stats['processed'] % batch_size == 0:
  218. transaction.commit()
  219. print(f" [{stats['processed']}/{total}] Processed...")
  220. transaction.commit()
  221. return stats
  222. def print_stats(stats: dict):
  223. """Print statistics for filtered mode."""
  224. from rich import print
  225. print(f"""
  226. [green]Update Complete[/green]
  227. Processed: {stats['processed']}
  228. Reconciled: {stats['reconciled']}
  229. Queued: {stats['queued']}
  230. """)
  231. def print_combined_stats(stats_combined: dict):
  232. """Print statistics for full mode."""
  233. from rich import print
  234. s1 = stats_combined['phase1']
  235. s2 = stats_combined['phase2']
  236. print(f"""
  237. [green]Archive Update Complete[/green]
  238. Phase 1 (Drain Old Dirs):
  239. Checked: {s1.get('processed', 0)}
  240. Migrated: {s1.get('migrated', 0)}
  241. Skipped: {s1.get('skipped', 0)}
  242. Invalid: {s1.get('invalid', 0)}
  243. Phase 2 (Process DB):
  244. Processed: {s2.get('processed', 0)}
  245. Reconciled: {s2.get('reconciled', 0)}
  246. Queued: {s2.get('queued', 0)}
  247. """)
# CLI entry point: click parses the flags below and forwards them 1:1 to
# update()'s keyword parameters (names must stay in sync with its signature).
@click.command()
@click.option('--resume', type=str, help='Resume from timestamp')
@click.option('--before', type=float, help='Only snapshots before timestamp')
@click.option('--after', type=float, help='Only snapshots after timestamp')
@click.option('--filter-type', '-t', type=click.Choice(['exact', 'substring', 'regex', 'domain', 'tag', 'timestamp']), default='exact')
@click.option('--batch-size', type=int, default=100, help='Commit every N snapshots')
@click.option('--continuous', is_flag=True, help='Run continuously as background worker')
@click.argument('filter_patterns', nargs=-1)
@docstring(update.__doc__)  # reuse update()'s docstring as the CLI help text
def main(**kwargs):
    update(**kwargs)
# Allow running this module directly as a standalone script.
if __name__ == '__main__':
    main()