models.py

__package__ = 'archivebox.core'

from typing import Optional, Dict, Iterable, Any, List, TYPE_CHECKING

from archivebox.uuid_compat import uuid7
from datetime import datetime, timedelta

from django_stubs_ext.db.models import TypedModelMeta

import os
import json
from pathlib import Path

from statemachine import State, registry

from django.db import models
from django.db.models import QuerySet, Value, Case, When, IntegerField
from django.utils.functional import cached_property
from django.utils.text import slugify
from django.utils import timezone
from django.core.cache import cache
from django.urls import reverse, reverse_lazy
from django.contrib import admin
from django.conf import settings

from archivebox.config import CONSTANTS
from archivebox.misc.system import get_dir_size, atomic_write
from archivebox.misc.util import parse_date, base_url, domain as url_domain, to_json, ts_to_date_str, urlencode, htmlencode, urldecode
from archivebox.misc.hashing import get_dir_info
from archivebox.hooks import (
    get_plugins, get_plugin_name, get_plugin_icon,
)
from archivebox.base_models.models import (
    ModelWithUUID, ModelWithSerializers, ModelWithOutputDir,
    ModelWithConfig, ModelWithNotes, ModelWithHealthStats,
    get_or_create_system_user_pk,
)
from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine
from archivebox.workers.tasks import bg_archive_snapshot
from archivebox.crawls.models import Crawl
from archivebox.machine.models import NetworkInterface, Binary


class Tag(ModelWithSerializers):
    # Keep AutoField for compatibility with main branch migrations
    # Don't use UUIDField here - requires complex FK transformation
    id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=True, related_name='tag_set')
    created_at = models.DateTimeField(default=timezone.now, db_index=True, null=True)
    modified_at = models.DateTimeField(auto_now=True)

    name = models.CharField(unique=True, blank=False, max_length=100)
    slug = models.SlugField(unique=True, blank=False, max_length=100, editable=False)

    snapshot_set: models.Manager['Snapshot']

    class Meta(TypedModelMeta):
        app_label = 'core'
        verbose_name = "Tag"
        verbose_name_plural = "Tags"

    def __str__(self):
        return self.name

    def save(self, *args, **kwargs):
        is_new = self._state.adding
        if is_new:
            self.slug = slugify(self.name)
            existing = set(Tag.objects.filter(slug__startswith=self.slug).values_list("slug", flat=True))
            i = None
            while True:
                slug = f"{slugify(self.name)}_{i}" if i else slugify(self.name)
                if slug not in existing:
                    self.slug = slug
                    break
                i = (i or 0) + 1
        super().save(*args, **kwargs)
        if is_new:
            from archivebox.misc.logging_util import log_worker_event
            log_worker_event(
                worker_type='DB',
                event='Created Tag',
                indent_level=0,
                metadata={
                    'id': self.id,
                    'name': self.name,
                    'slug': self.slug,
                },
            )

    @property
    def api_url(self) -> str:
        return reverse_lazy('api-1:get_tag', args=[self.id])

    def to_jsonl(self) -> dict:
        """Convert Tag model instance to a JSONL record."""
        from archivebox.config import VERSION

        return {
            'type': 'Tag',
            'schema_version': VERSION,
            'id': str(self.id),
            'name': self.name,
            'slug': self.slug,
        }

    @staticmethod
    def from_jsonl(record: Dict[str, Any], overrides: Optional[Dict[str, Any]] = None):
        """
        Create/update Tag from a JSONL record.

        Args:
            record: JSONL record with 'name' field
            overrides: Optional dict with 'snapshot' to auto-attach tag

        Returns:
            Tag instance or None
        """
        name = record.get('name')
        if not name:
            return None
        tag, _ = Tag.objects.get_or_create(name=name)
        # Auto-attach to snapshot if in overrides
        if overrides and 'snapshot' in overrides and tag:
            overrides['snapshot'].tags.add(tag)
        return tag

    # Aliases
    from_json = from_jsonl
    to_json = to_jsonl
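

# A minimal round-trip sketch (hypothetical values, run from a Django shell with
# the archivebox apps loaded); the fields match to_jsonl()/from_jsonl() above:
#
#   record = Tag.objects.get(name='news').to_jsonl()
#   # -> {'type': 'Tag', 'schema_version': '...', 'id': '1', 'name': 'news', 'slug': 'news'}
#   tag = Tag.from_jsonl({'name': 'news'})   # get_or_create by name, returns the Tag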


class SnapshotTag(models.Model):
    id = models.AutoField(primary_key=True)

    snapshot = models.ForeignKey('Snapshot', db_column='snapshot_id', on_delete=models.CASCADE, to_field='id')
    tag = models.ForeignKey(Tag, db_column='tag_id', on_delete=models.CASCADE, to_field='id')

    class Meta:
        app_label = 'core'
        db_table = 'core_snapshot_tags'
        unique_together = [('snapshot', 'tag')]


class SnapshotQuerySet(models.QuerySet):
    """Custom QuerySet for Snapshot model with export methods that persist through .filter() etc."""

    # =========================================================================
    # Filtering Methods
    # =========================================================================

    FILTER_TYPES = {
        'exact': lambda pattern: models.Q(url=pattern),
        'substring': lambda pattern: models.Q(url__icontains=pattern),
        'regex': lambda pattern: models.Q(url__iregex=pattern),
        'domain': lambda pattern: models.Q(url__istartswith=f"http://{pattern}") | models.Q(url__istartswith=f"https://{pattern}") | models.Q(url__istartswith=f"ftp://{pattern}"),
        'tag': lambda pattern: models.Q(tags__name=pattern),
        'timestamp': lambda pattern: models.Q(timestamp=pattern),
    }

    def filter_by_patterns(self, patterns: List[str], filter_type: str = 'exact') -> 'SnapshotQuerySet':
        """Filter snapshots by URL patterns using the specified filter type"""
        from archivebox.misc.logging import stderr

        q_filter = models.Q()
        for pattern in patterns:
            try:
                q_filter = q_filter | self.FILTER_TYPES[filter_type](pattern)
            except KeyError:
                stderr()
                stderr(f'[X] Got invalid pattern for --filter-type={filter_type}:', color='red')
                stderr(f'    {pattern}')
                raise SystemExit(2)
        return self.filter(q_filter)

    def search(self, patterns: List[str]) -> 'SnapshotQuerySet':
        """Search snapshots using the configured search backend"""
        from archivebox.config.common import SEARCH_BACKEND_CONFIG
        from archivebox.search import query_search_index
        from archivebox.misc.logging import stderr

        if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
            stderr()
            stderr('[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True', color='red')
            raise SystemExit(2)
        qsearch = self.none()
        for pattern in patterns:
            try:
                qsearch |= query_search_index(pattern)
            except Exception:
                raise SystemExit(2)
        return self.all() & qsearch
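
    # Hedged usage sketch (shell session; 'example.com' is an illustrative value):
    #
    #   Snapshot.objects.filter_by_patterns(['example.com'], filter_type='domain')
    #   Snapshot.objects.filter_by_patterns([r'.*\.pdf$'], filter_type='regex')
    #
    # Each pattern is OR-ed into a single Q() filter, so all patterns are
    # resolved in one queryset rather than one query per pattern.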

    # =========================================================================
    # Export Methods
    # =========================================================================

    def to_json(self, with_headers: bool = False) -> str:
        """Generate JSON index from snapshots"""
        import sys
        from datetime import datetime, timezone as tz
        from archivebox.config import VERSION
        from archivebox.config.common import SERVER_CONFIG

        MAIN_INDEX_HEADER = {
            'info': 'This is an index of site data archived by ArchiveBox: The self-hosted web archive.',
            'schema': 'archivebox.index.json',
            'copyright_info': SERVER_CONFIG.FOOTER_INFO,
            'meta': {
                'project': 'ArchiveBox',
                'version': VERSION,
                'git_sha': VERSION,
                'website': 'https://ArchiveBox.io',
                'docs': 'https://github.com/ArchiveBox/ArchiveBox/wiki',
                'source': 'https://github.com/ArchiveBox/ArchiveBox',
                'issues': 'https://github.com/ArchiveBox/ArchiveBox/issues',
                'dependencies': {},
            },
        } if with_headers else {}

        snapshot_dicts = [s.to_dict(extended=True) for s in self.iterator(chunk_size=500)]

        if with_headers:
            output = {
                **MAIN_INDEX_HEADER,
                'num_links': len(snapshot_dicts),
                'updated': datetime.now(tz.utc),
                'last_run_cmd': sys.argv,
                'links': snapshot_dicts,
            }
        else:
            output = snapshot_dicts
        return to_json(output, indent=4, sort_keys=True)

    def to_csv(self, cols: Optional[List[str]] = None, header: bool = True, separator: str = ',', ljust: int = 0) -> str:
        """Generate CSV output from snapshots"""
        cols = cols or ['timestamp', 'is_archived', 'url']
        header_str = separator.join(col.ljust(ljust) for col in cols) if header else ''
        row_strs = (s.to_csv(cols=cols, ljust=ljust, separator=separator) for s in self.iterator(chunk_size=500))
        return '\n'.join((header_str, *row_strs))

    def to_html(self, with_headers: bool = True) -> str:
        """Generate main index HTML from snapshots"""
        from datetime import datetime, timezone as tz
        from django.template.loader import render_to_string
        from archivebox.config import VERSION
        from archivebox.config.common import SERVER_CONFIG
        from archivebox.config.version import get_COMMIT_HASH

        template = 'static_index.html' if with_headers else 'minimal_index.html'
        snapshot_list = list(self.iterator(chunk_size=500))
        return render_to_string(template, {
            'version': VERSION,
            'git_sha': get_COMMIT_HASH() or VERSION,
            'num_links': str(len(snapshot_list)),
            'date_updated': datetime.now(tz.utc).strftime('%Y-%m-%d'),
            'time_updated': datetime.now(tz.utc).strftime('%Y-%m-%d %H:%M'),
            'links': snapshot_list,
            'FOOTER_INFO': SERVER_CONFIG.FOOTER_INFO,
        })
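

# Export sketch (hypothetical shell usage; these methods live on the QuerySet,
# so they chain after any filter):
#
#   qs = Snapshot.objects.filter_by_patterns(['example.com'], filter_type='domain')
#   json_str = qs.to_json(with_headers=True)
#   csv_str  = qs.to_csv(cols=['timestamp', 'url'], separator='\t')
#   html_str = qs.to_html(with_headers=False)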


class SnapshotManager(models.Manager.from_queryset(SnapshotQuerySet)):
    """Manager for Snapshot model - uses SnapshotQuerySet for chainable methods"""

    def filter(self, *args, **kwargs):
        domain = kwargs.pop('domain', None)
        qs = super().filter(*args, **kwargs)
        if domain:
            qs = qs.filter(url__icontains=f'://{domain}')
        return qs

    def get_queryset(self):
        # Don't prefetch by default - it causes "too many open files" during bulk operations
        # Views/templates can add .prefetch_related('tags', 'archiveresult_set') where needed
        return super().get_queryset()

    # =========================================================================
    # Import Methods
    # =========================================================================

    def remove(self, atomic: bool = False) -> tuple:
        """Remove snapshots from the database"""
        from django.db import transaction

        if atomic:
            with transaction.atomic():
                return self.delete()
        return self.delete()


class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
    id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)

    url = models.URLField(unique=False, db_index=True)  # URLs can appear in multiple crawls
    timestamp = models.CharField(max_length=32, unique=True, db_index=True, editable=False)
    bookmarked_at = models.DateTimeField(default=timezone.now, db_index=True)
    crawl: Crawl = models.ForeignKey(Crawl, on_delete=models.CASCADE, null=False, related_name='snapshot_set', db_index=True)  # type: ignore[assignment]
    parent_snapshot = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='child_snapshots', db_index=True, help_text='Parent snapshot that discovered this URL (for recursive crawling)')
    title = models.CharField(max_length=512, null=True, blank=True, db_index=True)
    downloaded_at = models.DateTimeField(default=None, null=True, editable=False, db_index=True, blank=True)
    depth = models.PositiveSmallIntegerField(default=0, db_index=True)  # 0 for root snapshot, 1+ for discovered URLs
    fs_version = models.CharField(max_length=10, default='0.9.0', help_text='Filesystem version of this snapshot (e.g., "0.7.0", "0.8.0", "0.9.0"). Used to trigger lazy migration on save().')
    current_step = models.PositiveSmallIntegerField(default=0, db_index=True, help_text='Current hook step being executed (0-9). Used for sequential hook execution.')

    retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
    status = ModelWithStateMachine.StatusField(choices=ModelWithStateMachine.StatusChoices, default=ModelWithStateMachine.StatusChoices.QUEUED)

    config = models.JSONField(default=dict, null=False, blank=False, editable=True)
    notes = models.TextField(blank=True, null=False, default='')
    # output_dir is computed via @cached_property from fs_version and get_storage_path_for_version()

    tags = models.ManyToManyField(Tag, blank=True, through=SnapshotTag, related_name='snapshot_set', through_fields=('snapshot', 'tag'))

    state_machine_name = 'archivebox.core.models.SnapshotMachine'
    state_field_name = 'status'
    retry_at_field_name = 'retry_at'
    StatusChoices = ModelWithStateMachine.StatusChoices
    active_state = StatusChoices.STARTED

    objects = SnapshotManager()

    archiveresult_set: models.Manager['ArchiveResult']

    class Meta(TypedModelMeta):
        app_label = 'core'
        verbose_name = "Snapshot"
        verbose_name_plural = "Snapshots"
        constraints = [
            # Allow same URL in different crawls, but not duplicates within same crawl
            models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
            # Global timestamp uniqueness for 1:1 symlink mapping
            models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
        ]

    def __str__(self):
        return f'[{self.id}] {self.url[:64]}'

    @property
    def created_by(self):
        """Convenience property to access the user who created this snapshot via its crawl."""
        return self.crawl.created_by

    @property
    def process_set(self):
        """Get all Process objects related to this snapshot's ArchiveResults."""
        from archivebox.machine.models import Process

        return Process.objects.filter(archiveresult__snapshot_id=self.id)

    @property
    def binary_set(self):
        """Get all Binary objects used by processes related to this snapshot."""
        from archivebox.machine.models import Binary

        return Binary.objects.filter(process_set__archiveresult__snapshot_id=self.id).distinct()

    def save(self, *args, **kwargs):
        is_new = self._state.adding
        if not self.bookmarked_at:
            self.bookmarked_at = self.created_at or timezone.now()
        if not self.timestamp:
            self.timestamp = str(self.bookmarked_at.timestamp())

        # Migrate filesystem if needed (happens automatically on save)
        if self.pk and self.fs_migration_needed:
            from django.db import transaction

            with transaction.atomic():
                # Walk through the migration chain automatically
                current = self.fs_version
                target = self._fs_current_version()
                while current != target:
                    next_ver = self._fs_next_version(current)
                    method = f'_fs_migrate_from_{current.replace(".", "_")}_to_{next_ver.replace(".", "_")}'
                    # Only run if the method exists (most steps are no-ops)
                    if hasattr(self, method):
                        getattr(self, method)()
                    current = next_ver
                # Update version (still in transaction)
                self.fs_version = target

        super().save(*args, **kwargs)

        if self.url not in self.crawl.urls:
            self.crawl.urls += f'\n{self.url}'
            self.crawl.save()

        if is_new:
            from archivebox.misc.logging_util import log_worker_event

            log_worker_event(
                worker_type='DB',
                event='Created Snapshot',
                indent_level=2,
                url=self.url,
                metadata={
                    'id': str(self.id),
                    'crawl_id': str(self.crawl_id),
                    'depth': self.depth,
                    'status': self.status,
                },
            )

    # =========================================================================
    # Filesystem Migration Methods
    # =========================================================================

    @staticmethod
    def _fs_current_version() -> str:
        """Get the current ArchiveBox filesystem version (normalized to x.x.0 format)"""
        from archivebox.config import VERSION

        # Normalize version to x.x.0 format (e.g., "0.9.0rc1" -> "0.9.0")
        parts = VERSION.split('.')
        if len(parts) >= 2:
            major, minor = parts[0], parts[1]
            # Strip any non-numeric suffix from the minor version
            minor = ''.join(c for c in minor if c.isdigit())
            return f'{major}.{minor}.0'
        return '0.9.0'  # Fallback if version parsing fails

    @property
    def fs_migration_needed(self) -> bool:
        """Check if this snapshot needs a filesystem migration"""
        return self.fs_version != self._fs_current_version()

    def _fs_next_version(self, version: str) -> str:
        """Get the next version in the migration chain (0.7 and 0.8 shared a layout, so only the 0.8→0.9 migration exists)"""
        # Treat 0.7.0 and 0.8.0 as equivalent (both used archive/{timestamp})
        if version in ('0.7.0', '0.8.0'):
            return '0.9.0'
        return self._fs_current_version()
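
    # How the chain in save() walks versions (illustrative trace, assuming the
    # installed VERSION normalizes to '0.9.0'):
    #
    #   fs_version='0.7.0' -> _fs_next_version -> '0.9.0'
    #       would call _fs_migrate_from_0_7_0_to_0_9_0, which isn't defined -> no-op
    #   fs_version='0.8.0' -> _fs_next_version -> '0.9.0'
    #       calls _fs_migrate_from_0_8_0_to_0_9_0 (defined below)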

    def _fs_migrate_from_0_8_0_to_0_9_0(self):
        """
        Migrate from the flat to the nested directory structure.

        0.8.x: archive/{timestamp}/
        0.9.x: users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/

        Transaction handling:
            1. Copy files INSIDE transaction
            2. Convert index.json to index.jsonl INSIDE transaction
            3. Create symlink INSIDE transaction
            4. Update fs_version INSIDE transaction (done by save())
            5. Exit transaction (DB commit)
            6. Delete old files OUTSIDE transaction (after commit)
        """
        import shutil
        from django.db import transaction

        old_dir = self.get_storage_path_for_version('0.8.0')
        new_dir = self.get_storage_path_for_version('0.9.0')

        if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
            # Even if no directory migration is needed, still convert the index format
            self.convert_index_json_to_jsonl()
            return

        new_dir.mkdir(parents=True, exist_ok=True)

        # Copy all files (idempotent); index.json is converted to index.jsonl below
        for old_file in old_dir.rglob('*'):
            if not old_file.is_file():
                continue
            rel_path = old_file.relative_to(old_dir)
            new_file = new_dir / rel_path
            # Skip if already copied
            if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
                continue
            new_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(old_file, new_file)

        # Verify everything was copied
        old_files = {f.relative_to(old_dir): f.stat().st_size
                     for f in old_dir.rglob('*') if f.is_file()}
        new_files = {f.relative_to(new_dir): f.stat().st_size
                     for f in new_dir.rglob('*') if f.is_file()}
        if old_files.keys() != new_files.keys():
            missing = old_files.keys() - new_files.keys()
            raise Exception(f"Migration incomplete: missing {missing}")

        # Convert index.json to index.jsonl in the new directory
        self.convert_index_json_to_jsonl()

        # Create backwards-compat symlink (INSIDE transaction)
        symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
        if symlink_path.is_symlink():
            symlink_path.unlink()
        if not symlink_path.exists() or symlink_path == old_dir:
            symlink_path.symlink_to(new_dir, target_is_directory=True)

        # Schedule old directory deletion AFTER the transaction commits
        transaction.on_commit(lambda: self._cleanup_old_migration_dir(old_dir))

    def _cleanup_old_migration_dir(self, old_dir: Path):
        """
        Delete the old directory after a successful migration.
        Called via transaction.on_commit() after the DB commit succeeds.
        """
        import shutil
        import logging

        if old_dir.exists() and not old_dir.is_symlink():
            try:
                shutil.rmtree(old_dir)
            except Exception as e:
                # Log but don't raise - the migration succeeded, this is just cleanup
                logging.getLogger('archivebox.migration').warning(
                    f"Could not remove old migration directory {old_dir}: {e}"
                )

    # =========================================================================
    # Path Calculation and Migration Helpers
    # =========================================================================

    @staticmethod
    def extract_domain_from_url(url: str) -> str:
        """
        Extract the domain from a URL for the 0.9.x path structure.
        Uses the full hostname with sanitized special chars.

        Examples:
            https://example.com:8080 → example.com_8080
            https://sub.example.com  → sub.example.com
            file:///path             → localhost
            data:text/html           → data
        """
        from urllib.parse import urlparse

        try:
            parsed = urlparse(url)
            if parsed.scheme in ('http', 'https'):
                if parsed.port:
                    return f"{parsed.hostname}_{parsed.port}".replace(':', '_')
                return parsed.hostname or 'unknown'
            elif parsed.scheme == 'file':
                return 'localhost'
            elif parsed.scheme:
                return parsed.scheme
            else:
                return 'unknown'
        except Exception:
            return 'unknown'

    def get_storage_path_for_version(self, version: str) -> Path:
        """
        Calculate the storage path for a specific filesystem version.
        Centralizes path logic so it's reusable.

        0.7.x/0.8.x: archive/{timestamp}
        0.9.x:       users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
        """
        from datetime import datetime

        if version in ('0.7.0', '0.8.0'):
            return CONSTANTS.ARCHIVE_DIR / self.timestamp
        elif version in ('0.9.0', '1.0.0'):
            username = self.created_by.username
            # Use created_at for date grouping (fall back to timestamp)
            if self.created_at:
                date_str = self.created_at.strftime('%Y%m%d')
            else:
                date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')
            domain = self.extract_domain_from_url(self.url)
            return (
                CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
                date_str / domain / str(self.id)
            )
        else:
            # Unknown version - use current
            return self.get_storage_path_for_version(self._fs_current_version())
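
    # Illustrative paths (hypothetical values: DATA_DIR=/data with ARCHIVE_DIR
    # under it, user 'admin', a snapshot of https://example.com created 2024-01-05):
    #
    #   get_storage_path_for_version('0.8.0')
    #       -> /data/archive/1704412800.0
    #   get_storage_path_for_version('0.9.0')
    #       -> /data/users/admin/snapshots/20240105/example.com/<snapshot uuid>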

    # =========================================================================
    # Loading and Creation from Filesystem (Used by archivebox update ONLY)
    # =========================================================================

    @classmethod
    def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
        """
        Load an existing Snapshot from the DB by reading index.jsonl or index.json.
        Reads the index file, extracts url+timestamp, and queries the DB.
        Returns the existing Snapshot, or None if not found/invalid.
        Does NOT create new snapshots.

        ONLY used by: archivebox update (for orphan detection)
        """
        import json

        # Try index.jsonl first (new format), then index.json (legacy)
        jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
        json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
        data = None
        if jsonl_path.exists():
            try:
                with open(jsonl_path) as f:
                    for line in f:
                        line = line.strip()
                        if line.startswith('{'):
                            record = json.loads(line)
                            if record.get('type') == 'Snapshot':
                                data = record
                                break
            except (json.JSONDecodeError, OSError):
                pass
        elif json_path.exists():
            try:
                with open(json_path) as f:
                    data = json.load(f)
            except (json.JSONDecodeError, OSError):
                pass

        if not data:
            return None

        url = data.get('url')
        if not url:
            return None

        # Get timestamp - prefer the index file, fall back to the folder name
        timestamp = cls._select_best_timestamp(
            index_timestamp=data.get('timestamp'),
            folder_name=snapshot_dir.name,
        )
        if not timestamp:
            return None

        # Look up the existing record
        try:
            return cls.objects.get(url=url, timestamp=timestamp)
        except cls.DoesNotExist:
            return None
        except cls.MultipleObjectsReturned:
            # Should not happen with the unique constraint
            return cls.objects.filter(url=url, timestamp=timestamp).first()

    @classmethod
    def create_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
        """
        Create a new Snapshot from an orphaned directory.
        Validates the timestamp and ensures uniqueness.
        Returns a new UNSAVED Snapshot, or None if invalid.

        ONLY used by: archivebox update (for orphan import)
        """
        import json

        # Try index.jsonl first (new format), then index.json (legacy)
        jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
        json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
        data = None
        if jsonl_path.exists():
            try:
                with open(jsonl_path) as f:
                    for line in f:
                        line = line.strip()
                        if line.startswith('{'):
                            record = json.loads(line)
                            if record.get('type') == 'Snapshot':
                                data = record
                                break
            except (json.JSONDecodeError, OSError):
                pass
        elif json_path.exists():
            try:
                with open(json_path) as f:
                    data = json.load(f)
            except (json.JSONDecodeError, OSError):
                pass

        if not data:
            return None

        url = data.get('url')
        if not url:
            return None

        # Get and validate the timestamp
        timestamp = cls._select_best_timestamp(
            index_timestamp=data.get('timestamp'),
            folder_name=snapshot_dir.name,
        )
        if not timestamp:
            return None

        # Ensure uniqueness (reuses existing logic from create_or_update_from_dict)
        timestamp = cls._ensure_unique_timestamp(url, timestamp)

        # Detect the filesystem version
        fs_version = cls._detect_fs_version_from_index(data)

        # Get or create the catchall crawl for orphaned snapshots
        from archivebox.crawls.models import Crawl

        system_user_id = get_or_create_system_user_pk()
        catchall_crawl, _ = Crawl.objects.get_or_create(
            label='[migration] orphaned snapshots',
            defaults={
                'urls': f'# Orphaned snapshot: {url}',
                'max_depth': 0,
                'created_by_id': system_user_id,
            },
        )

        return cls(
            url=url,
            timestamp=timestamp,
            title=data.get('title', ''),
            fs_version=fs_version,
            crawl=catchall_crawl,
        )

    @staticmethod
    def _select_best_timestamp(index_timestamp: Optional[str], folder_name: str) -> Optional[str]:
        """
        Select the best timestamp from index.json vs the folder name.
        Validates the range (1995-2035). Prefers index.json if valid.
        """
        def is_valid_timestamp(ts):
            try:
                ts_int = int(float(ts))
                # 1995-01-01 to 2035-12-31
                return 788918400 <= ts_int <= 2082758400
            except (TypeError, ValueError):
                return False

        index_valid = is_valid_timestamp(index_timestamp) if index_timestamp else False
        folder_valid = is_valid_timestamp(folder_name)

        if index_valid:
            return str(int(float(index_timestamp)))
        elif folder_valid:
            return str(int(float(folder_name)))
        else:
            return None
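
    # Illustrative inputs/outputs (hypothetical values):
    #
    #   _select_best_timestamp('1704412800.5', '1704412800')  -> '1704412800'  (index wins)
    #   _select_best_timestamp(None, '1704412800')            -> '1704412800'  (folder fallback)
    #   _select_best_timestamp('12345', 'not-a-timestamp')    -> None          (both invalid/out of range)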

    @classmethod
    def _ensure_unique_timestamp(cls, url: str, timestamp: str) -> str:
        """
        Ensure the timestamp is globally unique.
        On collision with a different URL, increment by 1 until unique.
        NOTE: This logic already exists in create_or_update_from_dict (line 266-267);
        this is just an extracted, reusable version.
        """
        while cls.objects.filter(timestamp=timestamp).exclude(url=url).exists():
            timestamp = str(int(float(timestamp)) + 1)
        return timestamp

    @staticmethod
    def _detect_fs_version_from_index(data: dict) -> str:
        """
        Detect fs_version from the index.json structure.

        - Has fs_version field: use it
        - Has history dict (and no archive_results): 0.7.0
        - Has archive_results list: 0.8.0
        - Default: 0.7.0
        """
        if 'fs_version' in data:
            return data['fs_version']
        if 'history' in data and 'archive_results' not in data:
            return '0.7.0'
        if 'archive_results' in data:
            return '0.8.0'
        return '0.7.0'

    # =========================================================================
    # Index.json Reconciliation
    # =========================================================================

    def reconcile_with_index(self):
        """
        Merge index.json/index.jsonl with the DB. The DB is the source of truth.

        - Title: longest non-URL
        - Tags: union
        - ArchiveResults: keep both (keyed by plugin+start_ts)

        Converts index.json to index.jsonl if needed, then writes back in JSONL format.
        Used by: archivebox update (to sync the index with the DB)
        """
        import json

        # Try to convert index.json to index.jsonl first
        self.convert_index_json_to_jsonl()

        # Check for index.jsonl (preferred) or index.json (legacy)
        jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
        json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME

        index_data = {}
        if jsonl_path.exists():
            # Read from the JSONL format
            jsonl_data = self.read_index_jsonl()
            if jsonl_data['snapshot']:
                index_data = jsonl_data['snapshot']
                # Convert the archive_results list to the expected format
                index_data['archive_results'] = jsonl_data['archive_results']
        elif json_path.exists():
            # Fall back to the legacy JSON format
            try:
                with open(json_path) as f:
                    index_data = json.load(f)
            except (json.JSONDecodeError, OSError):
                pass

        # Merge title
        self._merge_title_from_index(index_data)
        # Merge tags
        self._merge_tags_from_index(index_data)
        # Merge ArchiveResults
        self._merge_archive_results_from_index(index_data)

        # Write back in JSONL format
        self.write_index_jsonl()

    def reconcile_with_index_json(self):
        """Deprecated: use reconcile_with_index() instead."""
        return self.reconcile_with_index()

    def _merge_title_from_index(self, index_data: dict):
        """Merge title - prefer the longest non-URL title."""
        index_title = index_data.get('title', '').strip()
        db_title = self.title or ''
        candidates = [t for t in [index_title, db_title] if t and t != self.url]
        if candidates:
            best_title = max(candidates, key=len)
            if self.title != best_title:
                self.title = best_title

    def _merge_tags_from_index(self, index_data: dict):
        """Merge tags - union of both sources."""
        from django.db import transaction

        index_tags = set(index_data.get('tags', '').split(',')) if index_data.get('tags') else set()
        index_tags = {t.strip() for t in index_tags if t.strip()}
        db_tags = set(self.tags.values_list('name', flat=True))
        new_tags = index_tags - db_tags
        if new_tags:
            with transaction.atomic():
                for tag_name in new_tags:
                    tag, _ = Tag.objects.get_or_create(name=tag_name)
                    self.tags.add(tag)

    def _merge_archive_results_from_index(self, index_data: dict):
        """Merge ArchiveResults - keep both (keyed by plugin+start_ts)."""
        existing = {
            (ar.plugin, ar.start_ts): ar
            for ar in ArchiveResult.objects.filter(snapshot=self)
        }
        # Handle 0.8.x format (archive_results list)
        for result_data in index_data.get('archive_results', []):
            self._create_archive_result_if_missing(result_data, existing)
        # Handle 0.7.x format (history dict)
        if 'history' in index_data and isinstance(index_data['history'], dict):
            for plugin, result_list in index_data['history'].items():
                if isinstance(result_list, list):
                    for result_data in result_list:
                        # Support both old 'extractor' and new 'plugin' keys for backwards compat
                        result_data['plugin'] = result_data.get('plugin') or result_data.get('extractor') or plugin
                        self._create_archive_result_if_missing(result_data, existing)

    def _create_archive_result_if_missing(self, result_data: dict, existing: dict):
        """Create an ArchiveResult if it's not already in the DB."""
        from dateutil import parser

        # Support both old 'extractor' and new 'plugin' keys for backwards compat
        plugin = result_data.get('plugin') or result_data.get('extractor', '')
        if not plugin:
            return
        start_ts = None
        if result_data.get('start_ts'):
            try:
                start_ts = parser.parse(result_data['start_ts'])
            except (ValueError, OverflowError):
                pass
        if (plugin, start_ts) in existing:
            return
        try:
            end_ts = None
            if result_data.get('end_ts'):
                try:
                    end_ts = parser.parse(result_data['end_ts'])
                except (ValueError, OverflowError):
                    pass
            # Support both 'output' (legacy) and 'output_str' (new JSONL) field names
            output_str = result_data.get('output_str') or result_data.get('output', '')
            ArchiveResult.objects.create(
                snapshot=self,
                plugin=plugin,
                hook_name=result_data.get('hook_name', ''),
                status=result_data.get('status', 'failed'),
                output_str=output_str,
                cmd=result_data.get('cmd', []),
                pwd=result_data.get('pwd', str(self.output_dir)),
                start_ts=start_ts,
                end_ts=end_ts,
            )
        except Exception:
            pass

    def write_index_json(self):
        """Write index.json in 0.9.x format (deprecated, use write_index_jsonl)."""
        import json

        index_path = Path(self.output_dir) / 'index.json'
        data = {
            'url': self.url,
            'timestamp': self.timestamp,
            'title': self.title or '',
            'tags': ','.join(sorted(self.tags.values_list('name', flat=True))),
            'fs_version': self.fs_version,
            'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'archive_results': [
                {
                    'plugin': ar.plugin,
                    'status': ar.status,
                    'start_ts': ar.start_ts.isoformat() if ar.start_ts else None,
                    'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
                    'output': ar.output_str or '',
                    'cmd': ar.cmd if isinstance(ar.cmd, list) else [],
                    'pwd': ar.pwd,
                }
                for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts')
            ],
        }
        index_path.parent.mkdir(parents=True, exist_ok=True)
        with open(index_path, 'w') as f:
            json.dump(data, f, indent=2, sort_keys=True)

    def write_index_jsonl(self):
        """
        Write index.jsonl in flat JSONL format.

        Each line is a JSON record with a 'type' field:
            - Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
            - ArchiveResult: extractor results (plugin, status, output, etc.)
            - Binary: binary info used for the extraction
            - Process: process execution details (cmd, exit_code, timing, etc.)
        """
        import json

        index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
        index_path.parent.mkdir(parents=True, exist_ok=True)

        # Track unique binaries and processes to avoid duplicates
        binaries_seen = set()
        processes_seen = set()

        with open(index_path, 'w') as f:
            # Write the Snapshot record first (to_jsonl includes crawl_id, fs_version)
            f.write(json.dumps(self.to_jsonl()) + '\n')

            # Write ArchiveResult records with their associated Binary and Process
            # Use select_related to optimize queries
            for ar in self.archiveresult_set.select_related('process__binary').order_by('start_ts'):
                # Write the Binary record if not already written
                if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
                    binaries_seen.add(ar.process.binary_id)
                    f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n')
                # Write the Process record if not already written
                if ar.process and ar.process_id not in processes_seen:
                    processes_seen.add(ar.process_id)
                    f.write(json.dumps(ar.process.to_jsonl()) + '\n')
                # Write the ArchiveResult record
                f.write(json.dumps(ar.to_jsonl()) + '\n')
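
    # Sketch of the resulting index.jsonl (hypothetical values; one JSON object
    # per line, Snapshot first, then Binary/Process/ArchiveResult per result):
    #
    #   {"type": "Snapshot", "id": "...", "crawl_id": "...", "url": "https://example.com", ...}
    #   {"type": "Binary", "name": "wget", ...}
    #   {"type": "Process", "cmd": ["wget", "..."], ...}
    #   {"type": "ArchiveResult", "plugin": "wget", "status": "succeeded", ...}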

    def read_index_jsonl(self) -> dict:
        """
        Read index.jsonl and return the parsed records grouped by type.
        Returns a dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes'
        """
        import json
        from archivebox.misc.jsonl import (
            TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS,
        )

        index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
        result = {
            'snapshot': None,
            'archive_results': [],
            'binaries': [],
            'processes': [],
        }
        if not index_path.exists():
            return result

        with open(index_path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line or not line.startswith('{'):
                    continue
                try:
                    record = json.loads(line)
                    record_type = record.get('type')
                    if record_type == TYPE_SNAPSHOT:
                        result['snapshot'] = record
                    elif record_type == TYPE_ARCHIVERESULT:
                        result['archive_results'].append(record)
                    elif record_type == TYPE_BINARY:
                        result['binaries'].append(record)
                    elif record_type == TYPE_PROCESS:
                        result['processes'].append(record)
                except json.JSONDecodeError:
                    continue
        return result

    def convert_index_json_to_jsonl(self) -> bool:
        """
        Convert index.json to index.jsonl format.
        Reads the existing index.json, creates index.jsonl, and removes index.json.
        Returns True if a conversion was performed, False if none was needed.
        """
        import json

        json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
        jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME

        # Skip if already converted or no json file exists
        if jsonl_path.exists() or not json_path.exists():
            return False

        try:
            with open(json_path, 'r') as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError):
            return False

        # Detect the format version and extract records
        fs_version = data.get('fs_version', '0.7.0')

        jsonl_path.parent.mkdir(parents=True, exist_ok=True)
        with open(jsonl_path, 'w') as f:
            # Write the Snapshot record
            snapshot_record = {
                'type': 'Snapshot',
                'id': str(self.id),
                'crawl_id': str(self.crawl_id) if self.crawl_id else None,
                'url': data.get('url', self.url),
                'timestamp': data.get('timestamp', self.timestamp),
                'title': data.get('title', self.title or ''),
                'tags': data.get('tags', ''),
                'fs_version': fs_version,
                'bookmarked_at': data.get('bookmarked_at'),
                'created_at': data.get('created_at'),
            }
            f.write(json.dumps(snapshot_record) + '\n')

            # Handle 0.8.x/0.9.x format (archive_results list)
            for result_data in data.get('archive_results', []):
                ar_record = {
                    'type': 'ArchiveResult',
                    'snapshot_id': str(self.id),
                    'plugin': result_data.get('plugin', ''),
                    'status': result_data.get('status', ''),
                    'output_str': result_data.get('output', ''),
                    'start_ts': result_data.get('start_ts'),
                    'end_ts': result_data.get('end_ts'),
                }
                if result_data.get('cmd'):
                    ar_record['cmd'] = result_data['cmd']
                f.write(json.dumps(ar_record) + '\n')

            # Handle 0.7.x format (history dict)
            if 'history' in data and isinstance(data['history'], dict):
                for plugin, result_list in data['history'].items():
                    if not isinstance(result_list, list):
                        continue
                    for result_data in result_list:
                        ar_record = {
                            'type': 'ArchiveResult',
                            'snapshot_id': str(self.id),
                            'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin,
                            'status': result_data.get('status', ''),
                            'output_str': result_data.get('output', ''),
                            'start_ts': result_data.get('start_ts'),
                            'end_ts': result_data.get('end_ts'),
                        }
                        if result_data.get('cmd'):
                            ar_record['cmd'] = result_data['cmd']
                        f.write(json.dumps(ar_record) + '\n')

        # Remove the old index.json after successful conversion
        try:
            json_path.unlink()
        except OSError:
            pass
        return True

    # =========================================================================
    # Snapshot Utilities
    # =========================================================================

    @staticmethod
    def move_directory_to_invalid(snapshot_dir: Path):
        """
        Move an invalid directory to data/invalid/YYYYMMDD/.
        Used by: archivebox update (when encountering invalid directories)
        """
        from datetime import datetime
        import shutil

        invalid_dir = CONSTANTS.DATA_DIR / 'invalid' / datetime.now().strftime('%Y%m%d')
        invalid_dir.mkdir(parents=True, exist_ok=True)
        dest = invalid_dir / snapshot_dir.name
        counter = 1
        while dest.exists():
            dest = invalid_dir / f"{snapshot_dir.name}_{counter}"
            counter += 1
        try:
            shutil.move(str(snapshot_dir), str(dest))
        except OSError:
            pass

    @classmethod
    def find_and_merge_duplicates(cls) -> int:
        """
        Find and merge snapshots with the same url:timestamp.
        Returns the count of duplicate sets merged.
        Used by: archivebox update (Phase 3: deduplication)
        """
        from django.db.models import Count

        duplicates = (
            cls.objects
            .values('url', 'timestamp')
            .annotate(count=Count('id'))
            .filter(count__gt=1)
        )
        merged = 0
        for dup in duplicates.iterator(chunk_size=500):
            snapshots = list(
                cls.objects
                .filter(url=dup['url'], timestamp=dup['timestamp'])
                .order_by('created_at')  # Keep oldest
            )
            if len(snapshots) > 1:
                try:
                    cls._merge_snapshots(snapshots)
                    merged += 1
                except Exception:
                    pass
        return merged

    @classmethod
    def _merge_snapshots(cls, snapshots: list['Snapshot']):
        """
        Merge exact duplicates.
        Keep the oldest; union files, tags, and ArchiveResults.
        """
        import shutil

        keeper = snapshots[0]
        duplicates = snapshots[1:]
        keeper_dir = Path(keeper.output_dir)

        for dup in duplicates:
            dup_dir = Path(dup.output_dir)
            # Merge files
            if dup_dir.exists() and dup_dir != keeper_dir:
                for dup_file in dup_dir.rglob('*'):
                    if not dup_file.is_file():
                        continue
                    rel = dup_file.relative_to(dup_dir)
                    keeper_file = keeper_dir / rel
                    if not keeper_file.exists():
                        keeper_file.parent.mkdir(parents=True, exist_ok=True)
                        shutil.copy2(dup_file, keeper_file)
                try:
                    shutil.rmtree(dup_dir)
                except OSError:
                    pass
            # Merge tags
            for tag in dup.tags.all():
                keeper.tags.add(tag)
            # Move ArchiveResults
            ArchiveResult.objects.filter(snapshot=dup).update(snapshot=keeper)
            # Delete the duplicate
            dup.delete()

    # =========================================================================
    # Output Directory Properties
    # =========================================================================

    @property
    def output_dir_parent(self) -> str:
        return 'archive'

    @property
    def output_dir_name(self) -> str:
        return str(self.timestamp)

    def archive(self, overwrite=False, methods=None):
        return bg_archive_snapshot(self, overwrite=overwrite, methods=methods)

    @admin.display(description='Tags')
    def tags_str(self, nocache=True) -> str | None:
        calc_tags_str = lambda: ','.join(sorted(tag.name for tag in self.tags.all()))
        if hasattr(self, '_prefetched_objects_cache') and 'tags' in self._prefetched_objects_cache:
            return calc_tags_str()
        cache_key = f'{self.pk}-tags'
        return calc_tags_str() if nocache else cache.get_or_set(cache_key, calc_tags_str)

    def icons(self) -> str:
        """Generate HTML icons showing which extractor plugins have succeeded for this snapshot"""
        from django.utils.html import format_html, mark_safe

        cache_key = f'result_icons:{self.pk}:{(self.downloaded_at or self.modified_at or self.created_at or self.bookmarked_at).timestamp()}'

        def calc_icons():
            if hasattr(self, '_prefetched_objects_cache') and 'archiveresult_set' in self._prefetched_objects_cache:
                archive_results = {r.plugin: r for r in self.archiveresult_set.all() if r.status == "succeeded" and (r.output_files or r.output_str)}
            else:
                # Filter for results that have either output_files or output_str
                from django.db.models import Q
                archive_results = {r.plugin: r for r in self.archiveresult_set.filter(
                    Q(status="succeeded") & (Q(output_files__isnull=False) | ~Q(output_str=''))
                )}
            path = self.archive_path
            canon = self.canonical_outputs()
            output = ""
            output_template = '<a href="/{}/{}" class="exists-{}" title="{}">{}</a> &nbsp;'
            # Get all plugins from the hooks system (sorted by numeric prefix)
            all_plugins = [get_plugin_name(e) for e in get_plugins()]
            for plugin in all_plugins:
                result = archive_results.get(plugin)
                existing = result and result.status == 'succeeded' and (result.output_files or result.output_str)
                icon = get_plugin_icon(plugin)
                # Skip plugins with empty icons that have no output
                # (e.g., staticfile only shows when there's actual output)
                if not icon.strip() and not existing:
                    continue
                output += format_html(
                    output_template,
                    path,
                    canon.get(plugin, plugin + '/'),
                    str(bool(existing)),
                    plugin,
                    icon,
                )
            return format_html('<span class="files-icons" style="font-size: 1.1em; opacity: 0.8; min-width: 240px; display: inline-block">{}</span>', mark_safe(output))

        cache_result = cache.get(cache_key)
        if cache_result:
            return cache_result
        fresh_result = calc_icons()
        cache.set(cache_key, fresh_result, timeout=60 * 60 * 24)
        return fresh_result
    @property
    def api_url(self) -> str:
        return reverse_lazy('api-1:get_snapshot', args=[self.id])

    def get_absolute_url(self):
        return f'/{self.archive_path}'

    @cached_property
    def domain(self) -> str:
        return url_domain(self.url)

    @cached_property
    def output_dir(self):
        """The filesystem path to the snapshot's output directory."""
        import os

        current_path = self.get_storage_path_for_version(self.fs_version)
        if current_path.exists():
            return str(current_path)
        # Check for a backwards-compat symlink left behind by older layouts
        old_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
        if old_path.is_symlink():
            return str(Path(os.readlink(old_path)).resolve())
        elif old_path.exists():
            return str(old_path)
        return str(current_path)

    @cached_property
    def archive_path(self):
        return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'

    @cached_property
    def archive_size(self):
        try:
            return get_dir_size(self.output_dir)[0]
        except Exception:
            return 0

    def save_tags(self, tags: Iterable[str] = ()) -> None:
        tag_ids = [Tag.objects.get_or_create(name=tag)[0].pk for tag in tags if tag.strip()]
        self.tags.clear()
        self.tags.add(*tag_ids)
    def pending_archiveresults(self) -> QuerySet['ArchiveResult']:
        return self.archiveresult_set.exclude(status__in=ArchiveResult.FINAL_OR_ACTIVE_STATES)

    def run(self) -> list['ArchiveResult']:
        """
        Execute this snapshot by creating pending ArchiveResults for all enabled hooks.
        Called by: SnapshotMachine.enter_started()

        Hook Lifecycle:
        1. discover_hooks('Snapshot') → finds all plugin hooks
        2. For each hook:
           - Create an ArchiveResult with status=QUEUED
           - Store hook_name (e.g. 'on_Snapshot__50_wget.py')
        3. ArchiveResults execute independently via ArchiveResultMachine
        4. Hook execution happens in ArchiveResult.run(), NOT here

        Returns:
            list[ArchiveResult]: newly created pending results
        """
        return self.create_pending_archiveresults()

    def cleanup(self):
        """
        Clean up background ArchiveResult hooks.
        Called by the state machine when entering the 'sealed' state.
        Kills any background hooks still running and finalizes their ArchiveResults.
        """
        from archivebox.misc.process_utils import safe_kill_process

        # Kill any background ArchiveResult hooks
        if not self.OUTPUT_DIR.exists():
            return
        # Find all .pid files in this snapshot's output directory and kill their processes
        for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
            cmd_file = pid_file.parent / 'cmd.sh'
            safe_kill_process(pid_file, cmd_file)
        # Update all STARTED ArchiveResults from the filesystem
        results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
        for ar in results:
            ar.update_from_output()
    def has_running_background_hooks(self) -> bool:
        """
        Check if any ArchiveResult background hooks are still running.
        Used by the state machine to determine if the snapshot is finished.
        """
        from archivebox.misc.process_utils import validate_pid_file

        if not self.OUTPUT_DIR.exists():
            return False
        for plugin_dir in self.OUTPUT_DIR.iterdir():
            if not plugin_dir.is_dir():
                continue
            pid_file = plugin_dir / 'hook.pid'
            cmd_file = plugin_dir / 'cmd.sh'
            if validate_pid_file(pid_file, cmd_file):
                return True
        return False
    def to_jsonl(self) -> dict:
        """
        Convert this Snapshot model instance to a JSONL record.
        Includes all fields needed to fully reconstruct/identify this snapshot.
        """
        from archivebox.config import VERSION

        return {
            'type': 'Snapshot',
            'schema_version': VERSION,
            'id': str(self.id),
            'crawl_id': str(self.crawl_id),
            'url': self.url,
            'title': self.title,
            'tags': self.tags_str(),
            'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'timestamp': self.timestamp,
            'depth': self.depth,
            'status': self.status,
            'fs_version': self.fs_version,
        }
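
    # Example of the JSONL record emitted by to_jsonl() (values are illustrative):
    #
    #     {"type": "Snapshot", "schema_version": "0.9.0", "id": "019...", "crawl_id": "019...",
    #      "url": "https://example.com", "title": "Example", "tags": "news,tech",
    #      "bookmarked_at": "2024-01-01T00:00:00+00:00", "created_at": "2024-01-01T00:00:01+00:00",
    #      "timestamp": "1704067200.0", "depth": 0, "status": "sealed", "fs_version": "0.9"}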
    @staticmethod
    def from_jsonl(record: Dict[str, Any], overrides: Optional[Dict[str, Any]] = None, queue_for_extraction: bool = True):
        """
        Create or update a Snapshot from a JSONL record or plain dict.

        Unified method that handles:
        - ID-based patching: {"id": "...", "title": "new title"}
        - URL-based create/update: {"url": "...", "title": "...", "tags": "..."}
        - Auto-creates a Crawl if not provided
        - Optionally queues the snapshot for extraction

        Args:
            record: dict with 'url' (for create) or 'id' (for patch), plus other fields
            overrides: dict with 'crawl', 'snapshot' (parent), 'created_by_id'
            queue_for_extraction: if True, sets status=QUEUED and retry_at (default: True)

        Returns:
            Snapshot instance or None
        """
        import re
        from django.utils import timezone
        from archivebox.misc.util import parse_date
        from archivebox.base_models.models import get_or_create_system_user_pk
        from archivebox.config.common import GENERAL_CONFIG

        overrides = overrides or {}

        # If 'id' is provided, look up and patch that specific snapshot
        snapshot_id = record.get('id')
        if snapshot_id:
            try:
                snapshot = Snapshot.objects.get(id=snapshot_id)
                # Generically update all fields present in the record
                update_fields = []
                for field_name, value in record.items():
                    # Skip internal fields
                    if field_name in ('id', 'type'):
                        continue
                    # Skip fields that don't exist on the model
                    if not hasattr(snapshot, field_name):
                        continue
                    # Special parsing for date fields
                    if field_name in ('bookmarked_at', 'retry_at', 'created_at', 'modified_at'):
                        if value and isinstance(value, str):
                            value = parse_date(value)
                    # Update the field if a value is provided and differs
                    if value is not None and getattr(snapshot, field_name) != value:
                        setattr(snapshot, field_name, value)
                        update_fields.append(field_name)
                if update_fields:
                    snapshot.save(update_fields=update_fields + ['modified_at'])
                return snapshot
            except Snapshot.DoesNotExist:
                # ID not found, fall through to the create-by-URL logic
                pass

        url = record.get('url')
        if not url:
            return None

        # Determine or create the crawl (every snapshot must have one)
        crawl = overrides.get('crawl')
        parent_snapshot = overrides.get('snapshot')  # parent snapshot
        created_by_id = overrides.get('created_by_id') or (parent_snapshot.created_by.pk if parent_snapshot else get_or_create_system_user_pk())

        # If no crawl was provided, inherit from the parent or auto-create one
        if not crawl:
            if parent_snapshot:
                # Inherit the crawl from the parent snapshot
                crawl = parent_snapshot.crawl
            else:
                # Auto-create a single-URL crawl
                from archivebox.crawls.models import Crawl
                from archivebox.config import CONSTANTS

                timestamp_str = timezone.now().strftime('%Y-%m-%d__%H-%M-%S')
                sources_file = CONSTANTS.SOURCES_DIR / f'{timestamp_str}__auto_crawl.txt'
                sources_file.parent.mkdir(parents=True, exist_ok=True)
                sources_file.write_text(url)
                crawl = Crawl.objects.create(
                    urls=url,
                    max_depth=0,
                    label=f'auto-created for {url[:50]}',
                    created_by_id=created_by_id,
                )

        # Parse tags
        tags_str = record.get('tags', '')
        tag_list = []
        if tags_str:
            tag_list = list(dict.fromkeys(
                tag.strip() for tag in re.split(GENERAL_CONFIG.TAG_SEPARATOR_PATTERN, tags_str)
                if tag.strip()
            ))

        # Get the most recent snapshot with this URL (URLs can exist in multiple crawls)
        snapshot = Snapshot.objects.filter(url=url).order_by('-created_at').first()
        title = record.get('title')
        timestamp = record.get('timestamp')
        if snapshot:
            # Update the existing snapshot (prefer the longer, more descriptive title)
            if title and (not snapshot.title or len(title) > len(snapshot.title or '')):
                snapshot.title = title
                snapshot.save(update_fields=['title', 'modified_at'])
        else:
            # Create a new snapshot, bumping the timestamp until it's unique
            if timestamp:
                while Snapshot.objects.filter(timestamp=timestamp).exists():
                    timestamp = str(float(timestamp) + 1.0)
            snapshot = Snapshot.objects.create(
                url=url,
                timestamp=timestamp,
                title=title,
                crawl=crawl,
            )

        # Merge tags
        if tag_list:
            existing_tags = set(snapshot.tags.values_list('name', flat=True))
            new_tags = set(tag_list) | existing_tags
            snapshot.save_tags(new_tags)

        # Queue for extraction and update any additional fields
        update_fields = []
        if queue_for_extraction:
            snapshot.status = Snapshot.StatusChoices.QUEUED
            snapshot.retry_at = timezone.now()
            update_fields.extend(['status', 'retry_at'])
        # Update additional fields if provided
        for field_name in ('depth', 'parent_snapshot_id', 'crawl_id', 'bookmarked_at'):
            value = record.get(field_name)
            if value is not None and getattr(snapshot, field_name) != value:
                setattr(snapshot, field_name, value)
                update_fields.append(field_name)
        if update_fields:
            snapshot.save(update_fields=update_fields + ['modified_at'])
        return snapshot
    # Aliases for backwards compatibility.
    # NOTE: `to_json = to_jsonl` is shadowed by the extended `def to_json()` defined
    # further down in this class; only `from_json` remains an active alias.
    from_json = from_jsonl
    to_json = to_jsonl
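
    # Usage sketch for from_jsonl() (illustrative records; the IDs are made up):
    #
    #     # Patch an existing snapshot by id:
    #     Snapshot.from_jsonl({'id': '019...', 'title': 'New Title'})
    #
    #     # Create (or update) by URL, auto-creating a single-URL Crawl:
    #     snap = Snapshot.from_jsonl({'url': 'https://example.com', 'tags': 'news,tech'})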
    def create_pending_archiveresults(self) -> list['ArchiveResult']:
        """
        Create ArchiveResult records for all enabled hooks.

        Uses the hooks system to discover available hooks from:
        - archivebox/plugins/*/on_Snapshot__*.{py,sh,js}
        - data/plugins/*/on_Snapshot__*.{py,sh,js}

        Creates one ArchiveResult per hook (not per plugin), with hook_name set.
        This enables step-based execution where all hooks in a step can run in parallel.
        """
        from archivebox.hooks import discover_hooks

        hooks = discover_hooks('Snapshot')
        archiveresults = []
        for hook_path in hooks:
            hook_name = hook_path.name       # e.g. 'on_Snapshot__50_wget.py'
            plugin = hook_path.parent.name   # e.g. 'wget'
            # Skip if an ArchiveResult already exists for this specific hook
            if ArchiveResult.objects.filter(snapshot=self, hook_name=hook_name).exists():
                continue
            archiveresult, created = ArchiveResult.objects.get_or_create(
                snapshot=self,
                hook_name=hook_name,
                defaults={
                    'plugin': plugin,
                    'status': ArchiveResult.INITIAL_STATE,
                    'retry_at': timezone.now(),
                },
            )
            if archiveresult.status == ArchiveResult.INITIAL_STATE:
                archiveresults.append(archiveresult)
        return archiveresults
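
    # For reference, a plugin layout that create_pending_archiveresults() would
    # pick up (paths are illustrative):
    #
    #     archivebox/plugins/wget/on_Snapshot__50_wget.py  -> plugin='wget',   hook_name='on_Snapshot__50_wget.py'
    #     data/plugins/custom/on_Snapshot__70_custom.sh    -> plugin='custom', hook_name='on_Snapshot__70_custom.sh'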
    def advance_step_if_ready(self) -> bool:
        """
        Advance current_step if all foreground hooks in the current step are finished.
        Called by the state machine to check whether the step can advance.
        Background hooks (.bg) don't block step advancement.

        Step advancement rules:
        - All foreground ARs in the current step must be finished (SUCCEEDED/FAILED/SKIPPED)
        - Background ARs (hook_name contains '.bg.') are ignored for advancement
        - When ready, increments current_step by 1 (up to 9)

        Returns:
            True if the step was advanced, False if not ready or already at step 9.
        """
        from archivebox.hooks import extract_step, is_background_hook

        if self.current_step >= 9:
            return False  # already at the final step

        # Get all ARs that have a hook_name (the step is derived from the name)
        current_step_ars = self.archiveresult_set.filter(
            hook_name__isnull=False
        ).exclude(hook_name='')

        # Check each AR that belongs to the current step
        for ar in current_step_ars:
            ar_step = extract_step(ar.hook_name)
            if ar_step != self.current_step:
                continue  # not in the current step
            if is_background_hook(ar.hook_name):
                continue  # background hooks don't block
            # Foreground hook in the current step - check whether it's finished
            if ar.status not in ArchiveResult.FINAL_OR_ACTIVE_STATES:
                # Still pending/queued - can't advance
                return False
            if ar.status == ArchiveResult.StatusChoices.STARTED:
                # Still running - can't advance
                return False

        # All foreground hooks in the current step are finished - advance!
        self.current_step += 1
        self.save(update_fields=['current_step', 'modified_at'])
        return True
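
    # Step numbering sketch (assumed convention, based on the two-digit hook prefix:
    # the step is its first digit, e.g. extract_step('on_Snapshot__50_wget.py') == 5),
    # so all foreground hooks in step 5 must finish before current_step advances to 6.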
    def is_finished_processing(self) -> bool:
        """
        Check if this snapshot has finished processing.
        Used by SnapshotMachine.is_finished() to determine if the snapshot is complete.

        Returns:
            True if all archiveresults are finished (or there's no work to do), False otherwise.
        """
        # If no archiveresults exist yet, it's not finished
        if not self.archiveresult_set.exists():
            return False

        # Try to advance the step while we can (handles step-based hook execution);
        # this increments current_step whenever all foreground hooks in the current step are done
        while self.advance_step_if_ready():
            pass

        # If archiveresults exist but some are still pending, it's not finished
        if self.pending_archiveresults().exists():
            return False

        # Don't wait for background hooks - they'll be cleaned up on entering the sealed state.
        # Background hooks in STARTED state are excluded by pending_archiveresults()
        # (STARTED is in FINAL_OR_ACTIVE_STATES), so once all results are FINAL or ACTIVE
        # we can transition to sealed and cleanup() will kill the remaining background hooks.

        # Otherwise archiveresults exist and are all finished, so the snapshot is finished
        return True
    def retry_failed_archiveresults(self, retry_at: Optional['timezone.datetime'] = None) -> int:
        """
        Reset failed/skipped ArchiveResults to queued for retry.

        This enables seamless retry of the entire extraction pipeline:
        - Resets FAILED and SKIPPED results to QUEUED
        - Sets retry_at so workers pick them up
        - Plugins run in order (numeric prefix)
        - Each plugin checks its dependencies at runtime

        Dependency handling (e.g. chrome_session → screenshot):
        - Plugins check whether their required outputs exist before running
        - If a dependency's output is missing → the plugin returns 'skipped'
        - On retry, if the dependency now succeeds → the dependent plugin can run

        Returns the count of ArchiveResults reset.
        """
        retry_at = retry_at or timezone.now()
        count = self.archiveresult_set.filter(
            status__in=[
                ArchiveResult.StatusChoices.FAILED,
                ArchiveResult.StatusChoices.SKIPPED,
            ]
        ).update(
            status=ArchiveResult.StatusChoices.QUEUED,
            retry_at=retry_at,
            output_str='',  # clear stale output (the legacy 'output' field was replaced by output_str)
            start_ts=None,
            end_ts=None,
        )
        # Also reset the snapshot and current_step so it gets re-checked from the beginning
        if count > 0:
            self.status = self.StatusChoices.STARTED
            self.retry_at = retry_at
            self.current_step = 0  # reset to step 0 for retry
            self.save(update_fields=['status', 'retry_at', 'current_step', 'modified_at'])
        return count
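
    # Usage sketch (hypothetical caller, e.g. a retry command):
    #
    #     n = snapshot.retry_failed_archiveresults()
    #     # n FAILED/SKIPPED results are now QUEUED again and the snapshot is back at step 0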
    # =========================================================================
    # URL Helper Properties (migrated from Link schema)
    # =========================================================================

    @cached_property
    def url_hash(self) -> str:
        from hashlib import sha256
        return sha256(self.url.encode()).hexdigest()[:8]

    @cached_property
    def scheme(self) -> str:
        return self.url.split('://')[0]

    @cached_property
    def path(self) -> str:
        parts = self.url.split('://', 1)
        return ('/' + parts[1].split('/', 1)[1]) if len(parts) > 1 and '/' in parts[1] else '/'

    @cached_property
    def basename(self) -> str:
        return self.path.split('/')[-1]

    @cached_property
    def extension(self) -> str:
        basename = self.basename
        return basename.split('.')[-1] if '.' in basename else ''

    @cached_property
    def base_url(self) -> str:
        return f'{self.scheme}://{self.domain}'

    @cached_property
    def is_static(self) -> bool:
        static_extensions = {'.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.mp4', '.mp3', '.wav', '.webm'}
        return any(self.url.lower().endswith(ext) for ext in static_extensions)

    @cached_property
    def is_archived(self) -> bool:
        output_paths = (
            self.domain,
            'output.html',
            'output.pdf',
            'screenshot.png',
            'singlefile.html',
            'readability/content.html',
            'mercury/content.html',
            'htmltotext.txt',
            'media',
            'git',
        )
        return any((Path(self.output_dir) / path).exists() for path in output_paths)
    # =========================================================================
    # Date/Time Properties (migrated from Link schema)
    # =========================================================================

    @cached_property
    def bookmarked_date(self) -> Optional[str]:
        max_ts = (timezone.now() + timedelta(days=30)).timestamp()
        if self.timestamp and self.timestamp.replace('.', '').isdigit():
            if 0 < float(self.timestamp) < max_ts:
                return self._ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
            return str(self.timestamp)
        return None

    @cached_property
    def downloaded_datestr(self) -> Optional[str]:
        return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None

    @cached_property
    def archive_dates(self) -> List[datetime]:
        return [
            result.start_ts
            for result in self.archiveresult_set.all()
            if result.start_ts
        ]

    @cached_property
    def oldest_archive_date(self) -> Optional[datetime]:
        dates = self.archive_dates
        return min(dates) if dates else None

    @cached_property
    def newest_archive_date(self) -> Optional[datetime]:
        dates = self.archive_dates
        return max(dates) if dates else None

    @cached_property
    def num_outputs(self) -> int:
        return self.archiveresult_set.filter(status='succeeded').count()

    @cached_property
    def num_failures(self) -> int:
        return self.archiveresult_set.filter(status='failed').count()
    # =========================================================================
    # Output Path Methods (migrated from Link schema)
    # =========================================================================

    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """
        Intelligently discover the best output file for each plugin.
        Uses actual ArchiveResult data and filesystem scanning with smart heuristics.
        """
        FAVICON_PROVIDER = 'https://www.google.com/s2/favicons?domain={}'
        # File extensions that can be embedded/previewed in an iframe
        IFRAME_EMBEDDABLE_EXTENSIONS = {
            'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl',
            'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
            'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
        }
        MIN_DISPLAY_SIZE = 15_000  # 15KB - filter out tiny files
        MAX_SCAN_FILES = 50        # don't scan massive directories

        def find_best_output_in_dir(dir_path: Path, plugin_name: str) -> Optional[str]:
            """Find the best representative file in a plugin's output directory"""
            if not dir_path.exists() or not dir_path.is_dir():
                return None
            candidates = []
            file_count = 0
            # Special handling for the media plugin - look for thumbnails
            is_media_dir = plugin_name == 'media'
            # Scan for suitable files
            for file_path in dir_path.rglob('*'):
                file_count += 1
                if file_count > MAX_SCAN_FILES:
                    break
                if file_path.is_dir() or file_path.name.startswith('.'):
                    continue
                ext = file_path.suffix.lstrip('.').lower()
                if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
                    continue
                try:
                    size = file_path.stat().st_size
                except OSError:
                    continue
                # For media dirs, allow smaller image files (thumbnails are often < 15KB)
                min_size = 5_000 if (is_media_dir and ext in ('png', 'jpg', 'jpeg', 'webp', 'gif')) else MIN_DISPLAY_SIZE
                if size < min_size:
                    continue
                # Prefer main files: index.html, output.*, content.*, etc.
                priority = 0
                name_lower = file_path.name.lower()
                if is_media_dir:
                    # Special prioritization for media directories
                    if any(keyword in name_lower for keyword in ('thumb', 'thumbnail', 'cover', 'poster')):
                        priority = 200  # highest priority for thumbnails
                    elif ext in ('png', 'jpg', 'jpeg', 'webp', 'gif'):
                        priority = 150  # high priority for any image
                    elif ext in ('mp4', 'webm', 'mp3', 'opus', 'ogg'):
                        priority = 100  # lower priority for the actual media files
                    else:
                        priority = 50
                elif 'index' in name_lower:
                    priority = 100
                elif name_lower.startswith(('output', 'content', plugin_name)):
                    priority = 50
                elif ext in ('html', 'htm', 'pdf'):
                    priority = 30
                elif ext in ('png', 'jpg', 'jpeg', 'webp'):
                    priority = 20
                else:
                    priority = 10
                candidates.append((priority, size, file_path))
            if not candidates:
                return None
            # Sort by priority (desc), then size (desc)
            candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
            best_file = candidates[0][2]
            return str(best_file.relative_to(Path(self.output_dir)))

        canonical = {
            'index_path': 'index.html',
            'google_favicon_path': FAVICON_PROVIDER.format(self.domain),
            'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
        }
        # Scan each ArchiveResult's output directory for the best file
        snap_dir = Path(self.output_dir)
        for result in self.archiveresult_set.filter(status='succeeded'):
            if not result.output_files and not result.output_str:
                continue
            # Try to find the best output file for this plugin
            plugin_dir = snap_dir / result.plugin
            best_output = None
            # Check output_files first (new field)
            if result.output_files:
                first_file = next(iter(result.output_files.keys()), None)
                if first_file and (plugin_dir / first_file).exists():
                    best_output = f'{result.plugin}/{first_file}'
            # Fall back to output_str if it looks like a path
            if not best_output and result.output_str and (snap_dir / result.output_str).exists():
                best_output = result.output_str
            if not best_output and plugin_dir.exists():
                # Intelligently find the best file in the plugin's directory
                best_output = find_best_output_in_dir(plugin_dir, result.plugin)
            if best_output:
                canonical[f'{result.plugin}_path'] = best_output
        # Also scan the top level for legacy outputs (backwards compatibility)
        for file_path in snap_dir.glob('*'):
            if file_path.is_dir() or file_path.name in ('index.html', 'index.json'):
                continue
            ext = file_path.suffix.lstrip('.').lower()
            if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
                continue
            try:
                size = file_path.stat().st_size
                if size >= MIN_DISPLAY_SIZE:
                    # Add as a generic output with the file stem as the key
                    key = f'{file_path.stem}_path'
                    if key not in canonical:
                        canonical[key] = file_path.name
            except OSError:
                continue
        if self.is_static:
            static_path = f'warc/{self.timestamp}'
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
            })
        return canonical
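
    # Example return value of canonical_outputs() (keys and paths are illustrative):
    #
    #     {'index_path': 'index.html',
    #      'google_favicon_path': 'https://www.google.com/s2/favicons?domain=example.com',
    #      'archive_org_path': 'https://web.archive.org/web/https://example.com',
    #      'wget_path': 'wget/example.com/index.html',
    #      'screenshot_path': 'screenshot/screenshot.png'}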
    def latest_outputs(self, status: Optional[str] = None) -> Dict[str, Any]:
        """Get the latest output that each plugin produced."""
        from archivebox.hooks import get_plugins
        from django.db.models import Q

        latest: Dict[str, Any] = {}
        for plugin in (get_plugin_name(e) for e in get_plugins()):
            results = self.archiveresult_set.filter(plugin=plugin)
            if status is not None:
                results = results.filter(status=status)
            # Filter for results with output_files or output_str
            results = results.filter(Q(output_files__isnull=False) | ~Q(output_str='')).order_by('-start_ts')
            result = results.first()
            # Return embed_path() for backwards compatibility
            latest[plugin] = result.embed_path() if result else None
        return latest
    # =========================================================================
    # Serialization Methods
    # =========================================================================

    def to_dict(self, extended: bool = False) -> Dict[str, Any]:
        """Convert Snapshot to a dictionary (replacement for Link._asdict())"""
        result = {
            'TYPE': 'core.models.Snapshot',
            'id': str(self.id),
            'url': self.url,
            'timestamp': self.timestamp,
            'title': self.title,
            'tags': self.tags_str(),
            'downloaded_at': self.downloaded_at.isoformat() if self.downloaded_at else None,
            'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            # Computed properties
            'domain': self.domain,
            'scheme': self.scheme,
            'base_url': self.base_url,
            'path': self.path,
            'basename': self.basename,
            'extension': self.extension,
            'is_static': self.is_static,
            'is_archived': self.is_archived,
            'archive_path': self.archive_path,
            'output_dir': self.output_dir,
            'link_dir': self.output_dir,  # backwards-compatibility alias
            'archive_size': self.archive_size,
            'bookmarked_date': self.bookmarked_date,
            'downloaded_datestr': self.downloaded_datestr,
            'num_outputs': self.num_outputs,
            'num_failures': self.num_failures,
        }
        if extended:
            result['canonical'] = self.canonical_outputs()
        return result
    def to_json(self, indent: int = 4) -> str:
        """Convert to a JSON string (this overrides the `to_json = to_jsonl` alias above)."""
        return to_json(self.to_dict(extended=True), indent=indent)

    def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
        """Convert to a CSV string"""
        data = self.to_dict()
        cols = cols or ['timestamp', 'is_archived', 'url']
        return separator.join(to_json(data.get(col, ''), indent=None).ljust(ljust) for col in cols)
    def write_json_details(self, out_dir: Optional[str] = None) -> None:
        """Write the JSON index file for this snapshot to its output directory"""
        out_dir = out_dir or self.output_dir
        path = Path(out_dir) / CONSTANTS.JSON_INDEX_FILENAME
        atomic_write(str(path), self.to_dict(extended=True))

    def write_html_details(self, out_dir: Optional[str] = None) -> None:
        """Write the HTML detail page for this snapshot to its output directory"""
        from django.template.loader import render_to_string
        from archivebox.config.common import SERVER_CONFIG
        from archivebox.config.configset import get_config
        from archivebox.misc.logging_util import printable_filesize
        from archivebox.misc.util import ts_to_date_str

        out_dir = out_dir or self.output_dir
        config = get_config()
        SAVE_ARCHIVE_DOT_ORG = config.get('SAVE_ARCHIVE_DOT_ORG', True)
        TITLE_LOADING_MSG = 'Not yet archived...'
        canonical = self.canonical_outputs()
        context = {
            **self.to_dict(extended=True),
            **{f'{k}_path': v for k, v in canonical.items()},
            'canonical': {f'{k}_path': v for k, v in canonical.items()},
            'title': htmlencode(self.title or (self.base_url if self.is_archived else TITLE_LOADING_MSG)),
            'url_str': htmlencode(urldecode(self.base_url)),
            # NOTE: the f-string is always truthy, so the `or` fallbacks here never fire
            'archive_url': urlencode(f'warc/{self.timestamp}' or (self.domain if self.is_archived else '')) or 'about:blank',
            'extension': self.extension or 'html',
            'tags': self.tags_str() or 'untagged',
            'size': printable_filesize(self.archive_size) if self.archive_size else 'pending',
            'status': 'archived' if self.is_archived else 'not yet archived',
            'status_color': 'success' if self.is_archived else 'danger',
            'oldest_archive_date': ts_to_date_str(self.oldest_archive_date),
            'SAVE_ARCHIVE_DOT_ORG': SAVE_ARCHIVE_DOT_ORG,
            'PREVIEW_ORIGINALS': SERVER_CONFIG.PREVIEW_ORIGINALS,
        }
        rendered_html = render_to_string('snapshot.html', context)
        atomic_write(str(Path(out_dir) / CONSTANTS.HTML_INDEX_FILENAME), rendered_html)
    # =========================================================================
    # Helper Methods
    # =========================================================================

    @staticmethod
    def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
        return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None


# =============================================================================
# Snapshot State Machine
# =============================================================================
class SnapshotMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing the Snapshot lifecycle.

    Hook Lifecycle:
    ┌─────────────────────────────────────────────────────────────┐
    │ QUEUED State                                                │
    │ • Waiting for snapshot to be ready                          │
    └─────────────────────────────────────────────────────────────┘
        ↓ tick() when can_start()
    ┌─────────────────────────────────────────────────────────────┐
    │ STARTED State → enter_started()                             │
    │ 1. snapshot.run()                                           │
    │    • discover_hooks('Snapshot') → finds all plugin hooks    │
    │    • create_pending_archiveresults() → creates ONE          │
    │      ArchiveResult per hook (NO execution yet)              │
    │ 2. ArchiveResults process independently with their own      │
    │    state machines (see ArchiveResultMachine)                │
    │ 3. Advance through steps 0-9 as foreground hooks complete   │
    └─────────────────────────────────────────────────────────────┘
        ↓ tick() when is_finished()
    ┌─────────────────────────────────────────────────────────────┐
    │ SEALED State → enter_sealed()                               │
    │ • cleanup() → kills any background hooks still running      │
    │ • Set retry_at=None (no more processing)                    │
    └─────────────────────────────────────────────────────────────┘

    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
    """
    model_attr_name = 'snapshot'

    # States
    queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
    started = State(value=Snapshot.StatusChoices.STARTED)
    sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)

    # Tick Event
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(sealed, cond='is_finished')
    )

    def can_start(self) -> bool:
        return bool(self.snapshot.url)

    def is_finished(self) -> bool:
        """Check if snapshot processing is complete - delegates to the model method."""
        return self.snapshot.is_finished_processing()

    @queued.enter
    def enter_queued(self):
        self.snapshot.update_and_requeue(
            retry_at=timezone.now(),
            status=Snapshot.StatusChoices.QUEUED,
        )

    @started.enter
    def enter_started(self):
        # Lock the snapshot while we create the pending archiveresults
        self.snapshot.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=30),  # if this fails, wait 30s before retrying
        )
        # Run the snapshot - creates pending archiveresults for all enabled plugins
        self.snapshot.run()
        # Unlock the snapshot after we're done + set status = started
        self.snapshot.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=5),  # check again in 5s
            status=Snapshot.StatusChoices.STARTED,
        )

    @sealed.enter
    def enter_sealed(self):
        # Clean up background hooks before sealing
        self.snapshot.cleanup()
        self.snapshot.update_and_requeue(
            retry_at=None,
            status=Snapshot.StatusChoices.SEALED,
        )
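
# Usage sketch (hypothetical worker loop; assumes BaseStateMachine exposes the
# `tick` event defined in the class above):
#
#     machine = SnapshotMachine(snapshot)
#     machine.tick()   # queued -> started (creates pending ArchiveResults)
#     ...              # workers process the ArchiveResults independently
#     machine.tick()   # started -> sealed once is_finished() returns True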
class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
    class StatusChoices(models.TextChoices):
        QUEUED = 'queued', 'Queued'
        STARTED = 'started', 'Started'
        BACKOFF = 'backoff', 'Waiting to retry'
        SUCCEEDED = 'succeeded', 'Succeeded'
        FAILED = 'failed', 'Failed'
        SKIPPED = 'skipped', 'Skipped'

    @classmethod
    def get_plugin_choices(cls):
        """Get plugin choices from discovered hooks (for forms/admin)."""
        plugins = [get_plugin_name(e) for e in get_plugins()]
        return tuple((e, e) for e in plugins)

    # Keep AutoField for backward compatibility with 0.7.x databases;
    # the UUID field is added separately by a migration for new records
    id = models.AutoField(primary_key=True, editable=False)
    # Note: the unique constraint is added by migration 0027 - don't set unique=True here
    # or SQLite table recreation in earlier migrations will fail
    uuid = models.UUIDField(default=uuid7, null=True, blank=True, db_index=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    modified_at = models.DateTimeField(auto_now=True)
    snapshot: Snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE)  # type: ignore

    # No choices= constraint - plugin names come from the plugin system and can be any string
    plugin = models.CharField(max_length=32, blank=False, null=False, db_index=True, default='')
    hook_name = models.CharField(max_length=255, blank=True, default='', db_index=True, help_text='Full filename of the hook that executed (e.g., on_Snapshot__50_wget.py)')

    # Process FK - tracks execution details (cmd, pwd, stdout, stderr, etc.)
    # Required - every ArchiveResult must have a Process
    process = models.OneToOneField(
        'machine.Process',
        on_delete=models.PROTECT,
        null=False,  # required after migration 4
        related_name='archiveresult',
        help_text='Process execution details for this archive result',
    )

    # New output fields (replacing the old 'output' field)
    output_str = models.TextField(blank=True, default='', help_text='Human-readable output summary')
    output_json = models.JSONField(null=True, blank=True, default=None, help_text='Structured metadata (headers, redirects, etc.)')
    output_files = models.JSONField(default=dict, help_text='Dict of {relative_path: {metadata}}')
    output_size = models.BigIntegerField(default=0, help_text='Total bytes of all output files')
    output_mimetypes = models.CharField(max_length=512, blank=True, default='', help_text='CSV of mimetypes sorted by size')

    start_ts = models.DateTimeField(default=None, null=True, blank=True)
    end_ts = models.DateTimeField(default=None, null=True, blank=True)
    status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
    retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
    notes = models.TextField(blank=True, null=False, default='')
    # output_dir is computed via @property from snapshot.output_dir / plugin

    state_machine_name = 'archivebox.core.models.ArchiveResultMachine'
    retry_at_field_name = 'retry_at'
    state_field_name = 'status'
    active_state = StatusChoices.STARTED

    class Meta(TypedModelMeta):
        app_label = 'core'
        verbose_name = 'Archive Result'
        verbose_name_plural = 'Archive Results Log'
    def __str__(self):
        return f'[{self.id}] {self.snapshot.url[:64]} -> {self.plugin}'

    @property
    def created_by(self):
        """Convenience property to access the user who created this archive result via its snapshot's crawl."""
        return self.snapshot.crawl.created_by

    def to_jsonl(self) -> dict:
        """Convert this ArchiveResult model instance to a JSONL record."""
        from archivebox.config import VERSION

        record = {
            'type': 'ArchiveResult',
            'schema_version': VERSION,
            'id': str(self.id),
            'snapshot_id': str(self.snapshot_id),
            'plugin': self.plugin,
            'hook_name': self.hook_name,
            'status': self.status,
            'output_str': self.output_str,
            'start_ts': self.start_ts.isoformat() if self.start_ts else None,
            'end_ts': self.end_ts.isoformat() if self.end_ts else None,
        }
        # Include optional fields only if set
        if self.output_json:
            record['output_json'] = self.output_json
        if self.output_files:
            record['output_files'] = self.output_files
        if self.output_size:
            record['output_size'] = self.output_size
        if self.output_mimetypes:
            record['output_mimetypes'] = self.output_mimetypes
        if self.cmd:
            record['cmd'] = self.cmd
        if self.cmd_version:
            record['cmd_version'] = self.cmd_version
        if self.process_id:
            record['process_id'] = str(self.process_id)
        return record
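
    # Example of the JSONL record emitted by to_jsonl() (values are illustrative):
    #
    #     {"type": "ArchiveResult", "schema_version": "0.9.0", "id": "123",
    #      "snapshot_id": "019...", "plugin": "wget", "hook_name": "on_Snapshot__50_wget.py",
    #      "status": "succeeded", "output_str": "wget/example.com/index.html",
    #      "start_ts": "2024-01-01T00:00:00+00:00", "end_ts": "2024-01-01T00:00:05+00:00"}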
    @staticmethod
    def from_jsonl(record: Dict[str, Any], overrides: Optional[Dict[str, Any]] = None):
        """
        Create or get an ArchiveResult from a JSONL record.

        Args:
            record: dict with 'snapshot_id' and 'plugin' (required)
            overrides: optional dict of field overrides

        Returns:
            ArchiveResult instance or None if the record is invalid
        """
        from django.utils import timezone

        overrides = overrides or {}
        # Check if it already exists by ID
        ar_id = record.get('id')
        if ar_id:
            try:
                return ArchiveResult.objects.get(id=ar_id)
            except ArchiveResult.DoesNotExist:
                pass

        snapshot_id = record.get('snapshot_id')
        plugin = record.get('plugin')
        hook_name = record.get('hook_name', '')
        if not snapshot_id or not plugin:
            return None

        # Get the snapshot
        try:
            snapshot = Snapshot.objects.get(id=snapshot_id)
        except Snapshot.DoesNotExist:
            return None

        # Create or get the archive result
        ar, created = ArchiveResult.objects.get_or_create(
            snapshot=snapshot,
            plugin=plugin,
            hook_name=hook_name,
            defaults={
                'status': ArchiveResult.INITIAL_STATE,
                'retry_at': timezone.now(),
                **overrides,
            },
        )
        return ar

    # Aliases
    from_json = from_jsonl
    to_json = to_jsonl
    def save(self, *args, **kwargs):
        is_new = self._state.adding
        # Create a Process record if this is a new ArchiveResult and no process exists yet
        if is_new and not self.process_id:
            from archivebox.machine.models import Process, Machine

            process = Process.objects.create(
                machine=Machine.current(),
                pwd=str(Path(self.snapshot.output_dir) / self.plugin),
                cmd=[],  # will be set by run()
                status='queued',
                timeout=120,
                env={},
            )
            self.process = process
        # Skip ModelWithOutputDir.save() to avoid creating index.json in plugin directories;
        # call the Django Model.save() directly instead
        models.Model.save(self, *args, **kwargs)
        if is_new:
            from archivebox.misc.logging_util import log_worker_event

            log_worker_event(
                worker_type='DB',
                event='Created ArchiveResult',
                indent_level=3,
                plugin=self.plugin,
                metadata={
                    'id': str(self.id),
                    'snapshot_id': str(self.snapshot_id),
                    'snapshot_url': str(self.snapshot.url)[:64],
                    'status': self.status,
                },
            )
    @cached_property
    def snapshot_dir(self):
        return Path(self.snapshot.output_dir)

    @cached_property
    def url(self):
        return self.snapshot.url

    @property
    def api_url(self) -> str:
        return reverse_lazy('api-1:get_archiveresult', args=[self.id])

    def get_absolute_url(self):
        return f'/{self.snapshot.archive_path}/{self.plugin}'

    @property
    def plugin_module(self) -> Any | None:
        # Hook scripts are now used instead of Python plugin modules;
        # the plugin name maps to hooks in archivebox/plugins/{plugin}/
        return None

    def output_exists(self) -> bool:
        return os.path.exists(Path(self.snapshot_dir) / self.plugin)

    def embed_path(self) -> Optional[str]:
        """
        Get the relative path to the embeddable output file for this result.
        Returns the first file from output_files if set, otherwise tries to
        find a reasonable default based on the plugin type.
        """
        # Check the output_files dict for the primary output
        if self.output_files:
            # Return the first file from output_files (dicts preserve insertion order)
            first_file = next(iter(self.output_files.keys()), None)
            if first_file:
                return f'{self.plugin}/{first_file}'
        # Fallback: check output_str if it looks like a file path
        if self.output_str and ('/' in self.output_str or '.' in self.output_str):
            return self.output_str
        # Try to find the output file based on the plugin's canonical output path
        canonical = self.snapshot.canonical_outputs()
        plugin_key = f'{self.plugin}_path'
        if plugin_key in canonical:
            return canonical[plugin_key]
        # Fall back to the plugin directory
        return f'{self.plugin}/'
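
    # embed_path() resolution sketch (illustrative values, assuming plugin='wget'):
    #
    #     output_files == {'example.com/index.html': {}}            -> 'wget/example.com/index.html'
    #     output_files empty, output_str == 'singlefile.html'       -> 'singlefile.html'
    #     neither set -> canonical_outputs()['wget_path'], else the 'wget/' directory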
    def create_output_dir(self):
        output_dir = Path(self.snapshot_dir) / self.plugin
        output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir

    @property
    def output_dir_name(self) -> str:
        return self.plugin

    @property
    def output_dir_parent(self) -> str:
        return str(self.snapshot.OUTPUT_DIR.relative_to(CONSTANTS.DATA_DIR))

    # Properties that delegate to the Process model (for backwards compatibility).
    # They let existing code keep using archiveresult.pwd, .cmd, etc. now that
    # migration 3 created the Process records and migration 5 removed the old
    # direct fields - these properties provide seamless access to Process data.

    @property
    def pwd(self) -> str:
        """Working directory (from Process)."""
        return self.process.pwd if self.process_id else ''

    @property
    def cmd(self) -> list:
        """Command array (from Process)."""
        return self.process.cmd if self.process_id else []

    @property
    def cmd_version(self) -> str:
        """Command version (from Process.binary)."""
        return self.process.cmd_version if self.process_id else ''

    @property
    def binary(self):
        """Binary FK (from Process)."""
        return self.process.binary if self.process_id else None

    @property
    def iface(self):
        """Network interface FK (from Process)."""
        return self.process.iface if self.process_id else None

    @property
    def machine(self):
        """Machine FK (from Process)."""
        return self.process.machine if self.process_id else None

    @property
    def timeout(self) -> int:
        """Timeout in seconds (from Process)."""
        return self.process.timeout if self.process_id else 120

    def save_search_index(self):
        pass

    def cascade_health_update(self, success: bool):
        """Update health stats for self, the parent Snapshot, and the grandparent Crawl."""
        self.increment_health_stats(success)
        self.snapshot.increment_health_stats(success)
        self.snapshot.crawl.increment_health_stats(success)
    def run(self):
        """
        Execute this ArchiveResult's hook and update status.

        If self.hook_name is set, runs only that specific hook.
        If self.hook_name is empty, discovers and runs all hooks for self.plugin (backwards compat).
        Updates status/output fields, queues discovered URLs, and triggers indexing.
        """
        from django.utils import timezone
        from archivebox.hooks import BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR, run_hook, is_background_hook
        from archivebox.config.configset import get_config

        # Get the merged config with proper context
        config = get_config(
            crawl=self.snapshot.crawl,
            snapshot=self.snapshot,
        )

        # Determine which hook(s) to run
        hooks = []
        if self.hook_name:
            # SPECIFIC HOOK MODE: find the specific hook by name
            for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
                if not base_dir.exists():
                    continue
                plugin_dir = base_dir / self.plugin
                if plugin_dir.exists():
                    hook_path = plugin_dir / self.hook_name
                    if hook_path.exists():
                        hooks.append(hook_path)
                        break
        else:
            # LEGACY MODE: discover all hooks for this plugin (backwards compatibility)
            for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
                if not base_dir.exists():
                    continue
                plugin_dir = base_dir / self.plugin
                if plugin_dir.exists():
                    matches = list(plugin_dir.glob('on_Snapshot__*.*'))
                    if matches:
                        hooks.extend(sorted(matches))

        if not hooks:
            self.status = self.StatusChoices.FAILED
            if self.hook_name:
                self.output_str = f'Hook not found: {self.plugin}/{self.hook_name}'
            else:
                self.output_str = f'No hooks found for plugin: {self.plugin}'
            self.retry_at = None
            self.save()
            return

        # The plugin dir within the snapshot's output dir receives the hook output
        plugin_dir = Path(self.snapshot.output_dir) / self.plugin
        start_ts = timezone.now()
        is_bg_hook = False
        for hook in hooks:
            # Check whether this is a background hook (by filename convention)
            is_bg_hook = is_background_hook(hook.name)
            result = run_hook(
                hook,
                output_dir=plugin_dir,
                config=config,
                url=self.snapshot.url,
                snapshot_id=str(self.snapshot.id),
                crawl_id=str(self.snapshot.crawl.id),
                depth=self.snapshot.depth,
            )
            # Background hooks return None
            if result is None:
                is_bg_hook = True

        # Update status based on the hook execution
        if is_bg_hook:
            # BACKGROUND HOOK - still running, return immediately;
            # status stays STARTED and will be finalized by Snapshot.cleanup()
            self.status = self.StatusChoices.STARTED
            self.start_ts = start_ts
            if self.process_id:
                self.process.pwd = str(plugin_dir)
                self.process.save()
            self.save()
            return

        # FOREGROUND HOOK - completed, update from the filesystem
        self.start_ts = start_ts
        if self.process_id:
            self.process.pwd = str(plugin_dir)
            self.process.save()
        self.update_from_output()

        # Clean up the output directory if the hook created no files
        if plugin_dir.exists() and not self.output_files:
            try:
                if not any(plugin_dir.iterdir()):
                    plugin_dir.rmdir()
            except (OSError, RuntimeError):
                pass
    def update_from_output(self):
        """
        Update this ArchiveResult from filesystem logs and output files.

        Used for:
        - Foreground hooks that completed (called from ArchiveResult.run())
        - Background hooks that completed (called from Snapshot.cleanup())

        Updates:
        - status, output_str, output_json from the ArchiveResult JSONL record
        - output_files, output_size, output_mimetypes by walking the filesystem
        - end_ts, retry_at, cmd, cmd_version, binary FK
        - Processes side-effect records (Snapshot, Tag, etc.) via process_hook_records()
        """
        import json
        import mimetypes
        from collections import defaultdict
        from django.utils import timezone
        from archivebox.hooks import process_hook_records

        plugin_dir = Path(self.pwd) if self.pwd else None
        if not plugin_dir or not plugin_dir.exists():
            self.status = self.StatusChoices.FAILED
            self.output_str = 'Output directory not found'
            self.end_ts = timezone.now()
            self.retry_at = None
            self.save()
            return

        # Read and parse the JSONL output from stdout.log
        stdout_file = plugin_dir / 'stdout.log'
        stdout = stdout_file.read_text() if stdout_file.exists() else ''
        records = []
        for line in stdout.splitlines():
            if line.strip() and line.strip().startswith('{'):
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError:
                    continue

        # Find the ArchiveResult record and update status/output from it
        ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
        if ar_records:
            hook_data = ar_records[0]
            # Update status
            status_map = {
                'succeeded': self.StatusChoices.SUCCEEDED,
                'failed': self.StatusChoices.FAILED,
                'skipped': self.StatusChoices.SKIPPED,
            }
            self.status = status_map.get(hook_data.get('status', 'failed'), self.StatusChoices.FAILED)
            # Update output fields
            self.output_str = hook_data.get('output_str') or hook_data.get('output') or ''
            self.output_json = hook_data.get('output_json')
            # Update cmd fields
            if hook_data.get('cmd'):
                if self.process_id:
                    self.process.cmd = hook_data['cmd']
                    self.process.save()
                self._set_binary_from_cmd(hook_data['cmd'])
                # Note: cmd_version is derived from binary.version, not stored on Process
        else:
            # No ArchiveResult record = failed
            self.status = self.StatusChoices.FAILED
            self.output_str = 'Hook did not output ArchiveResult record'

        # Walk the filesystem and populate output_files, output_size, output_mimetypes
        exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid'}
        mime_sizes = defaultdict(int)
        total_size = 0
        output_files = {}
        for file_path in plugin_dir.rglob('*'):
            if not file_path.is_file():
                continue
            if file_path.name in exclude_names:
                continue
            try:
                stat = file_path.stat()
                mime_type, _ = mimetypes.guess_type(str(file_path))
                mime_type = mime_type or 'application/octet-stream'
                relative_path = str(file_path.relative_to(plugin_dir))
                output_files[relative_path] = {}
                mime_sizes[mime_type] += stat.st_size
                total_size += stat.st_size
            except OSError:
                continue
        self.output_files = output_files
        self.output_size = total_size
        sorted_mimes = sorted(mime_sizes.items(), key=lambda x: x[1], reverse=True)
        self.output_mimetypes = ','.join(mime for mime, _ in sorted_mimes)

        # Update timestamps
        self.end_ts = timezone.now()
        self.retry_at = None
        self.save()

        # Collect side-effect records (filtering Snapshots for depth/URL constraints)
        filtered_records = []
        for record in records:
            record_type = record.get('type')
            # Skip ArchiveResult records (already processed above)
            if record_type == 'ArchiveResult':
                continue
            # Filter Snapshot records for depth/URL constraints
            if record_type == 'Snapshot':
                url = record.get('url')
                if not url:
                    continue
                depth = record.get('depth', self.snapshot.depth + 1)
                if depth > self.snapshot.crawl.max_depth:
                    continue
                if not self._url_passes_filters(url):
                    continue
            filtered_records.append(record)

        # Process the filtered records with the unified dispatcher
        overrides = {
            'snapshot': self.snapshot,
            'crawl': self.snapshot.crawl,
            'created_by_id': self.created_by.pk,
        }
        process_hook_records(filtered_records, overrides=overrides)

        # Clean up PID files and empty logs
        pid_file = plugin_dir / 'hook.pid'
        pid_file.unlink(missing_ok=True)
        stderr_file = plugin_dir / 'stderr.log'
        if stdout_file.exists() and stdout_file.stat().st_size == 0:
            stdout_file.unlink()
        if stderr_file.exists() and stderr_file.stat().st_size == 0:
            stderr_file.unlink()
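
    # Example stdout.log content that update_from_output() can parse (one JSON
    # object per line; the values are illustrative):
    #
    #     {"type": "ArchiveResult", "status": "succeeded", "output_str": "index.html", "cmd": ["/usr/bin/wget", "-p", "https://example.com"]}
    #     {"type": "Snapshot", "url": "https://example.com/about", "depth": 1}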
    def _set_binary_from_cmd(self, cmd: list | str) -> None:
        """
        Find the Binary record for a command and set it as the process's binary FK.

        Tries matching by absolute path first, then falls back to the binary name.
        Only matches binaries registered on the current machine.
        """
        if not cmd:
            return

        from archivebox.machine.models import Machine

        bin_path_or_name = cmd[0] if isinstance(cmd, list) else cmd
        machine = Machine.current()

        # Try matching by absolute path first
        binary = Binary.objects.filter(
            abspath=bin_path_or_name,
            machine=machine,
        ).first()
        if binary:
            if self.process_id:
                self.process.binary = binary
                self.process.save()
            return

        # Fall back to matching by binary name
        bin_name = Path(bin_path_or_name).name
        binary = Binary.objects.filter(
            name=bin_name,
            machine=machine,
        ).first()
        if binary:
            if self.process_id:
                self.process.binary = binary
                self.process.save()
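    # Illustrative usage sketch (hypothetical command; callers and exact cmd
    # values are assumed - this only documents the matching order):
    #
    #     self._set_binary_from_cmd(['/usr/bin/wget', '--mirror', url])
    #     # 1. tries Binary(abspath='/usr/bin/wget', machine=current)
    #     # 2. falls back to Binary(name='wget', machine=current)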
    def _url_passes_filters(self, url: str) -> bool:
        """Check whether a URL passes the URL_ALLOWLIST and URL_DENYLIST config filters.

        Uses the full config hierarchy: defaults -> file -> env -> machine -> user -> crawl -> snapshot.
        """
        import re

        from archivebox.config.configset import get_config

        # Get the merged config with the proper hierarchy applied
        config = get_config(
            user=self.created_by,
            crawl=self.snapshot.crawl,
            snapshot=self.snapshot,
        )

        # Get allowlist/denylist (each can be a comma-separated string or a list)
        allowlist_raw = config.get('URL_ALLOWLIST', '')
        denylist_raw = config.get('URL_DENYLIST', '')

        # Normalize to a list of regex patterns
        def to_pattern_list(value):
            if isinstance(value, list):
                return value
            if isinstance(value, str):
                return [p.strip() for p in value.split(',') if p.strip()]
            return []

        allowlist = to_pattern_list(allowlist_raw)
        denylist = to_pattern_list(denylist_raw)

        # Denylist takes precedence over allowlist
        if denylist:
            for pattern in denylist:
                try:
                    if re.search(pattern, url):
                        return False
                except re.error:
                    continue  # skip invalid regex patterns

        # If an allowlist exists, the URL must match at least one of its patterns
        if allowlist:
            for pattern in allowlist:
                try:
                    if re.search(pattern, url):
                        return True
                except re.error:
                    continue  # skip invalid regex patterns
            return False  # no allowlist pattern matched

        return True  # no filters configured, or URL passed all of them
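    # Illustrative sketch of the filter semantics (hypothetical config values):
    #
    #     URL_ALLOWLIST = r'^https://example\.com/'
    #     URL_DENYLIST  = r'\.pdf$,logout'             # comma-separated patterns
    #
    #     _url_passes_filters('https://example.com/page')      -> True
    #     _url_passes_filters('https://example.com/file.pdf')  -> False  (denylist wins)
    #     _url_passes_filters('https://other.org/page')        -> False  (no allowlist match)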
    @property
    def output_dir(self) -> Path:
        """Get the output directory for this plugin's results."""
        return Path(self.snapshot.output_dir) / self.plugin

    def is_background_hook(self) -> bool:
        """Check if this ArchiveResult is for a background hook."""
        plugin_dir = Path(self.pwd) if self.pwd else None
        if not plugin_dir:
            return False
        pid_file = plugin_dir / 'hook.pid'
        return pid_file.exists()
# =============================================================================
# ArchiveResult State Machine
# =============================================================================

class ArchiveResultMachine(BaseStateMachine, strict_states=True):
    """
    State machine for managing the ArchiveResult (single plugin execution) lifecycle.

    Hook Lifecycle:

    ┌─────────────────────────────────────────────────────────────┐
    │ QUEUED State                                                │
    │   • Waiting for its turn to run                             │
    └─────────────────────────────────────────────────────────────┘
                      ↓ tick() when can_start()
    ┌─────────────────────────────────────────────────────────────┐
    │ STARTED State → enter_started()                             │
    │   1. archiveresult.run()                                    │
    │      • Find specific hook by hook_name                      │
    │      • run_hook(script, output_dir, ...) → subprocess       │
    │                                                             │
    │   2a. FOREGROUND hook (returns HookResult):                 │
    │      • update_from_output() immediately                     │
    │        - Read stdout.log                                    │
    │        - Parse JSONL records                                │
    │        - Extract 'ArchiveResult' record → update status     │
    │        - Walk output_dir → populate output_files            │
    │        - Call process_hook_records() for side effects       │
    │                                                             │
    │   2b. BACKGROUND hook (returns None):                       │
    │      • Status stays STARTED                                 │
    │      • Continues running in background                      │
    │      • Killed by Snapshot.cleanup() when sealed             │
    └─────────────────────────────────────────────────────────────┘
                      ↓ tick() checks status
    ┌─────────────────────────────────────────────────────────────┐
    │ SUCCEEDED / FAILED / SKIPPED / BACKOFF                      │
    │   • Set by hook's JSONL output during update_from_output()  │
    │   • Health stats incremented (num_uses_succeeded/failed)    │
    │   • Parent Snapshot health stats also updated               │
    └─────────────────────────────────────────────────────────────┘

    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
    """
    model_attr_name = 'archiveresult'

    # States
    queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
    started = State(value=ArchiveResult.StatusChoices.STARTED)
    backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
    succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
    failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
    skipped = State(value=ArchiveResult.StatusChoices.SKIPPED, final=True)

    # Tick event - transitions between states based on the conditions below
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(succeeded, cond='is_succeeded') |
        started.to(failed, cond='is_failed') |
        started.to(skipped, cond='is_skipped') |
        started.to(backoff, cond='is_backoff') |
        backoff.to.itself(unless='can_start') |
        backoff.to(started, cond='can_start') |
        backoff.to(succeeded, cond='is_succeeded') |
        backoff.to(failed, cond='is_failed') |
        backoff.to(skipped, cond='is_skipped')
    )
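    # Illustrative sketch of how a worker drives this machine (assuming
    # BaseStateMachine binds the model positionally, per the usual
    # python-statemachine convention; names here are illustrative):
    #
    #     machine = ArchiveResultMachine(archiveresult)
    #     machine.tick()   # queued -> started once can_start() is True, then
    #                      # started -> succeeded/failed/skipped/backoff based
    #                      # on the status that run() recorded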
    def can_start(self) -> bool:
        if not self.archiveresult.snapshot.url:
            return False

        # Check whether the snapshot has exceeded MAX_URL_ATTEMPTS failed results
        from archivebox.config.configset import get_config
        config = get_config(
            crawl=self.archiveresult.snapshot.crawl,
            snapshot=self.archiveresult.snapshot,
        )
        max_attempts = config.get('MAX_URL_ATTEMPTS', 50)

        # Count failed ArchiveResults for this snapshot (any plugin type)
        failed_count = self.archiveresult.snapshot.archiveresult_set.filter(
            status=ArchiveResult.StatusChoices.FAILED,
        ).count()
        if failed_count >= max_attempts:
            # Mark this result as skipped since we've hit the limit
            self.archiveresult.status = ArchiveResult.StatusChoices.SKIPPED
            self.archiveresult.output_str = f'Skipped: snapshot exceeded MAX_URL_ATTEMPTS ({max_attempts} failures)'
            self.archiveresult.retry_at = None
            self.archiveresult.save()
            return False

        return True
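    # Illustrative sketch of the guard above (hypothetical counts):
    #
    #     failed_count = 50, max_attempts = 50
    #         -> this result is marked SKIPPED and can_start() returns False
    #     failed_count = 3, max_attempts = 50
    #         -> can_start() returns True and the plugin is allowed to run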
    def is_succeeded(self) -> bool:
        """Check if the extractor plugin succeeded (status was set by run())."""
        return self.archiveresult.status == ArchiveResult.StatusChoices.SUCCEEDED

    def is_failed(self) -> bool:
        """Check if the extractor plugin failed (status was set by run())."""
        return self.archiveresult.status == ArchiveResult.StatusChoices.FAILED

    def is_skipped(self) -> bool:
        """Check if the extractor plugin was skipped (status was set by run())."""
        return self.archiveresult.status == ArchiveResult.StatusChoices.SKIPPED

    def is_backoff(self) -> bool:
        """Check if we should back off and retry later."""
        # Back off if status is still STARTED (plugin didn't complete) and output_str is empty
        return (
            self.archiveresult.status == ArchiveResult.StatusChoices.STARTED and
            not self.archiveresult.output_str
        )

    def is_finished(self) -> bool:
        """Check if the extraction has completed (success, failure, or skipped)."""
        return self.archiveresult.status in (
            ArchiveResult.StatusChoices.SUCCEEDED,
            ArchiveResult.StatusChoices.FAILED,
            ArchiveResult.StatusChoices.SKIPPED,
        )
    @queued.enter
    def enter_queued(self):
        self.archiveresult.update_and_requeue(
            retry_at=timezone.now(),
            status=ArchiveResult.StatusChoices.QUEUED,
            start_ts=None,
        )  # bump retry_at so the worker picks up any new changes promptly

    @started.enter
    def enter_started(self):
        # Update the Process with its network interface (optional - fails gracefully)
        if self.archiveresult.process_id:
            try:
                from archivebox.machine.models import NetworkInterface
                self.archiveresult.process.iface = NetworkInterface.current()
                self.archiveresult.process.save()
            except Exception:
                pass  # network interface detection is optional

        # Lock the object and mark the start time
        self.archiveresult.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=120),  # 2 min timeout for the plugin
            status=ArchiveResult.StatusChoices.STARTED,
            start_ts=timezone.now(),
        )

        # Run the plugin - this updates status, output, timestamps, etc.
        self.archiveresult.run()

        # Save the updated result
        self.archiveresult.save()

    @backoff.enter
    def enter_backoff(self):
        self.archiveresult.update_and_requeue(
            retry_at=timezone.now() + timedelta(seconds=60),
            status=ArchiveResult.StatusChoices.BACKOFF,
            end_ts=None,
        )

    @succeeded.enter
    def enter_succeeded(self):
        self.archiveresult.update_and_requeue(
            retry_at=None,
            status=ArchiveResult.StatusChoices.SUCCEEDED,
            end_ts=timezone.now(),
        )
        # Update health stats for the ArchiveResult -> Snapshot -> Crawl cascade
        self.archiveresult.cascade_health_update(success=True)

    @failed.enter
    def enter_failed(self):
        self.archiveresult.update_and_requeue(
            retry_at=None,
            status=ArchiveResult.StatusChoices.FAILED,
            end_ts=timezone.now(),
        )
        # Update health stats for the ArchiveResult -> Snapshot -> Crawl cascade
        self.archiveresult.cascade_health_update(success=False)

    @skipped.enter
    def enter_skipped(self):
        self.archiveresult.update_and_requeue(
            retry_at=None,
            status=ArchiveResult.StatusChoices.SKIPPED,
            end_ts=timezone.now(),
        )

    def after_transition(self, event: str, source: State, target: State):
        # Bump the parent snapshot's retry time so it picks up all the new changes
        self.archiveresult.snapshot.update_and_requeue()
# =============================================================================
# State Machine Registration
# =============================================================================

# Manually register the state machines with the python-statemachine registry
# (normally auto-discovered from statemachines.py, but defined here for clarity)
registry.register(SnapshotMachine)
registry.register(ArchiveResultMachine)