models.py 118 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025
  1. __package__ = 'archivebox.core'
  2. from typing import Optional, Dict, Iterable, Any, List, TYPE_CHECKING
  3. from archivebox.uuid_compat import uuid7
  4. from datetime import datetime, timedelta
  5. from django_stubs_ext.db.models import TypedModelMeta
  6. import os
  7. import json
  8. from pathlib import Path
  9. from statemachine import State, registry
  10. from django.db import models
  11. from django.db.models import QuerySet, Value, Case, When, IntegerField
  12. from django.utils.functional import cached_property
  13. from django.utils.text import slugify
  14. from django.utils import timezone
  15. from django.core.cache import cache
  16. from django.urls import reverse, reverse_lazy
  17. from django.contrib import admin
  18. from django.conf import settings
  19. from archivebox.config import CONSTANTS
  20. from archivebox.misc.system import get_dir_size, atomic_write
  21. from archivebox.misc.util import parse_date, base_url, domain as url_domain, to_json, ts_to_date_str, urlencode, htmlencode, urldecode
  22. from archivebox.misc.hashing import get_dir_info
  23. from archivebox.hooks import (
  24. get_plugins, get_plugin_name, get_plugin_icon,
  25. )
  26. from archivebox.base_models.models import (
  27. ModelWithUUID, ModelWithSerializers, ModelWithOutputDir,
  28. ModelWithConfig, ModelWithNotes, ModelWithHealthStats,
  29. get_or_create_system_user_pk,
  30. )
  31. from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine
  32. from archivebox.workers.tasks import bg_archive_snapshot
  33. from archivebox.crawls.models import Crawl
  34. from archivebox.machine.models import NetworkInterface, Binary
  35. class Tag(ModelWithSerializers):
  36. # Keep AutoField for compatibility with main branch migrations
  37. # Don't use UUIDField here - requires complex FK transformation
  38. id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
  39. created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=True, related_name='tag_set')
  40. created_at = models.DateTimeField(default=timezone.now, db_index=True, null=True)
  41. modified_at = models.DateTimeField(auto_now=True)
  42. name = models.CharField(unique=True, blank=False, max_length=100)
  43. slug = models.SlugField(unique=True, blank=False, max_length=100, editable=False)
  44. snapshot_set: models.Manager['Snapshot']
  45. class Meta(TypedModelMeta):
  46. app_label = 'core'
  47. verbose_name = "Tag"
  48. verbose_name_plural = "Tags"
  49. def __str__(self):
  50. return self.name
  51. def save(self, *args, **kwargs):
  52. is_new = self._state.adding
  53. if is_new:
  54. self.slug = slugify(self.name)
  55. existing = set(Tag.objects.filter(slug__startswith=self.slug).values_list("slug", flat=True))
  56. i = None
  57. while True:
  58. slug = f"{slugify(self.name)}_{i}" if i else slugify(self.name)
  59. if slug not in existing:
  60. self.slug = slug
  61. break
  62. i = (i or 0) + 1
  63. super().save(*args, **kwargs)
  64. if is_new:
  65. from archivebox.misc.logging_util import log_worker_event
  66. log_worker_event(
  67. worker_type='DB',
  68. event='Created Tag',
  69. indent_level=0,
  70. metadata={
  71. 'id': self.id,
  72. 'name': self.name,
  73. 'slug': self.slug,
  74. },
  75. )
  76. @property
  77. def api_url(self) -> str:
  78. return reverse_lazy('api-1:get_tag', args=[self.id])
  79. def to_jsonl(self) -> dict:
  80. """
  81. Convert Tag model instance to a JSONL record.
  82. """
  83. from archivebox.config import VERSION
  84. return {
  85. 'type': 'Tag',
  86. 'schema_version': VERSION,
  87. 'id': str(self.id),
  88. 'name': self.name,
  89. 'slug': self.slug,
  90. }
  91. @staticmethod
  92. def from_jsonl(record: Dict[str, Any], overrides: Dict[str, Any] = None):
  93. """
  94. Create/update Tag from JSONL record.
  95. Args:
  96. record: JSONL record with 'name' field
  97. overrides: Optional dict with 'snapshot' to auto-attach tag
  98. Returns:
  99. Tag instance or None
  100. """
  101. name = record.get('name')
  102. if not name:
  103. return None
  104. tag, _ = Tag.objects.get_or_create(name=name)
  105. # Auto-attach to snapshot if in overrides
  106. if overrides and 'snapshot' in overrides and tag:
  107. overrides['snapshot'].tags.add(tag)
  108. return tag
class SnapshotTag(models.Model):
    # Explicit through table for the Snapshot<->Tag M2M relation.
    # Kept as a concrete model (rather than an auto-generated through table)
    # so the db_table name and FK columns stay stable across migrations.
    id = models.AutoField(primary_key=True)
    snapshot = models.ForeignKey('Snapshot', db_column='snapshot_id', on_delete=models.CASCADE, to_field='id')
    tag = models.ForeignKey(Tag, db_column='tag_id', on_delete=models.CASCADE, to_field='id')

    class Meta:
        app_label = 'core'
        db_table = 'core_snapshot_tags'
        # A given tag can only be attached to a given snapshot once
        unique_together = [('snapshot', 'tag')]
  117. class SnapshotQuerySet(models.QuerySet):
  118. """Custom QuerySet for Snapshot model with export methods that persist through .filter() etc."""
  119. # =========================================================================
  120. # Filtering Methods
  121. # =========================================================================
  122. FILTER_TYPES = {
  123. 'exact': lambda pattern: models.Q(url=pattern),
  124. 'substring': lambda pattern: models.Q(url__icontains=pattern),
  125. 'regex': lambda pattern: models.Q(url__iregex=pattern),
  126. 'domain': lambda pattern: models.Q(url__istartswith=f"http://{pattern}") | models.Q(url__istartswith=f"https://{pattern}") | models.Q(url__istartswith=f"ftp://{pattern}"),
  127. 'tag': lambda pattern: models.Q(tags__name=pattern),
  128. 'timestamp': lambda pattern: models.Q(timestamp=pattern),
  129. }
  130. def filter_by_patterns(self, patterns: List[str], filter_type: str = 'exact') -> 'SnapshotQuerySet':
  131. """Filter snapshots by URL patterns using specified filter type"""
  132. from archivebox.misc.logging import stderr
  133. q_filter = models.Q()
  134. for pattern in patterns:
  135. try:
  136. q_filter = q_filter | self.FILTER_TYPES[filter_type](pattern)
  137. except KeyError:
  138. stderr()
  139. stderr(f'[X] Got invalid pattern for --filter-type={filter_type}:', color='red')
  140. stderr(f' {pattern}')
  141. raise SystemExit(2)
  142. return self.filter(q_filter)
  143. def search(self, patterns: List[str]) -> 'SnapshotQuerySet':
  144. """Search snapshots using the configured search backend"""
  145. from archivebox.config.common import SEARCH_BACKEND_CONFIG
  146. from archivebox.search import query_search_index
  147. from archivebox.misc.logging import stderr
  148. if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
  149. stderr()
  150. stderr('[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True', color='red')
  151. raise SystemExit(2)
  152. qsearch = self.none()
  153. for pattern in patterns:
  154. try:
  155. qsearch |= query_search_index(pattern)
  156. except:
  157. raise SystemExit(2)
  158. return self.all() & qsearch
  159. # =========================================================================
  160. # Export Methods
  161. # =========================================================================
  162. def to_json(self, with_headers: bool = False) -> str:
  163. """Generate JSON index from snapshots"""
  164. import sys
  165. from datetime import datetime, timezone as tz
  166. from archivebox.config import VERSION
  167. from archivebox.config.common import SERVER_CONFIG
  168. MAIN_INDEX_HEADER = {
  169. 'info': 'This is an index of site data archived by ArchiveBox: The self-hosted web archive.',
  170. 'schema': 'archivebox.index.json',
  171. 'copyright_info': SERVER_CONFIG.FOOTER_INFO,
  172. 'meta': {
  173. 'project': 'ArchiveBox',
  174. 'version': VERSION,
  175. 'git_sha': VERSION,
  176. 'website': 'https://ArchiveBox.io',
  177. 'docs': 'https://github.com/ArchiveBox/ArchiveBox/wiki',
  178. 'source': 'https://github.com/ArchiveBox/ArchiveBox',
  179. 'issues': 'https://github.com/ArchiveBox/ArchiveBox/issues',
  180. 'dependencies': {},
  181. },
  182. } if with_headers else {}
  183. snapshot_dicts = [s.to_dict(extended=True) for s in self.iterator(chunk_size=500)]
  184. if with_headers:
  185. output = {
  186. **MAIN_INDEX_HEADER,
  187. 'num_links': len(snapshot_dicts),
  188. 'updated': datetime.now(tz.utc),
  189. 'last_run_cmd': sys.argv,
  190. 'links': snapshot_dicts,
  191. }
  192. else:
  193. output = snapshot_dicts
  194. return to_json(output, indent=4, sort_keys=True)
  195. def to_csv(self, cols: Optional[List[str]] = None, header: bool = True, separator: str = ',', ljust: int = 0) -> str:
  196. """Generate CSV output from snapshots"""
  197. cols = cols or ['timestamp', 'is_archived', 'url']
  198. header_str = separator.join(col.ljust(ljust) for col in cols) if header else ''
  199. row_strs = (s.to_csv(cols=cols, ljust=ljust, separator=separator) for s in self.iterator(chunk_size=500))
  200. return '\n'.join((header_str, *row_strs))
  201. def to_html(self, with_headers: bool = True) -> str:
  202. """Generate main index HTML from snapshots"""
  203. from datetime import datetime, timezone as tz
  204. from django.template.loader import render_to_string
  205. from archivebox.config import VERSION
  206. from archivebox.config.common import SERVER_CONFIG
  207. from archivebox.config.version import get_COMMIT_HASH
  208. template = 'static_index.html' if with_headers else 'minimal_index.html'
  209. snapshot_list = list(self.iterator(chunk_size=500))
  210. return render_to_string(template, {
  211. 'version': VERSION,
  212. 'git_sha': get_COMMIT_HASH() or VERSION,
  213. 'num_links': str(len(snapshot_list)),
  214. 'date_updated': datetime.now(tz.utc).strftime('%Y-%m-%d'),
  215. 'time_updated': datetime.now(tz.utc).strftime('%Y-%m-%d %H:%M'),
  216. 'links': snapshot_list,
  217. 'FOOTER_INFO': SERVER_CONFIG.FOOTER_INFO,
  218. })
  219. class SnapshotManager(models.Manager.from_queryset(SnapshotQuerySet)):
  220. """Manager for Snapshot model - uses SnapshotQuerySet for chainable methods"""
  221. def filter(self, *args, **kwargs):
  222. domain = kwargs.pop('domain', None)
  223. qs = super().filter(*args, **kwargs)
  224. if domain:
  225. qs = qs.filter(url__icontains=f'://{domain}')
  226. return qs
  227. def get_queryset(self):
  228. # Don't prefetch by default - it causes "too many open files" during bulk operations
  229. # Views/templates can add .prefetch_related('tags', 'archiveresult_set') where needed
  230. return super().get_queryset()
  231. # =========================================================================
  232. # Import Methods
  233. # =========================================================================
  234. def remove(self, atomic: bool = False) -> tuple:
  235. """Remove snapshots from the database"""
  236. from django.db import transaction
  237. if atomic:
  238. with transaction.atomic():
  239. return self.delete()
  240. return self.delete()
class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
    # UUIDv7 primary key: time-ordered, generated app-side via uuid7()
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    url = models.URLField(unique=False, db_index=True) # URLs can appear in multiple crawls
    # Legacy string key (unix epoch as str); backfilled from bookmarked_at in save()
    timestamp = models.CharField(max_length=32, unique=True, db_index=True, editable=False)
    bookmarked_at = models.DateTimeField(default=timezone.now, db_index=True)
    crawl: Crawl = models.ForeignKey(Crawl, on_delete=models.CASCADE, null=False, related_name='snapshot_set', db_index=True) # type: ignore[assignment]
    parent_snapshot = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='child_snapshots', db_index=True, help_text='Parent snapshot that discovered this URL (for recursive crawling)')
    title = models.CharField(max_length=512, null=True, blank=True, db_index=True)
    downloaded_at = models.DateTimeField(default=None, null=True, editable=False, db_index=True, blank=True)
    depth = models.PositiveSmallIntegerField(default=0, db_index=True) # 0 for root snapshot, 1+ for discovered URLs
    fs_version = models.CharField(max_length=10, default='0.9.0', help_text='Filesystem version of this snapshot (e.g., "0.7.0", "0.8.0", "0.9.0"). Used to trigger lazy migration on save().')
    current_step = models.PositiveSmallIntegerField(default=0, db_index=True, help_text='Current hook step being executed (0-9). Used for sequential hook execution.')
    # State-machine bookkeeping fields (see ModelWithStateMachine)
    retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
    status = ModelWithStateMachine.StatusField(choices=ModelWithStateMachine.StatusChoices, default=ModelWithStateMachine.StatusChoices.QUEUED)
    config = models.JSONField(default=dict, null=False, blank=False, editable=True)
    notes = models.TextField(blank=True, null=False, default='')
    # output_dir is computed via @cached_property from fs_version and get_storage_path_for_version()
    tags = models.ManyToManyField(Tag, blank=True, through=SnapshotTag, related_name='snapshot_set', through_fields=('snapshot', 'tag'))
    # State-machine wiring: dotted path + which fields hold state/retry info
    state_machine_name = 'archivebox.core.models.SnapshotMachine'
    state_field_name = 'status'
    retry_at_field_name = 'retry_at'
    StatusChoices = ModelWithStateMachine.StatusChoices
    active_state = StatusChoices.STARTED
    objects = SnapshotManager()
    archiveresult_set: models.Manager['ArchiveResult']

    class Meta(TypedModelMeta):
        app_label = 'core'
        verbose_name = "Snapshot"
        verbose_name_plural = "Snapshots"
        constraints = [
            # Allow same URL in different crawls, but not duplicates within same crawl
            models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
            # Global timestamp uniqueness for 1:1 symlink mapping
            models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
        ]
  278. def __str__(self):
  279. return f'[{self.id}] {self.url[:64]}'
  280. @property
  281. def created_by(self):
  282. """Convenience property to access the user who created this snapshot via its crawl."""
  283. return self.crawl.created_by
  284. @property
  285. def process_set(self):
  286. """Get all Process objects related to this snapshot's ArchiveResults."""
  287. from archivebox.machine.models import Process
  288. return Process.objects.filter(archiveresult__snapshot_id=self.id)
  289. @property
  290. def binary_set(self):
  291. """Get all Binary objects used by processes related to this snapshot."""
  292. from archivebox.machine.models import Binary
  293. return Binary.objects.filter(process_set__archiveresult__snapshot_id=self.id).distinct()
    def save(self, *args, **kwargs):
        """Persist the snapshot, backfilling derived fields and lazily migrating its filesystem layout.

        Order of operations:
        1. Backfill bookmarked_at / timestamp if unset.
        2. If fs_version is stale, walk the migration chain inside one DB
           transaction (each hop's _fs_migrate_* method runs if defined).
        3. Write the row via super().save().
        4. Ensure the URL is listed in the owning crawl's urls text field.
        5. Log a worker event on first insert.
        """
        is_new = self._state.adding
        if not self.bookmarked_at:
            self.bookmarked_at = self.created_at or timezone.now()
        if not self.timestamp:
            # Legacy string timestamp (unix epoch) derived from bookmarked_at
            self.timestamp = str(self.bookmarked_at.timestamp())
        # Migrate filesystem if needed (happens automatically on save)
        if self.pk and self.fs_migration_needed:
            from django.db import transaction
            with transaction.atomic():
                # Walk through migration chain automatically
                current = self.fs_version
                target = self._fs_current_version()
                while current != target:
                    next_ver = self._fs_next_version(current)
                    method = f'_fs_migrate_from_{current.replace(".", "_")}_to_{next_ver.replace(".", "_")}'
                    # Only run if method exists (most are no-ops)
                    if hasattr(self, method):
                        getattr(self, method)()
                    current = next_ver
                # Update version (still in transaction)
                self.fs_version = target
        super().save(*args, **kwargs)
        # NOTE(review): this is a substring check on the crawl's urls blob —
        # a URL that happens to be a substring of an existing entry will NOT
        # be appended; confirm that is acceptable.
        if self.url not in self.crawl.urls:
            self.crawl.urls += f'\n{self.url}'
            self.crawl.save()
        if is_new:
            from archivebox.misc.logging_util import log_worker_event
            log_worker_event(
                worker_type='DB',
                event='Created Snapshot',
                indent_level=2,
                url=self.url,
                metadata={
                    'id': str(self.id),
                    'crawl_id': str(self.crawl_id),
                    'depth': self.depth,
                    'status': self.status,
                },
            )
  334. # =========================================================================
  335. # Filesystem Migration Methods
  336. # =========================================================================
  337. @staticmethod
  338. def _fs_current_version() -> str:
  339. """Get current ArchiveBox filesystem version (normalized to x.x.0 format)"""
  340. from archivebox.config import VERSION
  341. # Normalize version to x.x.0 format (e.g., "0.9.0rc1" -> "0.9.0")
  342. parts = VERSION.split('.')
  343. if len(parts) >= 2:
  344. major, minor = parts[0], parts[1]
  345. # Strip any non-numeric suffix from minor version
  346. minor = ''.join(c for c in minor if c.isdigit())
  347. return f'{major}.{minor}.0'
  348. return '0.9.0' # Fallback if version parsing fails
  349. @property
  350. def fs_migration_needed(self) -> bool:
  351. """Check if snapshot needs filesystem migration"""
  352. return self.fs_version != self._fs_current_version()
  353. def _fs_next_version(self, version: str) -> str:
  354. """Get next version in migration chain (0.7/0.8 had same layout, only 0.8→0.9 migration needed)"""
  355. # Treat 0.7.0 and 0.8.0 as equivalent (both used archive/{timestamp})
  356. if version in ('0.7.0', '0.8.0'):
  357. return '0.9.0'
  358. return self._fs_current_version()
    def _fs_migrate_from_0_8_0_to_0_9_0(self):
        """
        Migrate from flat to nested structure.
        0.8.x: archive/{timestamp}/
        0.9.x: users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/
        Transaction handling:
        1. Copy files INSIDE transaction
        2. Convert index.json to index.jsonl INSIDE transaction
        3. Create symlink INSIDE transaction
        4. Update fs_version INSIDE transaction (done by save())
        5. Exit transaction (DB commit)
        6. Delete old files OUTSIDE transaction (after commit)
        """
        import shutil
        from django.db import transaction
        old_dir = self.get_storage_path_for_version('0.8.0')
        new_dir = self.get_storage_path_for_version('0.9.0')
        # Nothing to move if the old dir is gone, paths coincide, or the
        # target already exists (previous attempt completed the copy).
        if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
            # Even if no directory migration needed, still convert index format
            self.convert_index_json_to_jsonl()
            return
        new_dir.mkdir(parents=True, exist_ok=True)
        # Copy all files (idempotent), skipping index.json (will be converted to jsonl)
        # NOTE(review): despite the comment above, this loop does NOT actually
        # exclude index.json — every regular file is copied; verify intent.
        for old_file in old_dir.rglob('*'):
            if not old_file.is_file():
                continue
            rel_path = old_file.relative_to(old_dir)
            new_file = new_dir / rel_path
            # Skip if already copied (size match is the only check performed)
            if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
                continue
            new_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(old_file, new_file)
        # Verify all copied: compare the relative-path sets before deleting anything
        old_files = {f.relative_to(old_dir): f.stat().st_size
                     for f in old_dir.rglob('*') if f.is_file()}
        new_files = {f.relative_to(new_dir): f.stat().st_size
                     for f in new_dir.rglob('*') if f.is_file()}
        if old_files.keys() != new_files.keys():
            missing = old_files.keys() - new_files.keys()
            raise Exception(f"Migration incomplete: missing {missing}")
        # Convert index.json to index.jsonl in the new directory
        self.convert_index_json_to_jsonl()
        # Create backwards-compat symlink (INSIDE transaction)
        symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
        if symlink_path.is_symlink():
            symlink_path.unlink()
        if not symlink_path.exists() or symlink_path == old_dir:
            symlink_path.symlink_to(new_dir, target_is_directory=True)
        # Schedule old directory deletion AFTER transaction commits
        transaction.on_commit(lambda: self._cleanup_old_migration_dir(old_dir))
  410. def _cleanup_old_migration_dir(self, old_dir: Path):
  411. """
  412. Delete old directory after successful migration.
  413. Called via transaction.on_commit() after DB commit succeeds.
  414. """
  415. import shutil
  416. import logging
  417. if old_dir.exists() and not old_dir.is_symlink():
  418. try:
  419. shutil.rmtree(old_dir)
  420. except Exception as e:
  421. # Log but don't raise - migration succeeded, this is just cleanup
  422. logging.getLogger('archivebox.migration').warning(
  423. f"Could not remove old migration directory {old_dir}: {e}"
  424. )
  425. # =========================================================================
  426. # Path Calculation and Migration Helpers
  427. # =========================================================================
  428. @staticmethod
  429. def extract_domain_from_url(url: str) -> str:
  430. """
  431. Extract domain from URL for 0.9.x path structure.
  432. Uses full hostname with sanitized special chars.
  433. Examples:
  434. https://example.com:8080 → example.com_8080
  435. https://sub.example.com → sub.example.com
  436. file:///path → localhost
  437. data:text/html → data
  438. """
  439. from urllib.parse import urlparse
  440. try:
  441. parsed = urlparse(url)
  442. if parsed.scheme in ('http', 'https'):
  443. if parsed.port:
  444. return f"{parsed.hostname}_{parsed.port}".replace(':', '_')
  445. return parsed.hostname or 'unknown'
  446. elif parsed.scheme == 'file':
  447. return 'localhost'
  448. elif parsed.scheme:
  449. return parsed.scheme
  450. else:
  451. return 'unknown'
  452. except Exception:
  453. return 'unknown'
  454. def get_storage_path_for_version(self, version: str) -> Path:
  455. """
  456. Calculate storage path for specific filesystem version.
  457. Centralizes path logic so it's reusable.
  458. 0.7.x/0.8.x: archive/{timestamp}
  459. 0.9.x: users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
  460. """
  461. from datetime import datetime
  462. if version in ('0.7.0', '0.8.0'):
  463. return CONSTANTS.ARCHIVE_DIR / self.timestamp
  464. elif version in ('0.9.0', '1.0.0'):
  465. username = self.created_by.username
  466. # Use created_at for date grouping (fallback to timestamp)
  467. if self.created_at:
  468. date_str = self.created_at.strftime('%Y%m%d')
  469. else:
  470. date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')
  471. domain = self.extract_domain_from_url(self.url)
  472. return (
  473. CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
  474. date_str / domain / str(self.id)
  475. )
  476. else:
  477. # Unknown version - use current
  478. return self.get_storage_path_for_version(self._fs_current_version())
  479. # =========================================================================
  480. # Loading and Creation from Filesystem (Used by archivebox update ONLY)
  481. # =========================================================================
  482. @classmethod
  483. def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
  484. """
  485. Load existing Snapshot from DB by reading index.jsonl or index.json.
  486. Reads index file, extracts url+timestamp, queries DB.
  487. Returns existing Snapshot or None if not found/invalid.
  488. Does NOT create new snapshots.
  489. ONLY used by: archivebox update (for orphan detection)
  490. """
  491. import json
  492. # Try index.jsonl first (new format), then index.json (legacy)
  493. jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
  494. json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
  495. data = None
  496. if jsonl_path.exists():
  497. try:
  498. with open(jsonl_path) as f:
  499. for line in f:
  500. line = line.strip()
  501. if line.startswith('{'):
  502. record = json.loads(line)
  503. if record.get('type') == 'Snapshot':
  504. data = record
  505. break
  506. except (json.JSONDecodeError, OSError):
  507. pass
  508. elif json_path.exists():
  509. try:
  510. with open(json_path) as f:
  511. data = json.load(f)
  512. except (json.JSONDecodeError, OSError):
  513. pass
  514. if not data:
  515. return None
  516. url = data.get('url')
  517. if not url:
  518. return None
  519. # Get timestamp - prefer index file, fallback to folder name
  520. timestamp = cls._select_best_timestamp(
  521. index_timestamp=data.get('timestamp'),
  522. folder_name=snapshot_dir.name
  523. )
  524. if not timestamp:
  525. return None
  526. # Look up existing
  527. try:
  528. return cls.objects.get(url=url, timestamp=timestamp)
  529. except cls.DoesNotExist:
  530. return None
  531. except cls.MultipleObjectsReturned:
  532. # Should not happen with unique constraint
  533. return cls.objects.filter(url=url, timestamp=timestamp).first()
  534. @classmethod
  535. def create_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
  536. """
  537. Create new Snapshot from orphaned directory.
  538. Validates timestamp, ensures uniqueness.
  539. Returns new UNSAVED Snapshot or None if invalid.
  540. ONLY used by: archivebox update (for orphan import)
  541. """
  542. import json
  543. # Try index.jsonl first (new format), then index.json (legacy)
  544. jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
  545. json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
  546. data = None
  547. if jsonl_path.exists():
  548. try:
  549. with open(jsonl_path) as f:
  550. for line in f:
  551. line = line.strip()
  552. if line.startswith('{'):
  553. record = json.loads(line)
  554. if record.get('type') == 'Snapshot':
  555. data = record
  556. break
  557. except (json.JSONDecodeError, OSError):
  558. pass
  559. elif json_path.exists():
  560. try:
  561. with open(json_path) as f:
  562. data = json.load(f)
  563. except (json.JSONDecodeError, OSError):
  564. pass
  565. if not data:
  566. return None
  567. url = data.get('url')
  568. if not url:
  569. return None
  570. # Get and validate timestamp
  571. timestamp = cls._select_best_timestamp(
  572. index_timestamp=data.get('timestamp'),
  573. folder_name=snapshot_dir.name
  574. )
  575. if not timestamp:
  576. return None
  577. # Ensure uniqueness (reuses existing logic from create_or_update_from_dict)
  578. timestamp = cls._ensure_unique_timestamp(url, timestamp)
  579. # Detect version
  580. fs_version = cls._detect_fs_version_from_index(data)
  581. # Get or create catchall crawl for orphaned snapshots
  582. from archivebox.crawls.models import Crawl
  583. system_user_id = get_or_create_system_user_pk()
  584. catchall_crawl, _ = Crawl.objects.get_or_create(
  585. label='[migration] orphaned snapshots',
  586. defaults={
  587. 'urls': f'# Orphaned snapshot: {url}',
  588. 'max_depth': 0,
  589. 'created_by_id': system_user_id,
  590. }
  591. )
  592. return cls(
  593. url=url,
  594. timestamp=timestamp,
  595. title=data.get('title', ''),
  596. fs_version=fs_version,
  597. crawl=catchall_crawl,
  598. )
  599. @staticmethod
  600. def _select_best_timestamp(index_timestamp: str, folder_name: str) -> Optional[str]:
  601. """
  602. Select best timestamp from index.json vs folder name.
  603. Validates range (1995-2035).
  604. Prefers index.json if valid.
  605. """
  606. def is_valid_timestamp(ts):
  607. try:
  608. ts_int = int(float(ts))
  609. # 1995-01-01 to 2035-12-31
  610. return 788918400 <= ts_int <= 2082758400
  611. except:
  612. return False
  613. index_valid = is_valid_timestamp(index_timestamp) if index_timestamp else False
  614. folder_valid = is_valid_timestamp(folder_name)
  615. if index_valid:
  616. return str(int(float(index_timestamp)))
  617. elif folder_valid:
  618. return str(int(float(folder_name)))
  619. else:
  620. return None
  621. @classmethod
  622. def _ensure_unique_timestamp(cls, url: str, timestamp: str) -> str:
  623. """
  624. Ensure timestamp is globally unique.
  625. If collision with different URL, increment by 1 until unique.
  626. NOTE: Logic already exists in create_or_update_from_dict (line 266-267)
  627. This is just an extracted, reusable version.
  628. """
  629. while cls.objects.filter(timestamp=timestamp).exclude(url=url).exists():
  630. timestamp = str(int(float(timestamp)) + 1)
  631. return timestamp
  632. @staticmethod
  633. def _detect_fs_version_from_index(data: dict) -> str:
  634. """
  635. Detect fs_version from index.json structure.
  636. - Has fs_version field: use it
  637. - Has history dict: 0.7.0
  638. - Has archive_results list: 0.8.0
  639. - Default: 0.7.0
  640. """
  641. if 'fs_version' in data:
  642. return data['fs_version']
  643. if 'history' in data and 'archive_results' not in data:
  644. return '0.7.0'
  645. if 'archive_results' in data:
  646. return '0.8.0'
  647. return '0.7.0'
  648. # =========================================================================
  649. # Index.json Reconciliation
  650. # =========================================================================
  651. def reconcile_with_index(self):
  652. """
  653. Merge index.json/index.jsonl with DB. DB is source of truth.
  654. - Title: longest non-URL
  655. - Tags: union
  656. - ArchiveResults: keep both (by plugin+start_ts)
  657. Converts index.json to index.jsonl if needed, then writes back in JSONL format.
  658. Used by: archivebox update (to sync index with DB)
  659. """
  660. import json
  661. # Try to convert index.json to index.jsonl first
  662. self.convert_index_json_to_jsonl()
  663. # Check for index.jsonl (preferred) or index.json (legacy)
  664. jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
  665. json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
  666. index_data = {}
  667. if jsonl_path.exists():
  668. # Read from JSONL format
  669. jsonl_data = self.read_index_jsonl()
  670. if jsonl_data['snapshot']:
  671. index_data = jsonl_data['snapshot']
  672. # Convert archive_results list to expected format
  673. index_data['archive_results'] = jsonl_data['archive_results']
  674. elif json_path.exists():
  675. # Fallback to legacy JSON format
  676. try:
  677. with open(json_path) as f:
  678. index_data = json.load(f)
  679. except:
  680. pass
  681. # Merge title
  682. self._merge_title_from_index(index_data)
  683. # Merge tags
  684. self._merge_tags_from_index(index_data)
  685. # Merge ArchiveResults
  686. self._merge_archive_results_from_index(index_data)
  687. # Write back in JSONL format
  688. self.write_index_jsonl()
  689. def reconcile_with_index_json(self):
  690. """Deprecated: use reconcile_with_index() instead."""
  691. return self.reconcile_with_index()
  692. def _merge_title_from_index(self, index_data: dict):
  693. """Merge title - prefer longest non-URL title."""
  694. index_title = index_data.get('title', '').strip()
  695. db_title = self.title or ''
  696. candidates = [t for t in [index_title, db_title] if t and t != self.url]
  697. if candidates:
  698. best_title = max(candidates, key=len)
  699. if self.title != best_title:
  700. self.title = best_title
  701. def _merge_tags_from_index(self, index_data: dict):
  702. """Merge tags - union of both sources."""
  703. from django.db import transaction
  704. index_tags = set(index_data.get('tags', '').split(',')) if index_data.get('tags') else set()
  705. index_tags = {t.strip() for t in index_tags if t.strip()}
  706. db_tags = set(self.tags.values_list('name', flat=True))
  707. new_tags = index_tags - db_tags
  708. if new_tags:
  709. with transaction.atomic():
  710. for tag_name in new_tags:
  711. tag, _ = Tag.objects.get_or_create(name=tag_name)
  712. self.tags.add(tag)
  713. def _merge_archive_results_from_index(self, index_data: dict):
  714. """Merge ArchiveResults - keep both (by plugin+start_ts)."""
  715. existing = {
  716. (ar.plugin, ar.start_ts): ar
  717. for ar in ArchiveResult.objects.filter(snapshot=self)
  718. }
  719. # Handle 0.8.x format (archive_results list)
  720. for result_data in index_data.get('archive_results', []):
  721. self._create_archive_result_if_missing(result_data, existing)
  722. # Handle 0.7.x format (history dict)
  723. if 'history' in index_data and isinstance(index_data['history'], dict):
  724. for plugin, result_list in index_data['history'].items():
  725. if isinstance(result_list, list):
  726. for result_data in result_list:
  727. # Support both old 'extractor' and new 'plugin' keys for backwards compat
  728. result_data['plugin'] = result_data.get('plugin') or result_data.get('extractor') or plugin
  729. self._create_archive_result_if_missing(result_data, existing)
  730. def _create_archive_result_if_missing(self, result_data: dict, existing: dict):
  731. """Create ArchiveResult if not already in DB."""
  732. from dateutil import parser
  733. # Support both old 'extractor' and new 'plugin' keys for backwards compat
  734. plugin = result_data.get('plugin') or result_data.get('extractor', '')
  735. if not plugin:
  736. return
  737. start_ts = None
  738. if result_data.get('start_ts'):
  739. try:
  740. start_ts = parser.parse(result_data['start_ts'])
  741. except:
  742. pass
  743. if (plugin, start_ts) in existing:
  744. return
  745. try:
  746. end_ts = None
  747. if result_data.get('end_ts'):
  748. try:
  749. end_ts = parser.parse(result_data['end_ts'])
  750. except:
  751. pass
  752. # Support both 'output' (legacy) and 'output_str' (new JSONL) field names
  753. output_str = result_data.get('output_str') or result_data.get('output', '')
  754. ArchiveResult.objects.create(
  755. snapshot=self,
  756. plugin=plugin,
  757. hook_name=result_data.get('hook_name', ''),
  758. status=result_data.get('status', 'failed'),
  759. output_str=output_str,
  760. cmd=result_data.get('cmd', []),
  761. pwd=result_data.get('pwd', str(self.output_dir)),
  762. start_ts=start_ts,
  763. end_ts=end_ts,
  764. )
  765. except:
  766. pass
  767. def write_index_json(self):
  768. """Write index.json in 0.9.x format (deprecated, use write_index_jsonl)."""
  769. import json
  770. index_path = Path(self.output_dir) / 'index.json'
  771. data = {
  772. 'url': self.url,
  773. 'timestamp': self.timestamp,
  774. 'title': self.title or '',
  775. 'tags': ','.join(sorted(self.tags.values_list('name', flat=True))),
  776. 'fs_version': self.fs_version,
  777. 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
  778. 'created_at': self.created_at.isoformat() if self.created_at else None,
  779. 'archive_results': [
  780. {
  781. 'plugin': ar.plugin,
  782. 'status': ar.status,
  783. 'start_ts': ar.start_ts.isoformat() if ar.start_ts else None,
  784. 'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
  785. 'output': ar.output_str or '',
  786. 'cmd': ar.cmd if isinstance(ar.cmd, list) else [],
  787. 'pwd': ar.pwd,
  788. }
  789. for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts')
  790. ],
  791. }
  792. index_path.parent.mkdir(parents=True, exist_ok=True)
  793. with open(index_path, 'w') as f:
  794. json.dump(data, f, indent=2, sort_keys=True)
  795. def write_index_jsonl(self):
  796. """
  797. Write index.jsonl in flat JSONL format.
  798. Each line is a JSON record with a 'type' field:
  799. - Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
  800. - ArchiveResult: extractor results (plugin, status, output, etc.)
  801. - Binary: binary info used for the extraction
  802. - Process: process execution details (cmd, exit_code, timing, etc.)
  803. """
  804. import json
  805. index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
  806. index_path.parent.mkdir(parents=True, exist_ok=True)
  807. # Track unique binaries and processes to avoid duplicates
  808. binaries_seen = set()
  809. processes_seen = set()
  810. with open(index_path, 'w') as f:
  811. # Write Snapshot record first (to_jsonl includes crawl_id, fs_version)
  812. f.write(json.dumps(self.to_jsonl()) + '\n')
  813. # Write ArchiveResult records with their associated Binary and Process
  814. # Use select_related to optimize queries
  815. for ar in self.archiveresult_set.select_related('process__binary').order_by('start_ts'):
  816. # Write Binary record if not already written
  817. if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
  818. binaries_seen.add(ar.process.binary_id)
  819. f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n')
  820. # Write Process record if not already written
  821. if ar.process and ar.process_id not in processes_seen:
  822. processes_seen.add(ar.process_id)
  823. f.write(json.dumps(ar.process.to_jsonl()) + '\n')
  824. # Write ArchiveResult record
  825. f.write(json.dumps(ar.to_jsonl()) + '\n')
  826. def read_index_jsonl(self) -> dict:
  827. """
  828. Read index.jsonl and return parsed records grouped by type.
  829. Returns dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes'
  830. """
  831. import json
  832. from archivebox.misc.jsonl import (
  833. TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS,
  834. )
  835. index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
  836. result = {
  837. 'snapshot': None,
  838. 'archive_results': [],
  839. 'binaries': [],
  840. 'processes': [],
  841. }
  842. if not index_path.exists():
  843. return result
  844. with open(index_path, 'r') as f:
  845. for line in f:
  846. line = line.strip()
  847. if not line or not line.startswith('{'):
  848. continue
  849. try:
  850. record = json.loads(line)
  851. record_type = record.get('type')
  852. if record_type == TYPE_SNAPSHOT:
  853. result['snapshot'] = record
  854. elif record_type == TYPE_ARCHIVERESULT:
  855. result['archive_results'].append(record)
  856. elif record_type == TYPE_BINARY:
  857. result['binaries'].append(record)
  858. elif record_type == TYPE_PROCESS:
  859. result['processes'].append(record)
  860. except json.JSONDecodeError:
  861. continue
  862. return result
  863. def convert_index_json_to_jsonl(self) -> bool:
  864. """
  865. Convert index.json to index.jsonl format.
  866. Reads existing index.json, creates index.jsonl, and removes index.json.
  867. Returns True if conversion was performed, False if no conversion needed.
  868. """
  869. import json
  870. json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
  871. jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
  872. # Skip if already converted or no json file exists
  873. if jsonl_path.exists() or not json_path.exists():
  874. return False
  875. try:
  876. with open(json_path, 'r') as f:
  877. data = json.load(f)
  878. except (json.JSONDecodeError, OSError):
  879. return False
  880. # Detect format version and extract records
  881. fs_version = data.get('fs_version', '0.7.0')
  882. jsonl_path.parent.mkdir(parents=True, exist_ok=True)
  883. with open(jsonl_path, 'w') as f:
  884. # Write Snapshot record
  885. snapshot_record = {
  886. 'type': 'Snapshot',
  887. 'id': str(self.id),
  888. 'crawl_id': str(self.crawl_id) if self.crawl_id else None,
  889. 'url': data.get('url', self.url),
  890. 'timestamp': data.get('timestamp', self.timestamp),
  891. 'title': data.get('title', self.title or ''),
  892. 'tags': data.get('tags', ''),
  893. 'fs_version': fs_version,
  894. 'bookmarked_at': data.get('bookmarked_at'),
  895. 'created_at': data.get('created_at'),
  896. }
  897. f.write(json.dumps(snapshot_record) + '\n')
  898. # Handle 0.8.x/0.9.x format (archive_results list)
  899. for result_data in data.get('archive_results', []):
  900. ar_record = {
  901. 'type': 'ArchiveResult',
  902. 'snapshot_id': str(self.id),
  903. 'plugin': result_data.get('plugin', ''),
  904. 'status': result_data.get('status', ''),
  905. 'output_str': result_data.get('output', ''),
  906. 'start_ts': result_data.get('start_ts'),
  907. 'end_ts': result_data.get('end_ts'),
  908. }
  909. if result_data.get('cmd'):
  910. ar_record['cmd'] = result_data['cmd']
  911. f.write(json.dumps(ar_record) + '\n')
  912. # Handle 0.7.x format (history dict)
  913. if 'history' in data and isinstance(data['history'], dict):
  914. for plugin, result_list in data['history'].items():
  915. if not isinstance(result_list, list):
  916. continue
  917. for result_data in result_list:
  918. ar_record = {
  919. 'type': 'ArchiveResult',
  920. 'snapshot_id': str(self.id),
  921. 'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin,
  922. 'status': result_data.get('status', ''),
  923. 'output_str': result_data.get('output', ''),
  924. 'start_ts': result_data.get('start_ts'),
  925. 'end_ts': result_data.get('end_ts'),
  926. }
  927. if result_data.get('cmd'):
  928. ar_record['cmd'] = result_data['cmd']
  929. f.write(json.dumps(ar_record) + '\n')
  930. # Remove old index.json after successful conversion
  931. try:
  932. json_path.unlink()
  933. except OSError:
  934. pass
  935. return True
  936. # =========================================================================
  937. # Snapshot Utilities
  938. # =========================================================================
  939. @staticmethod
  940. def move_directory_to_invalid(snapshot_dir: Path):
  941. """
  942. Move invalid directory to data/invalid/YYYYMMDD/.
  943. Used by: archivebox update (when encountering invalid directories)
  944. """
  945. from datetime import datetime
  946. import shutil
  947. invalid_dir = CONSTANTS.DATA_DIR / 'invalid' / datetime.now().strftime('%Y%m%d')
  948. invalid_dir.mkdir(parents=True, exist_ok=True)
  949. dest = invalid_dir / snapshot_dir.name
  950. counter = 1
  951. while dest.exists():
  952. dest = invalid_dir / f"{snapshot_dir.name}_{counter}"
  953. counter += 1
  954. try:
  955. shutil.move(str(snapshot_dir), str(dest))
  956. except:
  957. pass
  958. @classmethod
  959. def find_and_merge_duplicates(cls) -> int:
  960. """
  961. Find and merge snapshots with same url:timestamp.
  962. Returns count of duplicate sets merged.
  963. Used by: archivebox update (Phase 3: deduplication)
  964. """
  965. from django.db.models import Count
  966. duplicates = (
  967. cls.objects
  968. .values('url', 'timestamp')
  969. .annotate(count=Count('id'))
  970. .filter(count__gt=1)
  971. )
  972. merged = 0
  973. for dup in duplicates.iterator(chunk_size=500):
  974. snapshots = list(
  975. cls.objects
  976. .filter(url=dup['url'], timestamp=dup['timestamp'])
  977. .order_by('created_at') # Keep oldest
  978. )
  979. if len(snapshots) > 1:
  980. try:
  981. cls._merge_snapshots(snapshots)
  982. merged += 1
  983. except:
  984. pass
  985. return merged
@classmethod
def _merge_snapshots(cls, snapshots: list['Snapshot']):
    """
    Merge exact duplicates.
    Keep oldest, union files + ArchiveResults.

    Args:
        snapshots: duplicate Snapshots ordered oldest-first;
            snapshots[0] is kept, the rest are merged in and deleted.
    """
    import shutil
    keeper = snapshots[0]
    duplicates = snapshots[1:]
    keeper_dir = Path(keeper.output_dir)
    for dup in duplicates:
        dup_dir = Path(dup.output_dir)
        # Merge files: copy only files the keeper doesn't already have
        # (keeper's copy wins on conflict), then drop the duplicate dir.
        if dup_dir.exists() and dup_dir != keeper_dir:
            for dup_file in dup_dir.rglob('*'):
                if not dup_file.is_file():
                    continue
                rel = dup_file.relative_to(dup_dir)
                keeper_file = keeper_dir / rel
                if not keeper_file.exists():
                    keeper_file.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(dup_file, keeper_file)
            try:
                shutil.rmtree(dup_dir)
            except:
                # Best-effort cleanup; a leftover dir is harmless
                pass
        # Merge tags (union via M2M add; already-present tags are no-ops)
        for tag in dup.tags.all():
            keeper.tags.add(tag)
        # Move ArchiveResults over to the keeper in one UPDATE
        ArchiveResult.objects.filter(snapshot=dup).update(snapshot=keeper)
        # Delete the duplicate row only after its data has been absorbed
        dup.delete()
  1019. # =========================================================================
  1020. # Output Directory Properties
  1021. # =========================================================================
@property
def output_dir_parent(self) -> str:
    # Name of the parent directory (under the data dir) holding snapshot output
    return 'archive'
@property
def output_dir_name(self) -> str:
    # Snapshot directories are named by their unix-epoch timestamp string
    return str(self.timestamp)
def archive(self, overwrite=False, methods=None):
    # Thin wrapper delegating archiving of this snapshot to bg_archive_snapshot
    return bg_archive_snapshot(self, overwrite=overwrite, methods=methods)
  1030. @admin.display(description='Tags')
  1031. def tags_str(self, nocache=True) -> str | None:
  1032. calc_tags_str = lambda: ','.join(sorted(tag.name for tag in self.tags.all()))
  1033. if hasattr(self, '_prefetched_objects_cache') and 'tags' in self._prefetched_objects_cache:
  1034. return calc_tags_str()
  1035. cache_key = f'{self.pk}-tags'
  1036. return cache.get_or_set(cache_key, calc_tags_str) if not nocache else calc_tags_str()
def icons(self) -> str:
    """Generate HTML icons showing which extractor plugins have succeeded for this snapshot"""
    from django.utils.html import format_html, mark_safe
    # Cache key includes the latest activity timestamp so cached icon HTML
    # is invalidated whenever the snapshot is touched again.
    cache_key = f'result_icons:{self.pk}:{(self.downloaded_at or self.modified_at or self.created_at or self.bookmarked_at).timestamp()}'
    def calc_icons():
        # Prefer prefetched results (no extra query) when available
        if hasattr(self, '_prefetched_objects_cache') and 'archiveresult_set' in self._prefetched_objects_cache:
            archive_results = {r.plugin: r for r in self.archiveresult_set.all() if r.status == "succeeded" and (r.output_files or r.output_str)}
        else:
            # Filter for results that have either output_files or output_str
            from django.db.models import Q
            archive_results = {r.plugin: r for r in self.archiveresult_set.filter(
                Q(status="succeeded") & (Q(output_files__isnull=False) | ~Q(output_str=''))
            )}
        path = self.archive_path
        canon = self.canonical_outputs()
        output = ""
        output_template = '<a href="/{}/{}" class="exists-{}" title="{}">{}</a> &nbsp;'
        # Get all plugins from hooks system (sorted by numeric prefix)
        all_plugins = [get_plugin_name(e) for e in get_plugins()]
        for plugin in all_plugins:
            result = archive_results.get(plugin)
            # "existing" ⇒ link rendered with class="exists-True"
            existing = result and result.status == 'succeeded' and (result.output_files or result.output_str)
            icon = get_plugin_icon(plugin)
            # Skip plugins with empty icons that have no output
            # (e.g., staticfile only shows when there's actual output)
            if not icon.strip() and not existing:
                continue
            output += format_html(
                output_template,
                path,
                canon.get(plugin, plugin + '/'),
                str(bool(existing)),
                plugin,
                icon
            )
        return format_html('<span class="files-icons" style="font-size: 1.1em; opacity: 0.8; min-width: 240px; display: inline-block">{}</span>', mark_safe(output))
    # Manual get/set (rather than get_or_set) so only truthy HTML is cached
    cache_result = cache.get(cache_key)
    if cache_result:
        return cache_result
    fresh_result = calc_icons()
    cache.set(cache_key, fresh_result, timeout=60 * 60 * 24)
    return fresh_result
@property
def api_url(self) -> str:
    # REST API (v1) URL for this snapshot, resolved lazily at access time
    return reverse_lazy('api-1:get_snapshot', args=[self.id])
def get_absolute_url(self):
    # Canonical site-relative URL for this snapshot (Django convention)
    return f'/{self.archive_path}'
@cached_property
def domain(self) -> str:
    # Display domain derived from the snapshot URL (cached per instance)
    return url_domain(self.url)
  1087. @cached_property
  1088. def output_dir(self):
  1089. """The filesystem path to the snapshot's output directory."""
  1090. import os
  1091. current_path = self.get_storage_path_for_version(self.fs_version)
  1092. if current_path.exists():
  1093. return str(current_path)
  1094. # Check for backwards-compat symlink
  1095. old_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
  1096. if old_path.is_symlink():
  1097. return str(Path(os.readlink(old_path)).resolve())
  1098. elif old_path.exists():
  1099. return str(old_path)
  1100. return str(current_path)
@cached_property
def archive_path(self):
    # Site-relative path ('archive/{timestamp}') used in links/templates
    return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'
@cached_property
def archive_size(self):
    # Total bytes on disk for this snapshot; 0 if dir missing/unreadable
    try:
        return get_dir_size(self.output_dir)[0]
    except Exception:
        return 0
  1110. def save_tags(self, tags: Iterable[str] = ()) -> None:
  1111. tags_id = [Tag.objects.get_or_create(name=tag)[0].pk for tag in tags if tag.strip()]
  1112. self.tags.clear()
  1113. self.tags.add(*tags_id)
def pending_archiveresults(self) -> QuerySet['ArchiveResult']:
    # Results not yet in a final or currently-active state (still runnable)
    return self.archiveresult_set.exclude(status__in=ArchiveResult.FINAL_OR_ACTIVE_STATES)
def run(self) -> list['ArchiveResult']:
    """
    Execute snapshot by creating pending ArchiveResults for all enabled hooks.
    Called by: SnapshotMachine.enter_started()
    Hook Lifecycle:
    1. discover_hooks('Snapshot') → finds all plugin hooks
    2. For each hook:
    - Create ArchiveResult with status=QUEUED
    - Store hook_name (e.g., 'on_Snapshot__50_wget.py')
    3. ArchiveResults execute independently via ArchiveResultMachine
    4. Hook execution happens in ArchiveResult.run(), NOT here
    Returns:
    list[ArchiveResult]: Newly created pending results
    """
    # All of the above is implemented by this delegate:
    return self.create_pending_archiveresults()
def cleanup(self):
    """
    Clean up background ArchiveResult hooks.
    Called by the state machine when entering the 'sealed' state.
    Kills any background hooks and finalizes their ArchiveResults.
    """
    from archivebox.hooks import kill_process
    # Kill any background ArchiveResult hooks
    if not self.OUTPUT_DIR.exists():
        return
    # Find all .pid files in this snapshot's output directory
    for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
        # validate=True presumably checks the pid still belongs to our hook
        # before killing — TODO confirm against archivebox.hooks.kill_process
        kill_process(pid_file, validate=True)
    # Update all STARTED ArchiveResults from filesystem
    results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
    for ar in results:
        # Pull final status/output for each interrupted result from disk
        ar.update_from_output()
  1148. def has_running_background_hooks(self) -> bool:
  1149. """
  1150. Check if any ArchiveResult background hooks are still running.
  1151. Used by state machine to determine if snapshot is finished.
  1152. """
  1153. from archivebox.hooks import process_is_alive
  1154. if not self.OUTPUT_DIR.exists():
  1155. return False
  1156. for plugin_dir in self.OUTPUT_DIR.iterdir():
  1157. if not plugin_dir.is_dir():
  1158. continue
  1159. pid_file = plugin_dir / 'hook.pid'
  1160. if process_is_alive(pid_file):
  1161. return True
  1162. return False
  1163. def to_jsonl(self) -> dict:
  1164. """
  1165. Convert Snapshot model instance to a JSONL record.
  1166. Includes all fields needed to fully reconstruct/identify this snapshot.
  1167. """
  1168. from archivebox.config import VERSION
  1169. return {
  1170. 'type': 'Snapshot',
  1171. 'schema_version': VERSION,
  1172. 'id': str(self.id),
  1173. 'crawl_id': str(self.crawl_id),
  1174. 'url': self.url,
  1175. 'title': self.title,
  1176. 'tags': self.tags_str(),
  1177. 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
  1178. 'created_at': self.created_at.isoformat() if self.created_at else None,
  1179. 'timestamp': self.timestamp,
  1180. 'depth': self.depth,
  1181. 'status': self.status,
  1182. 'fs_version': self.fs_version,
  1183. }
    @staticmethod
    def from_jsonl(record: Dict[str, Any], overrides: Optional[Dict[str, Any]] = None, queue_for_extraction: bool = True) -> Optional['Snapshot']:
        """
        Create/update a Snapshot from a JSONL record or plain dict.

        Unified method that handles:
        - ID-based patching: {"id": "...", "title": "new title"}
        - URL-based create/update: {"url": "...", "title": "...", "tags": "..."}
        - Auto-creates a Crawl if not provided (every Snapshot must belong to one)
        - Optionally queues the snapshot for extraction

        Args:
            record: Dict with 'url' (for create) or 'id' (for patch), plus other fields
            overrides: Dict with 'crawl', 'snapshot' (parent), 'created_by_id'
            queue_for_extraction: If True, sets status=QUEUED and retry_at (default: True)

        Returns:
            Snapshot instance, or None when the record has no usable 'id' or 'url'.
        """
        import re
        from django.utils import timezone
        from archivebox.misc.util import parse_date
        from archivebox.base_models.models import get_or_create_system_user_pk
        from archivebox.config.common import GENERAL_CONFIG
        overrides = overrides or {}
        # If 'id' is provided, lookup and patch that specific snapshot
        snapshot_id = record.get('id')
        if snapshot_id:
            try:
                snapshot = Snapshot.objects.get(id=snapshot_id)
                # Generically update all fields present in record
                update_fields = []
                for field_name, value in record.items():
                    # Skip internal fields
                    if field_name in ('id', 'type'):
                        continue
                    # Skip if field doesn't exist on model
                    if not hasattr(snapshot, field_name):
                        continue
                    # Special parsing for date fields (JSONL stores them as ISO strings)
                    if field_name in ('bookmarked_at', 'retry_at', 'created_at', 'modified_at'):
                        if value and isinstance(value, str):
                            value = parse_date(value)
                    # Update field only if a value is provided and it actually changed
                    if value is not None and getattr(snapshot, field_name) != value:
                        setattr(snapshot, field_name, value)
                        update_fields.append(field_name)
                if update_fields:
                    snapshot.save(update_fields=update_fields + ['modified_at'])
                return snapshot
            except Snapshot.DoesNotExist:
                # ID not found, fall through to create-by-URL logic
                pass
        url = record.get('url')
        if not url:
            return None
        # Determine or create crawl (every snapshot must have a crawl)
        crawl = overrides.get('crawl')
        parent_snapshot = overrides.get('snapshot')  # Parent snapshot
        created_by_id = overrides.get('created_by_id') or (parent_snapshot.created_by.pk if parent_snapshot else get_or_create_system_user_pk())
        # If no crawl provided, inherit from parent or auto-create one
        if not crawl:
            if parent_snapshot:
                # Inherit crawl from parent snapshot
                crawl = parent_snapshot.crawl
            else:
                # Auto-create a single-URL crawl
                from archivebox.crawls.models import Crawl
                from archivebox.config import CONSTANTS
                timestamp_str = timezone.now().strftime("%Y-%m-%d__%H-%M-%S")
                # NOTE(review): this sources file is written but never linked to the
                # created Crawl below - confirm whether that is intentional
                sources_file = CONSTANTS.SOURCES_DIR / f'{timestamp_str}__auto_crawl.txt'
                sources_file.parent.mkdir(parents=True, exist_ok=True)
                sources_file.write_text(url)
                crawl = Crawl.objects.create(
                    urls=url,
                    max_depth=0,
                    label=f'auto-created for {url[:50]}',
                    created_by_id=created_by_id,
                )
        # Parse tags (split on configured separator, strip, dedupe preserving order)
        tags_str = record.get('tags', '')
        tag_list = []
        if tags_str:
            tag_list = list(dict.fromkeys(
                tag.strip() for tag in re.split(GENERAL_CONFIG.TAG_SEPARATOR_PATTERN, tags_str)
                if tag.strip()
            ))
        # Get most recent snapshot with this URL (URLs can exist in multiple crawls)
        snapshot = Snapshot.objects.filter(url=url).order_by('-created_at').first()
        title = record.get('title')
        timestamp = record.get('timestamp')
        if snapshot:
            # Update existing snapshot - only keep the new title if it is longer
            # (longer titles are assumed to be more descriptive)
            if title and (not snapshot.title or len(title) > len(snapshot.title or '')):
                snapshot.title = title
                snapshot.save(update_fields=['title', 'modified_at'])
        else:
            # Create new snapshot
            if timestamp:
                # timestamp doubles as the archive dir name, so it must be unique;
                # bump by 1s until free
                while Snapshot.objects.filter(timestamp=timestamp).exists():
                    timestamp = str(float(timestamp) + 1.0)
            snapshot = Snapshot.objects.create(
                url=url,
                timestamp=timestamp,
                title=title,
                crawl=crawl,
            )
        # Update tags (union of existing and newly supplied tags)
        if tag_list:
            existing_tags = set(snapshot.tags.values_list('name', flat=True))
            new_tags = set(tag_list) | existing_tags
            snapshot.save_tags(new_tags)
        # Queue for extraction and update additional fields
        update_fields = []
        if queue_for_extraction:
            snapshot.status = Snapshot.StatusChoices.QUEUED
            snapshot.retry_at = timezone.now()
            update_fields.extend(['status', 'retry_at'])
        # Update additional fields if provided
        # NOTE(review): unlike the ID-patch branch above, 'bookmarked_at' is NOT run
        # through parse_date() here, so a string value would be assigned raw - confirm
        for field_name in ('depth', 'parent_snapshot_id', 'crawl_id', 'bookmarked_at'):
            value = record.get(field_name)
            if value is not None and getattr(snapshot, field_name) != value:
                setattr(snapshot, field_name, value)
                update_fields.append(field_name)
        if update_fields:
            snapshot.save(update_fields=update_fields + ['modified_at'])
        return snapshot
    def create_pending_archiveresults(self) -> list['ArchiveResult']:
        """
        Create ArchiveResult records for all enabled hooks.

        Uses the hooks system to discover available hooks from:
        - archivebox/plugins/*/on_Snapshot__*.{py,sh,js}
        - data/plugins/*/on_Snapshot__*.{py,sh,js}

        Creates one ArchiveResult per hook (not per plugin), with hook_name set.
        This enables step-based execution where all hooks in a step can run in parallel.

        Returns:
            ArchiveResults (new or pre-existing) that are still in their initial state.
        """
        from archivebox.hooks import discover_hooks
        hooks = discover_hooks('Snapshot')
        archiveresults = []
        for hook_path in hooks:
            hook_name = hook_path.name  # e.g., 'on_Snapshot__50_wget.py'
            plugin = hook_path.parent.name  # e.g., 'wget'
            # Check if AR already exists for this specific hook
            if ArchiveResult.objects.filter(snapshot=self, hook_name=hook_name).exists():
                continue
            # get_or_create (rather than create) avoids duplicates if another
            # worker raced past the exists() check above
            archiveresult, created = ArchiveResult.objects.get_or_create(
                snapshot=self,
                hook_name=hook_name,
                defaults={
                    'plugin': plugin,
                    'status': ArchiveResult.INITIAL_STATE,
                    'retry_at': timezone.now(),
                },
            )
            # Only collect results still waiting to run; a pre-existing AR may
            # already have been started/finished elsewhere
            if archiveresult.status == ArchiveResult.INITIAL_STATE:
                archiveresults.append(archiveresult)
        return archiveresults
    def advance_step_if_ready(self) -> bool:
        """
        Advance current_step if all foreground hooks in current step are finished.

        Called by the state machine to check if step can advance.
        Background hooks (.bg) don't block step advancement.

        Step advancement rules:
        - All foreground ARs in current step must be finished (SUCCEEDED/FAILED/SKIPPED)
        - Background ARs (hook_name contains '.bg.') are ignored for advancement
        - When ready, increments current_step by 1 (up to 9)

        Returns:
            True if step was advanced, False if not ready or already at step 9.
        """
        from archivebox.hooks import extract_step, is_background_hook
        if self.current_step >= 9:
            return False  # Already at final step
        # Get all ARs that have a hook_name set (the step number is encoded in it)
        current_step_ars = self.archiveresult_set.filter(
            hook_name__isnull=False
        ).exclude(hook_name='')
        # Check each AR in current step
        for ar in current_step_ars:
            ar_step = extract_step(ar.hook_name)
            if ar_step != self.current_step:
                continue  # Not in current step
            if is_background_hook(ar.hook_name):
                continue  # Background hooks don't block
            # Foreground hook in current step - check if finished
            if ar.status not in ArchiveResult.FINAL_OR_ACTIVE_STATES:
                # Still pending/queued - can't advance
                return False
            if ar.status == ArchiveResult.StatusChoices.STARTED:
                # Still running - can't advance
                return False
        # All foreground hooks in current step are finished - advance!
        self.current_step += 1
        self.save(update_fields=['current_step', 'modified_at'])
        return True
    def is_finished_processing(self) -> bool:
        """
        Check if this snapshot has finished processing.

        Used by SnapshotMachine.is_finished() to determine if snapshot is complete.

        Returns:
            True if all archiveresults are finished (or no work to do), False otherwise.
        """
        # if no archiveresults exist yet, it's not finished
        if not self.archiveresult_set.exists():
            return False
        # Try to advance step if ready (handles step-based hook execution)
        # This will increment current_step when all foreground hooks in current step are done
        while self.advance_step_if_ready():
            pass  # Keep advancing until we can't anymore
        # if archiveresults exist but are still pending, it's not finished
        if self.pending_archiveresults().exists():
            return False
        # Don't wait for background hooks - they'll be cleaned up on entering sealed state
        # Background hooks in STARTED state are excluded by pending_archiveresults()
        # (STARTED is in FINAL_OR_ACTIVE_STATES) so once all results are FINAL or ACTIVE,
        # we can transition to sealed and cleanup() will kill the background hooks
        # otherwise archiveresults exist and are all finished, so it's finished
        return True
    def retry_failed_archiveresults(self, retry_at: Optional['timezone.datetime'] = None) -> int:
        """
        Reset failed/skipped ArchiveResults to queued for retry.

        This enables seamless retry of the entire extraction pipeline:
        - Resets FAILED and SKIPPED results to QUEUED
        - Sets retry_at so workers pick them up
        - Plugins run in order (numeric prefix)
        - Each plugin checks its dependencies at runtime

        Dependency handling (e.g., chrome_session -> screenshot):
        - Plugins check if required outputs exist before running
        - If dependency output missing -> plugin returns 'skipped'
        - On retry, if dependency now succeeds -> dependent can run

        Args:
            retry_at: When the reset results become eligible for pickup (default: now).

        Returns:
            Count of ArchiveResults reset.
        """
        retry_at = retry_at or timezone.now()
        # Bulk-reset all failed/skipped results, clearing previous run artifacts
        count = self.archiveresult_set.filter(
            status__in=[
                ArchiveResult.StatusChoices.FAILED,
                ArchiveResult.StatusChoices.SKIPPED,
            ]
        ).update(
            status=ArchiveResult.StatusChoices.QUEUED,
            retry_at=retry_at,
            output=None,
            start_ts=None,
            end_ts=None,
        )
        # Also reset the snapshot and current_step so it gets re-checked from the beginning
        if count > 0:
            self.status = self.StatusChoices.STARTED
            self.retry_at = retry_at
            self.current_step = 0  # Reset to step 0 for retry
            self.save(update_fields=['status', 'retry_at', 'current_step', 'modified_at'])
        return count
  1432. # =========================================================================
  1433. # URL Helper Properties (migrated from Link schema)
  1434. # =========================================================================
  1435. @cached_property
  1436. def url_hash(self) -> str:
  1437. from hashlib import sha256
  1438. return sha256(self.url.encode()).hexdigest()[:8]
  1439. @cached_property
  1440. def scheme(self) -> str:
  1441. return self.url.split('://')[0]
  1442. @cached_property
  1443. def path(self) -> str:
  1444. parts = self.url.split('://', 1)
  1445. return '/' + parts[1].split('/', 1)[1] if len(parts) > 1 and '/' in parts[1] else '/'
  1446. @cached_property
  1447. def basename(self) -> str:
  1448. return self.path.split('/')[-1]
  1449. @cached_property
  1450. def extension(self) -> str:
  1451. basename = self.basename
  1452. return basename.split('.')[-1] if '.' in basename else ''
  1453. @cached_property
  1454. def base_url(self) -> str:
  1455. return f'{self.scheme}://{self.domain}'
  1456. @cached_property
  1457. def is_static(self) -> bool:
  1458. static_extensions = {'.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.mp4', '.mp3', '.wav', '.webm'}
  1459. return any(self.url.lower().endswith(ext) for ext in static_extensions)
  1460. @cached_property
  1461. def is_archived(self) -> bool:
  1462. output_paths = (
  1463. self.domain,
  1464. 'output.html',
  1465. 'output.pdf',
  1466. 'screenshot.png',
  1467. 'singlefile.html',
  1468. 'readability/content.html',
  1469. 'mercury/content.html',
  1470. 'htmltotext.txt',
  1471. 'media',
  1472. 'git',
  1473. )
  1474. return any((Path(self.output_dir) / path).exists() for path in output_paths)
  1475. # =========================================================================
  1476. # Date/Time Properties (migrated from Link schema)
  1477. # =========================================================================
  1478. @cached_property
  1479. def bookmarked_date(self) -> Optional[str]:
  1480. max_ts = (timezone.now() + timedelta(days=30)).timestamp()
  1481. if self.timestamp and self.timestamp.replace('.', '').isdigit():
  1482. if 0 < float(self.timestamp) < max_ts:
  1483. return self._ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
  1484. return str(self.timestamp)
  1485. return None
  1486. @cached_property
  1487. def downloaded_datestr(self) -> Optional[str]:
  1488. return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None
  1489. @cached_property
  1490. def archive_dates(self) -> List[datetime]:
  1491. return [
  1492. result.start_ts
  1493. for result in self.archiveresult_set.all()
  1494. if result.start_ts
  1495. ]
  1496. @cached_property
  1497. def oldest_archive_date(self) -> Optional[datetime]:
  1498. dates = self.archive_dates
  1499. return min(dates) if dates else None
  1500. @cached_property
  1501. def newest_archive_date(self) -> Optional[datetime]:
  1502. dates = self.archive_dates
  1503. return max(dates) if dates else None
  1504. @cached_property
  1505. def num_outputs(self) -> int:
  1506. return self.archiveresult_set.filter(status='succeeded').count()
  1507. @cached_property
  1508. def num_failures(self) -> int:
  1509. return self.archiveresult_set.filter(status='failed').count()
  1510. # =========================================================================
  1511. # Output Path Methods (migrated from Link schema)
  1512. # =========================================================================
    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """
        Intelligently discover the best output file for each plugin.

        Uses actual ArchiveResult data and filesystem scanning with smart heuristics.

        Returns:
            Mapping of '<plugin>_path' (plus a few static keys like 'index_path')
            to a path relative to the snapshot's output dir, or an external URL.
        """
        FAVICON_PROVIDER = 'https://www.google.com/s2/favicons?domain={}'
        # Mimetypes that can be embedded/previewed in an iframe
        IFRAME_EMBEDDABLE_EXTENSIONS = {
            'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl',
            'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
            'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
        }
        MIN_DISPLAY_SIZE = 15_000  # 15KB - filter out tiny files
        MAX_SCAN_FILES = 50  # Don't scan massive directories

        def find_best_output_in_dir(dir_path: Path, plugin_name: str) -> Optional[str]:
            """Find the best representative file in a plugin's output directory"""
            if not dir_path.exists() or not dir_path.is_dir():
                return None
            candidates = []
            file_count = 0
            # Special handling for media plugin - look for thumbnails
            is_media_dir = plugin_name == 'media'
            # Scan for suitable files
            for file_path in dir_path.rglob('*'):
                file_count += 1
                if file_count > MAX_SCAN_FILES:
                    break
                if file_path.is_dir() or file_path.name.startswith('.'):
                    continue
                ext = file_path.suffix.lstrip('.').lower()
                if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
                    continue
                try:
                    size = file_path.stat().st_size
                except OSError:
                    continue
                # For media dir, allow smaller image files (thumbnails are often < 15KB)
                min_size = 5_000 if (is_media_dir and ext in ('png', 'jpg', 'jpeg', 'webp', 'gif')) else MIN_DISPLAY_SIZE
                if size < min_size:
                    continue
                # Prefer main files: index.html, output.*, content.*, etc.
                priority = 0
                name_lower = file_path.name.lower()
                if is_media_dir:
                    # Special prioritization for media directories
                    if any(keyword in name_lower for keyword in ('thumb', 'thumbnail', 'cover', 'poster')):
                        priority = 200  # Highest priority for thumbnails
                    elif ext in ('png', 'jpg', 'jpeg', 'webp', 'gif'):
                        priority = 150  # High priority for any image
                    elif ext in ('mp4', 'webm', 'mp3', 'opus', 'ogg'):
                        priority = 100  # Lower priority for actual media files
                    else:
                        priority = 50
                elif 'index' in name_lower:
                    priority = 100
                elif name_lower.startswith(('output', 'content', plugin_name)):
                    priority = 50
                elif ext in ('html', 'htm', 'pdf'):
                    priority = 30
                elif ext in ('png', 'jpg', 'jpeg', 'webp'):
                    priority = 20
                else:
                    priority = 10
                candidates.append((priority, size, file_path))
            if not candidates:
                return None
            # Sort by priority (desc), then size (desc)
            candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
            best_file = candidates[0][2]
            return str(best_file.relative_to(Path(self.output_dir)))

        # Static entries that exist regardless of extractor results
        canonical = {
            'index_path': 'index.html',
            'google_favicon_path': FAVICON_PROVIDER.format(self.domain),
            'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
        }
        # Scan each ArchiveResult's output directory for the best file
        snap_dir = Path(self.output_dir)
        for result in self.archiveresult_set.filter(status='succeeded'):
            if not result.output_files and not result.output_str:
                continue
            # Try to find the best output file for this plugin
            plugin_dir = snap_dir / result.plugin
            best_output = None
            # Check output_files first (new field)
            if result.output_files:
                first_file = next(iter(result.output_files.keys()), None)
                if first_file and (plugin_dir / first_file).exists():
                    best_output = f'{result.plugin}/{first_file}'
            # Fallback to output_str if it looks like a path
            if not best_output and result.output_str and (snap_dir / result.output_str).exists():
                best_output = result.output_str
            if not best_output and plugin_dir.exists():
                # Intelligently find the best file in the plugin's directory
                best_output = find_best_output_in_dir(plugin_dir, result.plugin)
            if best_output:
                canonical[f'{result.plugin}_path'] = best_output
        # Also scan top-level for legacy outputs (backwards compatibility)
        for file_path in snap_dir.glob('*'):
            if file_path.is_dir() or file_path.name in ('index.html', 'index.json'):
                continue
            ext = file_path.suffix.lstrip('.').lower()
            if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
                continue
            try:
                size = file_path.stat().st_size
                if size >= MIN_DISPLAY_SIZE:
                    # Add as generic output with stem as key
                    key = f'{file_path.stem}_path'
                    if key not in canonical:
                        canonical[key] = file_path.name
            except OSError:
                continue
        if self.is_static:
            # Static assets are served straight from the warc dir
            static_path = f'warc/{self.timestamp}'
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
            })
        return canonical
    def latest_outputs(self, status: Optional[str] = None) -> Dict[str, Any]:
        """
        Get the latest output that each plugin produced.

        Args:
            status: Optionally restrict to results with this exact status
                (e.g. 'succeeded'); None means any status.

        Returns:
            Mapping of plugin name -> embed path of its most recent result,
            or None for plugins with no matching result.
        """
        from archivebox.hooks import get_plugins
        from django.db.models import Q
        latest: Dict[str, Any] = {}
        for plugin in get_plugins():
            results = self.archiveresult_set.filter(plugin=plugin)
            if status is not None:
                results = results.filter(status=status)
            # Filter for results with output_files or output_str, newest first
            results = results.filter(Q(output_files__isnull=False) | ~Q(output_str='')).order_by('-start_ts')
            result = results.first()
            # Return embed_path() for backwards compatibility
            latest[plugin] = result.embed_path() if result else None
        return latest
  1647. # =========================================================================
  1648. # Serialization Methods
  1649. # =========================================================================
  1650. def to_dict(self, extended: bool = False) -> Dict[str, Any]:
  1651. """Convert Snapshot to a dictionary (replacement for Link._asdict())"""
  1652. from archivebox.misc.util import ts_to_date_str
  1653. result = {
  1654. 'TYPE': 'core.models.Snapshot',
  1655. 'id': str(self.id),
  1656. 'url': self.url,
  1657. 'timestamp': self.timestamp,
  1658. 'title': self.title,
  1659. 'tags': self.tags_str(),
  1660. 'downloaded_at': self.downloaded_at.isoformat() if self.downloaded_at else None,
  1661. 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
  1662. 'created_at': self.created_at.isoformat() if self.created_at else None,
  1663. # Computed properties
  1664. 'domain': self.domain,
  1665. 'scheme': self.scheme,
  1666. 'base_url': self.base_url,
  1667. 'path': self.path,
  1668. 'basename': self.basename,
  1669. 'extension': self.extension,
  1670. 'is_static': self.is_static,
  1671. 'is_archived': self.is_archived,
  1672. 'archive_path': self.archive_path,
  1673. 'output_dir': self.output_dir,
  1674. 'link_dir': self.output_dir, # backwards compatibility alias
  1675. 'archive_size': self.archive_size,
  1676. 'bookmarked_date': self.bookmarked_date,
  1677. 'downloaded_datestr': self.downloaded_datestr,
  1678. 'num_outputs': self.num_outputs,
  1679. 'num_failures': self.num_failures,
  1680. }
  1681. if extended:
  1682. result['canonical'] = self.canonical_outputs()
  1683. return result
  1684. def to_json(self, indent: int = 4) -> str:
  1685. """Convert to JSON string"""
  1686. return to_json(self.to_dict(extended=True), indent=indent)
  1687. def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
  1688. """Convert to CSV string"""
  1689. data = self.to_dict()
  1690. cols = cols or ['timestamp', 'is_archived', 'url']
  1691. return separator.join(to_json(data.get(col, ''), indent=None).ljust(ljust) for col in cols)
  1692. def write_json_details(self, out_dir: Optional[str] = None) -> None:
  1693. """Write JSON index file for this snapshot to its output directory"""
  1694. out_dir = out_dir or self.output_dir
  1695. path = Path(out_dir) / CONSTANTS.JSON_INDEX_FILENAME
  1696. atomic_write(str(path), self.to_dict(extended=True))
    def write_html_details(self, out_dir: Optional[str] = None) -> None:
        """
        Write HTML detail page for this snapshot to its output directory.

        Renders the 'snapshot.html' template with the snapshot's extended dict
        plus display-oriented fields, then atomically writes the result.

        Args:
            out_dir: Target directory (defaults to this snapshot's output_dir).
        """
        from django.template.loader import render_to_string
        from archivebox.config.common import SERVER_CONFIG
        from archivebox.config.configset import get_config
        from archivebox.misc.logging_util import printable_filesize
        out_dir = out_dir or self.output_dir
        config = get_config()
        SAVE_ARCHIVE_DOT_ORG = config.get('SAVE_ARCHIVE_DOT_ORG', True)
        TITLE_LOADING_MSG = 'Not yet archived...'
        canonical = self.canonical_outputs()
        context = {
            **self.to_dict(extended=True),
            # NOTE(review): canonical keys already end in '_path' (e.g. 'index_path'),
            # so this produces '*_path_path' keys - verify the template expects that
            **{f'{k}_path': v for k, v in canonical.items()},
            'canonical': {f'{k}_path': v for k, v in canonical.items()},
            'title': htmlencode(self.title or (self.base_url if self.is_archived else TITLE_LOADING_MSG)),
            'url_str': htmlencode(urldecode(self.base_url)),
            # NOTE(review): the f-string below is always truthy, so the `or` fallback
            # (self.domain / 'about:blank') can never be taken - confirm intended logic
            'archive_url': urlencode(f'warc/{self.timestamp}' or (self.domain if self.is_archived else '')) or 'about:blank',
            'extension': self.extension or 'html',
            'tags': self.tags_str() or 'untagged',
            'size': printable_filesize(self.archive_size) if self.archive_size else 'pending',
            'status': 'archived' if self.is_archived else 'not yet archived',
            'status_color': 'success' if self.is_archived else 'danger',
            'oldest_archive_date': ts_to_date_str(self.oldest_archive_date),
            'SAVE_ARCHIVE_DOT_ORG': SAVE_ARCHIVE_DOT_ORG,
            'PREVIEW_ORIGINALS': SERVER_CONFIG.PREVIEW_ORIGINALS,
        }
        rendered_html = render_to_string('snapshot.html', context)
        atomic_write(str(Path(out_dir) / CONSTANTS.HTML_INDEX_FILENAME), rendered_html)
  1726. # =========================================================================
  1727. # Helper Methods
  1728. # =========================================================================
  1729. @staticmethod
  1730. def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
  1731. return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None
  1732. # =============================================================================
  1733. # Snapshot State Machine
  1734. # =============================================================================
class SnapshotMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing Snapshot lifecycle.

    Hook Lifecycle:
    QUEUED: waiting for the snapshot to be ready
        -> tick() when can_start()
    STARTED: enter_started() calls snapshot.run(), which discovers hooks and
        creates one ArchiveResult per hook (no execution yet); ArchiveResults
        then process independently via their own state machines, and the
        snapshot advances through steps 0-9 as foreground hooks complete
        -> tick() when is_finished()
    SEALED: enter_sealed() calls cleanup() to kill any background hooks still
        running, and sets retry_at=None (no more processing)

    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
    """
    # Name of the attribute on this machine that holds the model instance
    model_attr_name = 'snapshot'

    # States (values mirror Snapshot.StatusChoices so DB status == machine state)
    queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
    started = State(value=Snapshot.StatusChoices.STARTED)
    sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)

    # Tick Event: each tick either loops in place or advances when its condition holds
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(sealed, cond='is_finished')
    )

    def can_start(self) -> bool:
        # A snapshot only needs a URL to be startable
        can_start = bool(self.snapshot.url)
        return can_start

    def is_finished(self) -> bool:
        """Check if snapshot processing is complete - delegates to model method."""
        return self.snapshot.is_finished_processing()

    @queued.enter
    def enter_queued(self):
        # Make the snapshot immediately eligible for pickup by a worker
        self.snapshot.update_and_requeue(
            retry_at=timezone.now(),
            status=Snapshot.StatusChoices.QUEUED,
        )

    @started.enter
    def enter_started(self):
        # lock the snapshot while we create the pending archiveresults
        self.snapshot.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=30),  # if failed, wait 30s before retrying
        )
        # Run the snapshot - creates pending archiveresults for all enabled plugins
        self.snapshot.run()
        # unlock the snapshot after we're done + set status = started
        self.snapshot.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=5),  # check again in 5s
            status=Snapshot.StatusChoices.STARTED,
        )

    @sealed.enter
    def enter_sealed(self):
        # Clean up background hooks
        self.snapshot.cleanup()
        # retry_at=None permanently removes the snapshot from worker queues
        self.snapshot.update_and_requeue(
            retry_at=None,
            status=Snapshot.StatusChoices.SEALED,
        )
  1807. class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
    class StatusChoices(models.TextChoices):
        # Lifecycle states for an ArchiveResult; the first element of each pair
        # is stored in the DB, the second is the human-readable label.
        QUEUED = 'queued', 'Queued'
        STARTED = 'started', 'Started'
        BACKOFF = 'backoff', 'Waiting to retry'
        SUCCEEDED = 'succeeded', 'Succeeded'
        FAILED = 'failed', 'Failed'
        SKIPPED = 'skipped', 'Skipped'
  1815. @classmethod
  1816. def get_plugin_choices(cls):
  1817. """Get plugin choices from discovered hooks (for forms/admin)."""
  1818. plugins = [get_plugin_name(e) for e in get_plugins()]
  1819. return tuple((e, e) for e in plugins)
    # Keep AutoField for backward compatibility with 0.7.x databases
    # UUID field is added separately by migration for new records
    id = models.AutoField(primary_key=True, editable=False)
    # Note: unique constraint is added by migration 0027 - don't set unique=True here
    # or SQLite table recreation in earlier migrations will fail
    uuid = models.UUIDField(default=uuid7, null=True, blank=True, db_index=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    # Parent snapshot this result belongs to; deleting the snapshot deletes its results
    snapshot: Snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE)  # type: ignore
    # No choices= constraint - plugin names come from plugin system and can be any string
    plugin = models.CharField(max_length=32, blank=False, null=False, db_index=True, default='')
    hook_name = models.CharField(max_length=255, blank=True, default='', db_index=True, help_text='Full filename of the hook that executed (e.g., on_Snapshot__50_wget.py)')
    # Process FK - tracks execution details (cmd, pwd, stdout, stderr, etc.)
    # Required - every ArchiveResult must have a Process
    process = models.OneToOneField(
        'machine.Process',
        on_delete=models.PROTECT,
        null=False,  # Required after migration 4
        related_name='archiveresult',
        help_text='Process execution details for this archive result'
    )
    # New output fields (replacing old 'output' field)
    output_str = models.TextField(blank=True, default='', help_text='Human-readable output summary')
    output_json = models.JSONField(null=True, blank=True, default=None, help_text='Structured metadata (headers, redirects, etc.)')
    output_files = models.JSONField(default=dict, help_text='Dict of {relative_path: {metadata}}')
    output_size = models.BigIntegerField(default=0, help_text='Total bytes of all output files')
    output_mimetypes = models.CharField(max_length=512, blank=True, default='', help_text='CSV of mimetypes sorted by size')
    # Execution window; both null until the hook actually runs
    start_ts = models.DateTimeField(default=None, null=True, blank=True)
    end_ts = models.DateTimeField(default=None, null=True, blank=True)
    status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
    retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
    notes = models.TextField(blank=True, null=False, default='')
    # output_dir is computed via @property from snapshot.output_dir / plugin
    # State-machine wiring consumed by ModelWithStateMachine
    state_machine_name = 'archivebox.core.models.ArchiveResultMachine'
    retry_at_field_name = 'retry_at'
    state_field_name = 'status'
    active_state = StatusChoices.STARTED
    class Meta(TypedModelMeta):
        # Django model options: admin display names for the results log
        app_label = 'core'
        verbose_name = 'Archive Result'
        verbose_name_plural = 'Archive Results Log'
  1861. def __str__(self):
  1862. return f'[{self.id}] {self.snapshot.url[:64]} -> {self.plugin}'
  1863. @property
  1864. def created_by(self):
  1865. """Convenience property to access the user who created this archive result via its snapshot's crawl."""
  1866. return self.snapshot.crawl.created_by
  1867. def to_jsonl(self) -> dict:
  1868. """
  1869. Convert ArchiveResult model instance to a JSONL record.
  1870. """
  1871. from archivebox.config import VERSION
  1872. record = {
  1873. 'type': 'ArchiveResult',
  1874. 'schema_version': VERSION,
  1875. 'id': str(self.id),
  1876. 'snapshot_id': str(self.snapshot_id),
  1877. 'plugin': self.plugin,
  1878. 'hook_name': self.hook_name,
  1879. 'status': self.status,
  1880. 'output_str': self.output_str,
  1881. 'start_ts': self.start_ts.isoformat() if self.start_ts else None,
  1882. 'end_ts': self.end_ts.isoformat() if self.end_ts else None,
  1883. }
  1884. # Include optional fields if set
  1885. if self.output_json:
  1886. record['output_json'] = self.output_json
  1887. if self.output_files:
  1888. record['output_files'] = self.output_files
  1889. if self.output_size:
  1890. record['output_size'] = self.output_size
  1891. if self.output_mimetypes:
  1892. record['output_mimetypes'] = self.output_mimetypes
  1893. if self.cmd:
  1894. record['cmd'] = self.cmd
  1895. if self.cmd_version:
  1896. record['cmd_version'] = self.cmd_version
  1897. if self.process_id:
  1898. record['process_id'] = str(self.process_id)
  1899. return record
    def save(self, *args, **kwargs):
        """Persist the ArchiveResult, creating its Process record and logging on first save.

        NOTE(review): deliberately bypasses any intermediate save() overrides by
        calling models.Model.save() directly (see comment below).
        """
        is_new = self._state.adding
        # Create Process record if this is a new ArchiveResult and no process exists yet
        if is_new and not self.process_id:
            from archivebox.machine.models import Process, Machine
            process = Process.objects.create(
                machine=Machine.current(),
                pwd=str(Path(self.snapshot.output_dir) / self.plugin),
                cmd=[],  # Will be set by run()
                status='queued',
                timeout=120,
                env={},
            )
            self.process = process
        # Skip ModelWithOutputDir.save() to avoid creating index.json in plugin directories
        # Call the Django Model.save() directly instead
        models.Model.save(self, *args, **kwargs)
        if is_new:
            # Log creation only after the row exists (so self.id is final)
            from archivebox.misc.logging_util import log_worker_event
            log_worker_event(
                worker_type='DB',
                event='Created ArchiveResult',
                indent_level=3,
                plugin=self.plugin,
                metadata={
                    'id': str(self.id),
                    'snapshot_id': str(self.snapshot_id),
                    'snapshot_url': str(self.snapshot.url)[:64],
                    'status': self.status,
                },
            )
  1931. @cached_property
  1932. def snapshot_dir(self):
  1933. return Path(self.snapshot.output_dir)
  1934. @cached_property
  1935. def url(self):
  1936. return self.snapshot.url
  1937. @property
  1938. def api_url(self) -> str:
  1939. return reverse_lazy('api-1:get_archiveresult', args=[self.id])
  1940. def get_absolute_url(self):
  1941. return f'/{self.snapshot.archive_path}/{self.plugin}'
  1942. @property
  1943. def plugin_module(self) -> Any | None:
  1944. # Hook scripts are now used instead of Python plugin modules
  1945. # The plugin name maps to hooks in archivebox/plugins/{plugin}/
  1946. return None
  1947. def output_exists(self) -> bool:
  1948. return os.path.exists(Path(self.snapshot_dir) / self.plugin)
  1949. def embed_path(self) -> Optional[str]:
  1950. """
  1951. Get the relative path to the embeddable output file for this result.
  1952. Returns the first file from output_files if set, otherwise tries to
  1953. find a reasonable default based on the plugin type.
  1954. """
  1955. # Check output_files dict for primary output
  1956. if self.output_files:
  1957. # Return first file from output_files (dict preserves insertion order)
  1958. first_file = next(iter(self.output_files.keys()), None)
  1959. if first_file:
  1960. return f'{self.plugin}/{first_file}'
  1961. # Fallback: check output_str if it looks like a file path
  1962. if self.output_str and ('/' in self.output_str or '.' in self.output_str):
  1963. return self.output_str
  1964. # Try to find output file based on plugin's canonical output path
  1965. canonical = self.snapshot.canonical_outputs()
  1966. plugin_key = f'{self.plugin}_path'
  1967. if plugin_key in canonical:
  1968. return canonical[plugin_key]
  1969. # Fallback to plugin directory
  1970. return f'{self.plugin}/'
  1971. def create_output_dir(self):
  1972. output_dir = Path(self.snapshot_dir) / self.plugin
  1973. output_dir.mkdir(parents=True, exist_ok=True)
  1974. return output_dir
  1975. @property
  1976. def output_dir_name(self) -> str:
  1977. return self.plugin
  1978. @property
  1979. def output_dir_parent(self) -> str:
  1980. return str(self.snapshot.OUTPUT_DIR.relative_to(CONSTANTS.DATA_DIR))
  1981. # Properties that delegate to Process model (for backwards compatibility)
  1982. # These properties will replace the direct fields after migration is complete
  1983. # They allow existing code to continue using archiveresult.pwd, .cmd, etc.
  1984. # Note: After migration 3 creates Process records and migration 5 removes the old fields,
  1985. # these properties provide seamless access to Process data through ArchiveResult
  1986. # Uncommented after migration 3 completed - properties now active
  1987. @property
  1988. def pwd(self) -> str:
  1989. """Working directory (from Process)."""
  1990. return self.process.pwd if self.process_id else ''
  1991. @property
  1992. def cmd(self) -> list:
  1993. """Command array (from Process)."""
  1994. return self.process.cmd if self.process_id else []
  1995. @property
  1996. def cmd_version(self) -> str:
  1997. """Command version (from Process.binary)."""
  1998. return self.process.cmd_version if self.process_id else ''
  1999. @property
  2000. def binary(self):
  2001. """Binary FK (from Process)."""
  2002. return self.process.binary if self.process_id else None
  2003. @property
  2004. def iface(self):
  2005. """Network interface FK (from Process)."""
  2006. return self.process.iface if self.process_id else None
  2007. @property
  2008. def machine(self):
  2009. """Machine FK (from Process)."""
  2010. return self.process.machine if self.process_id else None
  2011. @property
  2012. def timeout(self) -> int:
  2013. """Timeout in seconds (from Process)."""
  2014. return self.process.timeout if self.process_id else 120
  2015. def save_search_index(self):
  2016. pass
  2017. def cascade_health_update(self, success: bool):
  2018. """Update health stats for self, parent Snapshot, and grandparent Crawl."""
  2019. self.increment_health_stats(success)
  2020. self.snapshot.increment_health_stats(success)
  2021. self.snapshot.crawl.increment_health_stats(success)
    def run(self):
        """
        Execute this ArchiveResult's hook and update status.
        If self.hook_name is set, runs only that specific hook.
        If self.hook_name is empty, discovers and runs all hooks for self.plugin (backwards compat).
        Updates status/output fields, queues discovered URLs, and triggers indexing.
        """
        from django.utils import timezone
        from archivebox.hooks import BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR, run_hook, is_background_hook
        from archivebox.config.configset import get_config
        # Get merged config with proper context
        config = get_config(
            crawl=self.snapshot.crawl,
            snapshot=self.snapshot,
        )
        # Determine which hook(s) to run
        hooks = []
        if self.hook_name:
            # SPECIFIC HOOK MODE: Find the specific hook by name
            # (builtin plugins are checked first, then user plugins)
            for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
                if not base_dir.exists():
                    continue
                plugin_dir = base_dir / self.plugin
                if plugin_dir.exists():
                    hook_path = plugin_dir / self.hook_name
                    if hook_path.exists():
                        hooks.append(hook_path)
                        break
        else:
            # LEGACY MODE: Discover all hooks for this plugin (backwards compatibility)
            for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
                if not base_dir.exists():
                    continue
                plugin_dir = base_dir / self.plugin
                if plugin_dir.exists():
                    matches = list(plugin_dir.glob('on_Snapshot__*.*'))
                    if matches:
                        hooks.extend(sorted(matches))
        if not hooks:
            # Nothing runnable: mark permanently failed (retry_at=None stops retries)
            self.status = self.StatusChoices.FAILED
            if self.hook_name:
                self.output_str = f'Hook not found: {self.plugin}/{self.hook_name}'
            else:
                self.output_str = f'No hooks found for plugin: {self.plugin}'
            self.retry_at = None
            self.save()
            return
        # Output directory is plugin_dir for the hook output
        plugin_dir = Path(self.snapshot.output_dir) / self.plugin
        start_ts = timezone.now()
        is_bg_hook = False
        for hook in hooks:
            # Check if this is a background hook
            is_bg_hook = is_background_hook(hook.name)
            result = run_hook(
                hook,
                output_dir=plugin_dir,
                config=config,
                url=self.snapshot.url,
                snapshot_id=str(self.snapshot.id),
                crawl_id=str(self.snapshot.crawl.id),
                depth=self.snapshot.depth,
            )
            # Background hooks return None
            if result is None:
                is_bg_hook = True
        # Update status based on hook execution
        if is_bg_hook:
            # BACKGROUND HOOK - still running, return immediately
            # Status stays STARTED, will be finalized by Snapshot.cleanup()
            self.status = self.StatusChoices.STARTED
            self.start_ts = start_ts
            if self.process_id:
                self.process.pwd = str(plugin_dir)
                self.process.save()
            self.save()
            return
        # FOREGROUND HOOK - completed, update from filesystem
        self.start_ts = start_ts
        if self.process_id:
            self.process.pwd = str(plugin_dir)
            self.process.save()
        self.update_from_output()
        # Clean up empty output directory if no files were created
        if plugin_dir.exists() and not self.output_files:
            try:
                if not any(plugin_dir.iterdir()):
                    plugin_dir.rmdir()
            except (OSError, RuntimeError):
                pass
    def update_from_output(self):
        """
        Update this ArchiveResult from filesystem logs and output files.
        Used for:
        - Foreground hooks that completed (called from ArchiveResult.run())
        - Background hooks that completed (called from Snapshot.cleanup())
        Updates:
        - status, output_str, output_json from ArchiveResult JSONL record
        - output_files, output_size, output_mimetypes by walking filesystem
        - end_ts, retry_at, cmd, cmd_version, binary FK
        - Processes side-effect records (Snapshot, Tag, etc.) via process_hook_records()
        """
        import json
        import mimetypes
        from collections import defaultdict
        from pathlib import Path
        from django.utils import timezone
        from archivebox.hooks import process_hook_records
        plugin_dir = Path(self.pwd) if self.pwd else None
        if not plugin_dir or not plugin_dir.exists():
            # Nothing on disk to read from: mark permanently failed (retry_at=None)
            self.status = self.StatusChoices.FAILED
            self.output_str = 'Output directory not found'
            self.end_ts = timezone.now()
            self.retry_at = None
            self.save()
            return
        # Read and parse JSONL output from stdout.log
        stdout_file = plugin_dir / 'stdout.log'
        stdout = stdout_file.read_text() if stdout_file.exists() else ''
        records = []
        for line in stdout.splitlines():
            # Only lines that look like JSON objects are candidate records;
            # non-JSON log noise is silently skipped
            if line.strip() and line.strip().startswith('{'):
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
        # Find ArchiveResult record and update status/output from it
        ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
        if ar_records:
            hook_data = ar_records[0]
            # Update status
            status_map = {
                'succeeded': self.StatusChoices.SUCCEEDED,
                'failed': self.StatusChoices.FAILED,
                'skipped': self.StatusChoices.SKIPPED,
            }
            # Unknown/missing status strings are treated as FAILED
            self.status = status_map.get(hook_data.get('status', 'failed'), self.StatusChoices.FAILED)
            # Update output fields
            self.output_str = hook_data.get('output_str') or hook_data.get('output') or ''
            self.output_json = hook_data.get('output_json')
            # Update cmd fields
            if hook_data.get('cmd'):
                if self.process_id:
                    self.process.cmd = hook_data['cmd']
                    self.process.save()
                self._set_binary_from_cmd(hook_data['cmd'])
                # Note: cmd_version is derived from binary.version, not stored on Process
        else:
            # No ArchiveResult record = failed
            self.status = self.StatusChoices.FAILED
            self.output_str = 'Hook did not output ArchiveResult record'
        # Walk filesystem and populate output_files, output_size, output_mimetypes
        exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid'}
        mime_sizes = defaultdict(int)
        total_size = 0
        output_files = {}
        for file_path in plugin_dir.rglob('*'):
            if not file_path.is_file():
                continue
            if file_path.name in exclude_names:
                continue
            try:
                stat = file_path.stat()
                mime_type, _ = mimetypes.guess_type(str(file_path))
                mime_type = mime_type or 'application/octet-stream'
                relative_path = str(file_path.relative_to(plugin_dir))
                output_files[relative_path] = {}
                mime_sizes[mime_type] += stat.st_size
                total_size += stat.st_size
            except (OSError, IOError):
                # Files that disappear or are unreadable mid-walk are skipped
                continue
        self.output_files = output_files
        self.output_size = total_size
        # CSV of mimetypes, largest cumulative size first
        sorted_mimes = sorted(mime_sizes.items(), key=lambda x: x[1], reverse=True)
        self.output_mimetypes = ','.join(mime for mime, _ in sorted_mimes)
        # Update timestamps
        self.end_ts = timezone.now()
        self.retry_at = None
        self.save()
        # Process side-effect records (filter Snapshots for depth/URL)
        filtered_records = []
        for record in records:
            record_type = record.get('type')
            # Skip ArchiveResult records (already processed above)
            if record_type == 'ArchiveResult':
                continue
            # Filter Snapshot records for depth/URL constraints
            if record_type == 'Snapshot':
                url = record.get('url')
                if not url:
                    continue
                depth = record.get('depth', self.snapshot.depth + 1)
                if depth > self.snapshot.crawl.max_depth:
                    continue
                if not self._url_passes_filters(url):
                    continue
            filtered_records.append(record)
        # Process filtered records with unified dispatcher
        overrides = {
            'snapshot': self.snapshot,
            'crawl': self.snapshot.crawl,
            'created_by_id': self.created_by.pk,
        }
        process_hook_records(filtered_records, overrides=overrides)
        # Cleanup PID files and empty logs
        pid_file = plugin_dir / 'hook.pid'
        pid_file.unlink(missing_ok=True)
        stderr_file = plugin_dir / 'stderr.log'
        if stdout_file.exists() and stdout_file.stat().st_size == 0:
            stdout_file.unlink()
        if stderr_file.exists() and stderr_file.stat().st_size == 0:
            stderr_file.unlink()
  2234. def _set_binary_from_cmd(self, cmd: list) -> None:
  2235. """
  2236. Find Binary for command and set binary FK.
  2237. Tries matching by absolute path first, then by binary name.
  2238. Only matches binaries on the current machine.
  2239. """
  2240. if not cmd:
  2241. return
  2242. from archivebox.machine.models import Machine
  2243. bin_path_or_name = cmd[0] if isinstance(cmd, list) else cmd
  2244. machine = Machine.current()
  2245. # Try matching by absolute path first
  2246. binary = Binary.objects.filter(
  2247. abspath=bin_path_or_name,
  2248. machine=machine
  2249. ).first()
  2250. if binary:
  2251. if self.process_id:
  2252. self.process.binary = binary
  2253. self.process.save()
  2254. return
  2255. # Fallback: match by binary name
  2256. bin_name = Path(bin_path_or_name).name
  2257. binary = Binary.objects.filter(
  2258. name=bin_name,
  2259. machine=machine
  2260. ).first()
  2261. if binary:
  2262. if self.process_id:
  2263. self.process.binary = binary
  2264. self.process.save()
  2265. def _url_passes_filters(self, url: str) -> bool:
  2266. """Check if URL passes URL_ALLOWLIST and URL_DENYLIST config filters.
  2267. Uses proper config hierarchy: defaults -> file -> env -> machine -> user -> crawl -> snapshot
  2268. """
  2269. import re
  2270. from archivebox.config.configset import get_config
  2271. # Get merged config with proper hierarchy
  2272. config = get_config(
  2273. user=self.created_by,
  2274. crawl=self.snapshot.crawl,
  2275. snapshot=self.snapshot,
  2276. )
  2277. # Get allowlist/denylist (can be string or list)
  2278. allowlist_raw = config.get('URL_ALLOWLIST', '')
  2279. denylist_raw = config.get('URL_DENYLIST', '')
  2280. # Normalize to list of patterns
  2281. def to_pattern_list(value):
  2282. if isinstance(value, list):
  2283. return value
  2284. if isinstance(value, str):
  2285. return [p.strip() for p in value.split(',') if p.strip()]
  2286. return []
  2287. allowlist = to_pattern_list(allowlist_raw)
  2288. denylist = to_pattern_list(denylist_raw)
  2289. # Denylist takes precedence
  2290. if denylist:
  2291. for pattern in denylist:
  2292. try:
  2293. if re.search(pattern, url):
  2294. return False
  2295. except re.error:
  2296. continue # Skip invalid regex patterns
  2297. # If allowlist exists, URL must match at least one pattern
  2298. if allowlist:
  2299. for pattern in allowlist:
  2300. try:
  2301. if re.search(pattern, url):
  2302. return True
  2303. except re.error:
  2304. continue # Skip invalid regex patterns
  2305. return False # No allowlist patterns matched
  2306. return True # No filters or passed filters
  2307. @property
  2308. def output_dir(self) -> Path:
  2309. """Get the output directory for this plugin's results."""
  2310. return Path(self.snapshot.output_dir) / self.plugin
  2311. def is_background_hook(self) -> bool:
  2312. """Check if this ArchiveResult is for a background hook."""
  2313. plugin_dir = Path(self.pwd) if self.pwd else None
  2314. if not plugin_dir:
  2315. return False
  2316. pid_file = plugin_dir / 'hook.pid'
  2317. return pid_file.exists()
  2318. # =============================================================================
  2319. # ArchiveResult State Machine
  2320. # =============================================================================
class ArchiveResultMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing ArchiveResult (single plugin execution) lifecycle.

    Hook Lifecycle:
    QUEUED: waiting for its turn to run
        |
        v  tick() when can_start()
    STARTED -> enter_started():
        1. archiveresult.run()
           - Find specific hook by hook_name
           - run_hook(script, output_dir, ...) -> subprocess
        2a. FOREGROUND hook (returns HookResult):
           - update_from_output() immediately
             - Read stdout.log, parse JSONL records
             - Extract 'ArchiveResult' record -> update status
             - Walk output_dir -> populate output_files
             - Call process_hook_records() for side effects
        2b. BACKGROUND hook (returns None):
           - Status stays STARTED, continues running in background
           - Killed by Snapshot.cleanup() when sealed
        |
        v  tick() checks status
    SUCCEEDED / FAILED / SKIPPED / BACKOFF:
        - Set by hook's JSONL output during update_from_output()
        - Health stats incremented (num_uses_succeeded/failed)
        - Parent Snapshot health stats also updated

    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
    """
    # Attribute name the BaseStateMachine uses to reach the model instance
    model_attr_name = 'archiveresult'
    # States
    queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
    started = State(value=ArchiveResult.StatusChoices.STARTED)
    backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
    succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
    failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
    skipped = State(value=ArchiveResult.StatusChoices.SKIPPED, final=True)
    # Tick Event - transitions based on conditions
    # NOTE: transition order in this chain matters; the first transition whose
    # condition passes wins.
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(succeeded, cond='is_succeeded') |
        started.to(failed, cond='is_failed') |
        started.to(skipped, cond='is_skipped') |
        started.to(backoff, cond='is_backoff') |
        backoff.to.itself(unless='can_start') |
        backoff.to(started, cond='can_start') |
        backoff.to(succeeded, cond='is_succeeded') |
        backoff.to(failed, cond='is_failed') |
        backoff.to(skipped, cond='is_skipped')
    )
    def can_start(self) -> bool:
        """Gate for QUEUED/BACKOFF -> STARTED: requires a URL and a snapshot
        that has not yet exceeded MAX_URL_ATTEMPTS failed results."""
        if not self.archiveresult.snapshot.url:
            return False
        # Check if snapshot has exceeded MAX_URL_ATTEMPTS failed results
        from archivebox.config.configset import get_config
        config = get_config(
            crawl=self.archiveresult.snapshot.crawl,
            snapshot=self.archiveresult.snapshot,
        )
        max_attempts = config.get('MAX_URL_ATTEMPTS', 50)
        # Count failed ArchiveResults for this snapshot (any plugin type)
        failed_count = self.archiveresult.snapshot.archiveresult_set.filter(
            status=ArchiveResult.StatusChoices.FAILED
        ).count()
        if failed_count >= max_attempts:
            # Mark this result as skipped since we've hit the limit
            self.archiveresult.status = ArchiveResult.StatusChoices.SKIPPED
            self.archiveresult.output_str = f'Skipped: snapshot exceeded MAX_URL_ATTEMPTS ({max_attempts} failures)'
            self.archiveresult.retry_at = None
            self.archiveresult.save()
            return False
        return True
    def is_succeeded(self) -> bool:
        """Check if extractor plugin succeeded (status was set by run())."""
        return self.archiveresult.status == ArchiveResult.StatusChoices.SUCCEEDED
    def is_failed(self) -> bool:
        """Check if extractor plugin failed (status was set by run())."""
        return self.archiveresult.status == ArchiveResult.StatusChoices.FAILED
    def is_skipped(self) -> bool:
        """Check if extractor plugin was skipped (status was set by run())."""
        return self.archiveresult.status == ArchiveResult.StatusChoices.SKIPPED
    def is_backoff(self) -> bool:
        """Check if we should backoff and retry later."""
        # Backoff if status is still started (plugin didn't complete) and output_str is empty
        return (
            self.archiveresult.status == ArchiveResult.StatusChoices.STARTED and
            not self.archiveresult.output_str
        )
    def is_finished(self) -> bool:
        """Check if extraction has completed (success, failure, or skipped)."""
        return self.archiveresult.status in (
            ArchiveResult.StatusChoices.SUCCEEDED,
            ArchiveResult.StatusChoices.FAILED,
            ArchiveResult.StatusChoices.SKIPPED,
        )
    @queued.enter
    def enter_queued(self):
        # Reset start_ts so a requeued result looks freshly queued
        self.archiveresult.update_and_requeue(
            retry_at=timezone.now(),
            status=ArchiveResult.StatusChoices.QUEUED,
            start_ts=None,
        ) # bump the snapshot's retry_at so they pickup any new changes
    @started.enter
    def enter_started(self):
        from archivebox.machine.models import NetworkInterface
        # Update Process with network interface
        if self.archiveresult.process_id:
            self.archiveresult.process.iface = NetworkInterface.current()
            self.archiveresult.process.save()
        # Lock the object and mark start time
        self.archiveresult.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=120), # 2 min timeout for plugin
            status=ArchiveResult.StatusChoices.STARTED,
            start_ts=timezone.now(),
        )
        # Run the plugin - this updates status, output, timestamps, etc.
        self.archiveresult.run()
        # Save the updated result
        self.archiveresult.save()
    @backoff.enter
    def enter_backoff(self):
        # Retry again in 60s; clear end_ts since the attempt isn't over yet
        self.archiveresult.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=60),
            status=ArchiveResult.StatusChoices.BACKOFF,
            end_ts=None,
        )
    @succeeded.enter
    def enter_succeeded(self):
        # retry_at=None marks this result as final (no further ticks)
        self.archiveresult.update_and_requeue(
            retry_at=None,
            status=ArchiveResult.StatusChoices.SUCCEEDED,
            end_ts=timezone.now(),
        )
        # Update health stats for ArchiveResult, Snapshot, and Crawl cascade
        self.archiveresult.cascade_health_update(success=True)
    @failed.enter
    def enter_failed(self):
        # retry_at=None marks this result as final (no further ticks)
        self.archiveresult.update_and_requeue(
            retry_at=None,
            status=ArchiveResult.StatusChoices.FAILED,
            end_ts=timezone.now(),
        )
        # Update health stats for ArchiveResult, Snapshot, and Crawl cascade
        self.archiveresult.cascade_health_update(success=False)
    @skipped.enter
    def enter_skipped(self):
        # retry_at=None marks this result as final; skipped results do not
        # affect health stats
        self.archiveresult.update_and_requeue(
            retry_at=None,
            status=ArchiveResult.StatusChoices.SKIPPED,
            end_ts=timezone.now(),
        )
    def after_transition(self, event: str, source: State, target: State):
        self.archiveresult.snapshot.update_and_requeue() # bump snapshot retry time so it picks up all the new changes
# =============================================================================
# State Machine Registration
# =============================================================================
# Manually register state machines with python-statemachine registry
# (normally auto-discovered from statemachines.py, but we define them here for clarity)
# NOTE(review): models reference these machines by dotted path via
# state_machine_name — presumably resolved through this registry; confirm.
registry.register(SnapshotMachine)
registry.register(ArchiveResultMachine)