- __package__ = 'archivebox.core'
- from typing import Optional, Dict, Iterable, Any, List, TYPE_CHECKING
- from archivebox.uuid_compat import uuid7
- from datetime import datetime, timedelta
- from django_stubs_ext.db.models import TypedModelMeta
- import os
- import json
- from pathlib import Path
- from statemachine import State, registry
- from django.db import models
- from django.db.models import QuerySet, Value, Case, When, IntegerField
- from django.utils.functional import cached_property
- from django.utils.text import slugify
- from django.utils import timezone
- from django.core.cache import cache
- from django.urls import reverse, reverse_lazy
- from django.contrib import admin
- from django.conf import settings
- from archivebox.config import CONSTANTS
- from archivebox.misc.system import get_dir_size, atomic_write
- from archivebox.misc.util import parse_date, base_url, domain as url_domain, to_json, ts_to_date_str, urlencode, htmlencode, urldecode
- from archivebox.misc.hashing import get_dir_info
- from archivebox.hooks import (
- get_plugins, get_plugin_name, get_plugin_icon,
- )
- from archivebox.base_models.models import (
- ModelWithUUID, ModelWithSerializers, ModelWithOutputDir,
- ModelWithConfig, ModelWithNotes, ModelWithHealthStats,
- get_or_create_system_user_pk,
- )
- from archivebox.workers.models import ModelWithStateMachine, BaseStateMachine
- from archivebox.workers.tasks import bg_archive_snapshot
- from archivebox.crawls.models import Crawl
- from archivebox.machine.models import NetworkInterface, Binary
- class Tag(ModelWithSerializers):
- # Keep AutoField for compatibility with main branch migrations
- # Don't use UUIDField here - requires complex FK transformation
- id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
- created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk, null=True, related_name='tag_set')
- created_at = models.DateTimeField(default=timezone.now, db_index=True, null=True)
- modified_at = models.DateTimeField(auto_now=True)
- name = models.CharField(unique=True, blank=False, max_length=100)
- slug = models.SlugField(unique=True, blank=False, max_length=100, editable=False)
- snapshot_set: models.Manager['Snapshot']
- class Meta(TypedModelMeta):
- app_label = 'core'
- verbose_name = "Tag"
- verbose_name_plural = "Tags"
- def __str__(self):
- return self.name
- def save(self, *args, **kwargs):
- is_new = self._state.adding
- if is_new:
- self.slug = slugify(self.name)
- existing = set(Tag.objects.filter(slug__startswith=self.slug).values_list("slug", flat=True))
- i = None
- while True:
- slug = f"{slugify(self.name)}_{i}" if i else slugify(self.name)
- if slug not in existing:
- self.slug = slug
- break
- i = (i or 0) + 1
- super().save(*args, **kwargs)
- if is_new:
- from archivebox.misc.logging_util import log_worker_event
- log_worker_event(
- worker_type='DB',
- event='Created Tag',
- indent_level=0,
- metadata={
- 'id': self.id,
- 'name': self.name,
- 'slug': self.slug,
- },
- )
- @property
- def api_url(self) -> str:
- return reverse_lazy('api-1:get_tag', args=[self.id])
- def to_jsonl(self) -> dict:
- """
- Convert Tag model instance to a JSONL record.
- """
- from archivebox.config import VERSION
- return {
- 'type': 'Tag',
- 'schema_version': VERSION,
- 'id': str(self.id),
- 'name': self.name,
- 'slug': self.slug,
- }
- @staticmethod
- def from_jsonl(record: Dict[str, Any], overrides: Optional[Dict[str, Any]] = None):
- """
- Create/update Tag from JSONL record.
- Args:
- record: JSONL record with 'name' field
- overrides: Optional dict with 'snapshot' to auto-attach tag
- Returns:
- Tag instance or None
- """
- name = record.get('name')
- if not name:
- return None
- tag, _ = Tag.objects.get_or_create(name=name)
- # Auto-attach to snapshot if in overrides
- if overrides and 'snapshot' in overrides and tag:
- overrides['snapshot'].tags.add(tag)
- return tag
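- # Illustrative usage of Tag.from_jsonl (a sketch, not part of the public API; assumes
- # a configured Django environment and an existing `snapshot` instance):
- #
- #   record = {'type': 'Tag', 'name': 'news'}
- #   tag = Tag.from_jsonl(record, overrides={'snapshot': snapshot})
- #   assert tag is not None and snapshot.tags.filter(name='news').exists()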
- class SnapshotTag(models.Model):
- id = models.AutoField(primary_key=True)
- snapshot = models.ForeignKey('Snapshot', db_column='snapshot_id', on_delete=models.CASCADE, to_field='id')
- tag = models.ForeignKey(Tag, db_column='tag_id', on_delete=models.CASCADE, to_field='id')
- class Meta:
- app_label = 'core'
- db_table = 'core_snapshot_tags'
- unique_together = [('snapshot', 'tag')]
- class SnapshotQuerySet(models.QuerySet):
- """Custom QuerySet for Snapshot model with export methods that persist through .filter() etc."""
- # =========================================================================
- # Filtering Methods
- # =========================================================================
- FILTER_TYPES = {
- 'exact': lambda pattern: models.Q(url=pattern),
- 'substring': lambda pattern: models.Q(url__icontains=pattern),
- 'regex': lambda pattern: models.Q(url__iregex=pattern),
- 'domain': lambda pattern: models.Q(url__istartswith=f"http://{pattern}") | models.Q(url__istartswith=f"https://{pattern}") | models.Q(url__istartswith=f"ftp://{pattern}"),
- 'tag': lambda pattern: models.Q(tags__name=pattern),
- 'timestamp': lambda pattern: models.Q(timestamp=pattern),
- }
- def filter_by_patterns(self, patterns: List[str], filter_type: str = 'exact') -> 'SnapshotQuerySet':
- """Filter snapshots by URL patterns using specified filter type"""
- from archivebox.misc.logging import stderr
- q_filter = models.Q()
- for pattern in patterns:
- try:
- q_filter = q_filter | self.FILTER_TYPES[filter_type](pattern)
- except KeyError:
- stderr()
- stderr(f'[X] Got invalid pattern for --filter-type={filter_type}:', color='red')
- stderr(f' {pattern}')
- raise SystemExit(2)
- return self.filter(q_filter)
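- # Example of chaining the pattern filter with normal QuerySet methods (sketch; the
- # `filter_type` must be a key of FILTER_TYPES, and the field lookups shown are standard Django):
- #
- #   qs = Snapshot.objects.filter_by_patterns(['example.com'], filter_type='domain')
- #   qs = qs.filter(downloaded_at__isnull=False).order_by('-created_at')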
- def search(self, patterns: List[str]) -> 'SnapshotQuerySet':
- """Search snapshots using the configured search backend"""
- from archivebox.config.common import SEARCH_BACKEND_CONFIG
- from archivebox.search import query_search_index
- from archivebox.misc.logging import stderr
- if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
- stderr()
- stderr('[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True', color='red')
- raise SystemExit(2)
- qsearch = self.none()
- for pattern in patterns:
- try:
- qsearch |= query_search_index(pattern)
- except Exception as err:
- stderr()
- stderr(f'[X] Error while querying the search backend for {pattern!r}: {err}', color='red')
- raise SystemExit(2)
- return self.all() & qsearch
- # =========================================================================
- # Export Methods
- # =========================================================================
- def to_json(self, with_headers: bool = False) -> str:
- """Generate JSON index from snapshots"""
- import sys
- from datetime import datetime, timezone as tz
- from archivebox.config import VERSION
- from archivebox.config.common import SERVER_CONFIG
- MAIN_INDEX_HEADER = {
- 'info': 'This is an index of site data archived by ArchiveBox: The self-hosted web archive.',
- 'schema': 'archivebox.index.json',
- 'copyright_info': SERVER_CONFIG.FOOTER_INFO,
- 'meta': {
- 'project': 'ArchiveBox',
- 'version': VERSION,
- 'git_sha': VERSION,
- 'website': 'https://ArchiveBox.io',
- 'docs': 'https://github.com/ArchiveBox/ArchiveBox/wiki',
- 'source': 'https://github.com/ArchiveBox/ArchiveBox',
- 'issues': 'https://github.com/ArchiveBox/ArchiveBox/issues',
- 'dependencies': {},
- },
- } if with_headers else {}
- snapshot_dicts = [s.to_dict(extended=True) for s in self.iterator(chunk_size=500)]
- if with_headers:
- output = {
- **MAIN_INDEX_HEADER,
- 'num_links': len(snapshot_dicts),
- 'updated': datetime.now(tz.utc),
- 'last_run_cmd': sys.argv,
- 'links': snapshot_dicts,
- }
- else:
- output = snapshot_dicts
- return to_json(output, indent=4, sort_keys=True)
- def to_csv(self, cols: Optional[List[str]] = None, header: bool = True, separator: str = ',', ljust: int = 0) -> str:
- """Generate CSV output from snapshots"""
- cols = cols or ['timestamp', 'is_archived', 'url']
- header_str = separator.join(col.ljust(ljust) for col in cols) if header else ''
- row_strs = (s.to_csv(cols=cols, ljust=ljust, separator=separator) for s in self.iterator(chunk_size=500))
- return '\n'.join((header_str, *row_strs))
- def to_html(self, with_headers: bool = True) -> str:
- """Generate main index HTML from snapshots"""
- from datetime import datetime, timezone as tz
- from django.template.loader import render_to_string
- from archivebox.config import VERSION
- from archivebox.config.common import SERVER_CONFIG
- from archivebox.config.version import get_COMMIT_HASH
- template = 'static_index.html' if with_headers else 'minimal_index.html'
- snapshot_list = list(self.iterator(chunk_size=500))
- return render_to_string(template, {
- 'version': VERSION,
- 'git_sha': get_COMMIT_HASH() or VERSION,
- 'num_links': str(len(snapshot_list)),
- 'date_updated': datetime.now(tz.utc).strftime('%Y-%m-%d'),
- 'time_updated': datetime.now(tz.utc).strftime('%Y-%m-%d %H:%M'),
- 'links': snapshot_list,
- 'FOOTER_INFO': SERVER_CONFIG.FOOTER_INFO,
- })
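- # Because these export methods live on the QuerySet, they survive chaining (sketch,
- # assuming at least one matching snapshot exists):
- #
- #   csv_text = Snapshot.objects.filter_by_patterns(['example.com'], 'domain').to_csv()
- #   json_text = Snapshot.objects.all().to_json(with_headers=True)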
- class SnapshotManager(models.Manager.from_queryset(SnapshotQuerySet)):
- """Manager for Snapshot model - uses SnapshotQuerySet for chainable methods"""
- def filter(self, *args, **kwargs):
- domain = kwargs.pop('domain', None)
- qs = super().filter(*args, **kwargs)
- if domain:
- qs = qs.filter(url__icontains=f'://{domain}')
- return qs
- def get_queryset(self):
- # Don't prefetch by default - it causes "too many open files" during bulk operations
- # Views/templates can add .prefetch_related('tags', 'archiveresult_set') where needed
- return super().get_queryset()
- # =========================================================================
- # Removal Methods
- # =========================================================================
- def remove(self, atomic: bool = False) -> tuple:
- """Remove snapshots from the database"""
- from django.db import transaction
- if atomic:
- with transaction.atomic():
- return self.delete()
- return self.delete()
- class Snapshot(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
- id = models.UUIDField(primary_key=True, default=uuid7, editable=False, unique=True)
- created_at = models.DateTimeField(default=timezone.now, db_index=True)
- modified_at = models.DateTimeField(auto_now=True)
- url = models.URLField(unique=False, db_index=True) # URLs can appear in multiple crawls
- timestamp = models.CharField(max_length=32, unique=True, db_index=True, editable=False)
- bookmarked_at = models.DateTimeField(default=timezone.now, db_index=True)
- crawl: Crawl = models.ForeignKey(Crawl, on_delete=models.CASCADE, null=False, related_name='snapshot_set', db_index=True) # type: ignore[assignment]
- parent_snapshot = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='child_snapshots', db_index=True, help_text='Parent snapshot that discovered this URL (for recursive crawling)')
- title = models.CharField(max_length=512, null=True, blank=True, db_index=True)
- downloaded_at = models.DateTimeField(default=None, null=True, editable=False, db_index=True, blank=True)
- depth = models.PositiveSmallIntegerField(default=0, db_index=True) # 0 for root snapshot, 1+ for discovered URLs
- fs_version = models.CharField(max_length=10, default='0.9.0', help_text='Filesystem version of this snapshot (e.g., "0.7.0", "0.8.0", "0.9.0"). Used to trigger lazy migration on save().')
- current_step = models.PositiveSmallIntegerField(default=0, db_index=True, help_text='Current hook step being executed (0-9). Used for sequential hook execution.')
- retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
- status = ModelWithStateMachine.StatusField(choices=ModelWithStateMachine.StatusChoices, default=ModelWithStateMachine.StatusChoices.QUEUED)
- config = models.JSONField(default=dict, null=False, blank=False, editable=True)
- notes = models.TextField(blank=True, null=False, default='')
- # output_dir is computed via @cached_property from fs_version and get_storage_path_for_version()
- tags = models.ManyToManyField(Tag, blank=True, through=SnapshotTag, related_name='snapshot_set', through_fields=('snapshot', 'tag'))
- state_machine_name = 'archivebox.core.models.SnapshotMachine'
- state_field_name = 'status'
- retry_at_field_name = 'retry_at'
- StatusChoices = ModelWithStateMachine.StatusChoices
- active_state = StatusChoices.STARTED
- objects = SnapshotManager()
- archiveresult_set: models.Manager['ArchiveResult']
- class Meta(TypedModelMeta):
- app_label = 'core'
- verbose_name = "Snapshot"
- verbose_name_plural = "Snapshots"
- constraints = [
- # Allow same URL in different crawls, but not duplicates within same crawl
- models.UniqueConstraint(fields=['url', 'crawl'], name='unique_url_per_crawl'),
- # Global timestamp uniqueness for 1:1 symlink mapping
- models.UniqueConstraint(fields=['timestamp'], name='unique_timestamp'),
- ]
- def __str__(self):
- return f'[{self.id}] {self.url[:64]}'
- @property
- def created_by(self):
- """Convenience property to access the user who created this snapshot via its crawl."""
- return self.crawl.created_by
- @property
- def process_set(self):
- """Get all Process objects related to this snapshot's ArchiveResults."""
- from archivebox.machine.models import Process
- return Process.objects.filter(archiveresult__snapshot_id=self.id)
- @property
- def binary_set(self):
- """Get all Binary objects used by processes related to this snapshot."""
- from archivebox.machine.models import Binary
- return Binary.objects.filter(process_set__archiveresult__snapshot_id=self.id).distinct()
- def save(self, *args, **kwargs):
- is_new = self._state.adding
- if not self.bookmarked_at:
- self.bookmarked_at = self.created_at or timezone.now()
- if not self.timestamp:
- self.timestamp = str(self.bookmarked_at.timestamp())
- # Migrate filesystem if needed (happens automatically on save)
- if self.pk and self.fs_migration_needed:
- from django.db import transaction
- with transaction.atomic():
- # Walk through migration chain automatically
- current = self.fs_version
- target = self._fs_current_version()
- while current != target:
- next_ver = self._fs_next_version(current)
- method = f'_fs_migrate_from_{current.replace(".", "_")}_to_{next_ver.replace(".", "_")}'
- # Only run if method exists (most are no-ops)
- if hasattr(self, method):
- getattr(self, method)()
- current = next_ver
- # Update version (still in transaction)
- self.fs_version = target
- super().save(*args, **kwargs)
- if self.url not in self.crawl.urls:
- self.crawl.urls += f'\n{self.url}'
- self.crawl.save()
- if is_new:
- from archivebox.misc.logging_util import log_worker_event
- log_worker_event(
- worker_type='DB',
- event='Created Snapshot',
- indent_level=2,
- url=self.url,
- metadata={
- 'id': str(self.id),
- 'crawl_id': str(self.crawl_id),
- 'depth': self.depth,
- 'status': self.status,
- },
- )
- # =========================================================================
- # Filesystem Migration Methods
- # =========================================================================
- @staticmethod
- def _fs_current_version() -> str:
- """Get current ArchiveBox filesystem version (normalized to x.x.0 format)"""
- from archivebox.config import VERSION
- # Normalize version to x.x.0 format (e.g., "0.9.0rc1" -> "0.9.0")
- parts = VERSION.split('.')
- if len(parts) >= 2:
- major, minor = parts[0], parts[1]
- # Strip any non-numeric suffix from minor version
- minor = ''.join(c for c in minor if c.isdigit())
- return f'{major}.{minor}.0'
- return '0.9.0' # Fallback if version parsing fails
- @property
- def fs_migration_needed(self) -> bool:
- """Check if snapshot needs filesystem migration"""
- return self.fs_version != self._fs_current_version()
- def _fs_next_version(self, version: str) -> str:
- """Get next version in migration chain (0.7/0.8 had same layout, only 0.8→0.9 migration needed)"""
- # Treat 0.7.0 and 0.8.0 as equivalent (both used archive/{timestamp})
- if version in ('0.7.0', '0.8.0'):
- return '0.9.0'
- return self._fs_current_version()
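- # Migration chain sketch: save() walks fs_version toward _fs_current_version(), calling
- # each _fs_migrate_from_X_to_Y() method that exists (missing methods are treated as no-ops):
- #
- #   '0.7.0' -> '0.9.0'   # same archive/{timestamp} layout as 0.8.0, no method defined
- #   '0.8.0' -> '0.9.0'   # handled by _fs_migrate_from_0_8_0_to_0_9_0() below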
- def _fs_migrate_from_0_8_0_to_0_9_0(self):
- """
- Migrate from flat to nested structure.
- 0.8.x: archive/{timestamp}/
- 0.9.x: users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/
- Transaction handling:
- 1. Copy files INSIDE transaction
- 2. Convert index.json to index.jsonl INSIDE transaction
- 3. Create symlink INSIDE transaction
- 4. Update fs_version INSIDE transaction (done by save())
- 5. Exit transaction (DB commit)
- 6. Delete old files OUTSIDE transaction (after commit)
- """
- import shutil
- from django.db import transaction
- old_dir = self.get_storage_path_for_version('0.8.0')
- new_dir = self.get_storage_path_for_version('0.9.0')
- if not old_dir.exists() or old_dir == new_dir or new_dir.exists():
- # Even if no directory migration needed, still convert index format
- self.convert_index_json_to_jsonl()
- return
- new_dir.mkdir(parents=True, exist_ok=True)
- # Copy all files (idempotent), skipping index.json (will be converted to jsonl)
- for old_file in old_dir.rglob('*'):
- if not old_file.is_file():
- continue
- rel_path = old_file.relative_to(old_dir)
- new_file = new_dir / rel_path
- # Skip if already copied
- if new_file.exists() and new_file.stat().st_size == old_file.stat().st_size:
- continue
- new_file.parent.mkdir(parents=True, exist_ok=True)
- shutil.copy2(old_file, new_file)
- # Verify all copied
- old_files = {f.relative_to(old_dir): f.stat().st_size
- for f in old_dir.rglob('*') if f.is_file()}
- new_files = {f.relative_to(new_dir): f.stat().st_size
- for f in new_dir.rglob('*') if f.is_file()}
- if old_files.keys() != new_files.keys():
- missing = old_files.keys() - new_files.keys()
- raise Exception(f"Migration incomplete: missing {missing}")
- # Convert index.json to index.jsonl in the new directory
- self.convert_index_json_to_jsonl()
- # Create backwards-compat symlink (INSIDE transaction)
- symlink_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
- if symlink_path.is_symlink():
- symlink_path.unlink()
- if not symlink_path.exists() or symlink_path == old_dir:
- symlink_path.symlink_to(new_dir, target_is_directory=True)
- # Schedule old directory deletion AFTER transaction commits
- transaction.on_commit(lambda: self._cleanup_old_migration_dir(old_dir))
- def _cleanup_old_migration_dir(self, old_dir: Path):
- """
- Delete old directory after successful migration.
- Called via transaction.on_commit() after DB commit succeeds.
- """
- import shutil
- import logging
- if old_dir.exists() and not old_dir.is_symlink():
- try:
- shutil.rmtree(old_dir)
- except Exception as e:
- # Log but don't raise - migration succeeded, this is just cleanup
- logging.getLogger('archivebox.migration').warning(
- f"Could not remove old migration directory {old_dir}: {e}"
- )
- # =========================================================================
- # Path Calculation and Migration Helpers
- # =========================================================================
- @staticmethod
- def extract_domain_from_url(url: str) -> str:
- """
- Extract domain from URL for 0.9.x path structure.
- Uses full hostname with sanitized special chars.
- Examples:
- https://example.com:8080 → example.com_8080
- https://sub.example.com → sub.example.com
- file:///path → localhost
- data:text/html → data
- """
- from urllib.parse import urlparse
- try:
- parsed = urlparse(url)
- if parsed.scheme in ('http', 'https'):
- if parsed.port:
- return f"{parsed.hostname}_{parsed.port}".replace(':', '_')
- return parsed.hostname or 'unknown'
- elif parsed.scheme == 'file':
- return 'localhost'
- elif parsed.scheme:
- return parsed.scheme
- else:
- return 'unknown'
- except Exception:
- return 'unknown'
- def get_storage_path_for_version(self, version: str) -> Path:
- """
- Calculate storage path for specific filesystem version.
- Centralizes path logic so it's reusable.
- 0.7.x/0.8.x: archive/{timestamp}
- 0.9.x: users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
- """
- from datetime import datetime
- if version in ('0.7.0', '0.8.0'):
- return CONSTANTS.ARCHIVE_DIR / self.timestamp
- elif version in ('0.9.0', '1.0.0'):
- username = self.created_by.username
- # Use created_at for date grouping (fallback to timestamp)
- if self.created_at:
- date_str = self.created_at.strftime('%Y%m%d')
- else:
- date_str = datetime.fromtimestamp(float(self.timestamp)).strftime('%Y%m%d')
- domain = self.extract_domain_from_url(self.url)
- return (
- CONSTANTS.DATA_DIR / 'users' / username / 'snapshots' /
- date_str / domain / str(self.id)
- )
- else:
- # Unknown version - use current
- return self.get_storage_path_for_version(self._fs_current_version())
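- # Path layout sketch (values are illustrative; real paths depend on CONSTANTS.DATA_DIR
- # and the snapshot's created_by/created_at/url/id):
- #
- #   get_storage_path_for_version('0.8.0')
- #     -> <DATA_DIR>/archive/1712345678.0
- #   get_storage_path_for_version('0.9.0')
- #     -> <DATA_DIR>/users/admin/snapshots/20240405/example.com/<uuid>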
- # =========================================================================
- # Loading and Creation from Filesystem (Used by archivebox update ONLY)
- # =========================================================================
- @classmethod
- def load_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
- """
- Load existing Snapshot from DB by reading index.jsonl or index.json.
- Reads index file, extracts url+timestamp, queries DB.
- Returns existing Snapshot or None if not found/invalid.
- Does NOT create new snapshots.
- ONLY used by: archivebox update (for orphan detection)
- """
- import json
- # Try index.jsonl first (new format), then index.json (legacy)
- jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
- json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
- data = None
- if jsonl_path.exists():
- try:
- with open(jsonl_path) as f:
- for line in f:
- line = line.strip()
- if line.startswith('{'):
- record = json.loads(line)
- if record.get('type') == 'Snapshot':
- data = record
- break
- except (json.JSONDecodeError, OSError):
- pass
- elif json_path.exists():
- try:
- with open(json_path) as f:
- data = json.load(f)
- except (json.JSONDecodeError, OSError):
- pass
- if not data:
- return None
- url = data.get('url')
- if not url:
- return None
- # Get timestamp - prefer index file, fallback to folder name
- timestamp = cls._select_best_timestamp(
- index_timestamp=data.get('timestamp'),
- folder_name=snapshot_dir.name
- )
- if not timestamp:
- return None
- # Look up existing
- try:
- return cls.objects.get(url=url, timestamp=timestamp)
- except cls.DoesNotExist:
- return None
- except cls.MultipleObjectsReturned:
- # Should not happen with unique constraint
- return cls.objects.filter(url=url, timestamp=timestamp).first()
- @classmethod
- def create_from_directory(cls, snapshot_dir: Path) -> Optional['Snapshot']:
- """
- Create new Snapshot from orphaned directory.
- Validates timestamp, ensures uniqueness.
- Returns new UNSAVED Snapshot or None if invalid.
- ONLY used by: archivebox update (for orphan import)
- """
- import json
- # Try index.jsonl first (new format), then index.json (legacy)
- jsonl_path = snapshot_dir / CONSTANTS.JSONL_INDEX_FILENAME
- json_path = snapshot_dir / CONSTANTS.JSON_INDEX_FILENAME
- data = None
- if jsonl_path.exists():
- try:
- with open(jsonl_path) as f:
- for line in f:
- line = line.strip()
- if line.startswith('{'):
- record = json.loads(line)
- if record.get('type') == 'Snapshot':
- data = record
- break
- except (json.JSONDecodeError, OSError):
- pass
- elif json_path.exists():
- try:
- with open(json_path) as f:
- data = json.load(f)
- except (json.JSONDecodeError, OSError):
- pass
- if not data:
- return None
- url = data.get('url')
- if not url:
- return None
- # Get and validate timestamp
- timestamp = cls._select_best_timestamp(
- index_timestamp=data.get('timestamp'),
- folder_name=snapshot_dir.name
- )
- if not timestamp:
- return None
- # Ensure uniqueness (reuses existing logic from create_or_update_from_dict)
- timestamp = cls._ensure_unique_timestamp(url, timestamp)
- # Detect version
- fs_version = cls._detect_fs_version_from_index(data)
- # Get or create catchall crawl for orphaned snapshots
- from archivebox.crawls.models import Crawl
- system_user_id = get_or_create_system_user_pk()
- catchall_crawl, _ = Crawl.objects.get_or_create(
- label='[migration] orphaned snapshots',
- defaults={
- 'urls': f'# Orphaned snapshot: {url}',
- 'max_depth': 0,
- 'created_by_id': system_user_id,
- }
- )
- return cls(
- url=url,
- timestamp=timestamp,
- title=data.get('title', ''),
- fs_version=fs_version,
- crawl=catchall_crawl,
- )
- @staticmethod
- def _select_best_timestamp(index_timestamp: Optional[str], folder_name: str) -> Optional[str]:
- """
- Select best timestamp from index.json vs folder name.
- Validates range (1995-2035).
- Prefers index.json if valid.
- """
- def is_valid_timestamp(ts):
- try:
- ts_int = int(float(ts))
- # 1995-01-01 to 2035-12-31
- return 788918400 <= ts_int <= 2082758400
- except (TypeError, ValueError):
- return False
- index_valid = is_valid_timestamp(index_timestamp) if index_timestamp else False
- folder_valid = is_valid_timestamp(folder_name)
- if index_valid:
- return str(int(float(index_timestamp)))
- elif folder_valid:
- return str(int(float(folder_name)))
- else:
- return None
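- # Selection rule examples (sketch): a valid epoch timestamp from the index wins over
- # the folder name; otherwise the folder name is used if it parses as a valid epoch:
- #
- #   _select_best_timestamp('1712345678.0', 'some-folder')   -> '1712345678'
- #   _select_best_timestamp(None, '1712345678')              -> '1712345678'
- #   _select_best_timestamp('not-a-ts', 'also-not-a-ts')     -> None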
- @classmethod
- def _ensure_unique_timestamp(cls, url: str, timestamp: str) -> str:
- """
- Ensure timestamp is globally unique.
- If collision with different URL, increment by 1 until unique.
- NOTE: Logic already exists in create_or_update_from_dict (line 266-267)
- This is just an extracted, reusable version.
- """
- while cls.objects.filter(timestamp=timestamp).exclude(url=url).exists():
- timestamp = str(int(float(timestamp)) + 1)
- return timestamp
- @staticmethod
- def _detect_fs_version_from_index(data: dict) -> str:
- """
- Detect fs_version from index.json structure.
- - Has fs_version field: use it
- - Has history dict: 0.7.0
- - Has archive_results list: 0.8.0
- - Default: 0.7.0
- """
- if 'fs_version' in data:
- return data['fs_version']
- if 'history' in data and 'archive_results' not in data:
- return '0.7.0'
- if 'archive_results' in data:
- return '0.8.0'
- return '0.7.0'
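- # Detection examples (sketch of the rules above):
- #
- #   _detect_fs_version_from_index({'fs_version': '0.9.0'})     -> '0.9.0'
- #   _detect_fs_version_from_index({'history': {'wget': []}})   -> '0.7.0'
- #   _detect_fs_version_from_index({'archive_results': []})     -> '0.8.0'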
- # =========================================================================
- # Index.json Reconciliation
- # =========================================================================
- def reconcile_with_index(self):
- """
- Merge index.json/index.jsonl with DB. DB is source of truth.
- - Title: longest non-URL
- - Tags: union
- - ArchiveResults: keep both (by plugin+start_ts)
- Converts index.json to index.jsonl if needed, then writes back in JSONL format.
- Used by: archivebox update (to sync index with DB)
- """
- import json
- # Try to convert index.json to index.jsonl first
- self.convert_index_json_to_jsonl()
- # Check for index.jsonl (preferred) or index.json (legacy)
- jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
- json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
- index_data = {}
- if jsonl_path.exists():
- # Read from JSONL format
- jsonl_data = self.read_index_jsonl()
- if jsonl_data['snapshot']:
- index_data = jsonl_data['snapshot']
- # Convert archive_results list to expected format
- index_data['archive_results'] = jsonl_data['archive_results']
- elif json_path.exists():
- # Fallback to legacy JSON format
- try:
- with open(json_path) as f:
- index_data = json.load(f)
- except (json.JSONDecodeError, OSError):
- pass
- # Merge title
- self._merge_title_from_index(index_data)
- # Merge tags
- self._merge_tags_from_index(index_data)
- # Merge ArchiveResults
- self._merge_archive_results_from_index(index_data)
- # Write back in JSONL format
- self.write_index_jsonl()
- def reconcile_with_index_json(self):
- """Deprecated: use reconcile_with_index() instead."""
- return self.reconcile_with_index()
- def _merge_title_from_index(self, index_data: dict):
- """Merge title - prefer longest non-URL title."""
- index_title = index_data.get('title', '').strip()
- db_title = self.title or ''
- candidates = [t for t in [index_title, db_title] if t and t != self.url]
- if candidates:
- best_title = max(candidates, key=len)
- if self.title != best_title:
- self.title = best_title
- def _merge_tags_from_index(self, index_data: dict):
- """Merge tags - union of both sources."""
- from django.db import transaction
- index_tags = set(index_data.get('tags', '').split(',')) if index_data.get('tags') else set()
- index_tags = {t.strip() for t in index_tags if t.strip()}
- db_tags = set(self.tags.values_list('name', flat=True))
- new_tags = index_tags - db_tags
- if new_tags:
- with transaction.atomic():
- for tag_name in new_tags:
- tag, _ = Tag.objects.get_or_create(name=tag_name)
- self.tags.add(tag)
- def _merge_archive_results_from_index(self, index_data: dict):
- """Merge ArchiveResults - keep both (by plugin+start_ts)."""
- existing = {
- (ar.plugin, ar.start_ts): ar
- for ar in ArchiveResult.objects.filter(snapshot=self)
- }
- # Handle 0.8.x format (archive_results list)
- for result_data in index_data.get('archive_results', []):
- self._create_archive_result_if_missing(result_data, existing)
- # Handle 0.7.x format (history dict)
- if 'history' in index_data and isinstance(index_data['history'], dict):
- for plugin, result_list in index_data['history'].items():
- if isinstance(result_list, list):
- for result_data in result_list:
- # Support both old 'extractor' and new 'plugin' keys for backwards compat
- result_data['plugin'] = result_data.get('plugin') or result_data.get('extractor') or plugin
- self._create_archive_result_if_missing(result_data, existing)
- def _create_archive_result_if_missing(self, result_data: dict, existing: dict):
- """Create ArchiveResult if not already in DB."""
- from dateutil import parser
- # Support both old 'extractor' and new 'plugin' keys for backwards compat
- plugin = result_data.get('plugin') or result_data.get('extractor', '')
- if not plugin:
- return
- start_ts = None
- if result_data.get('start_ts'):
- try:
- start_ts = parser.parse(result_data['start_ts'])
- except (TypeError, ValueError, OverflowError):
- pass
- if (plugin, start_ts) in existing:
- return
- try:
- end_ts = None
- if result_data.get('end_ts'):
- try:
- end_ts = parser.parse(result_data['end_ts'])
- except (TypeError, ValueError, OverflowError):
- pass
- # Support both 'output' (legacy) and 'output_str' (new JSONL) field names
- output_str = result_data.get('output_str') or result_data.get('output', '')
- ArchiveResult.objects.create(
- snapshot=self,
- plugin=plugin,
- hook_name=result_data.get('hook_name', ''),
- status=result_data.get('status', 'failed'),
- output_str=output_str,
- cmd=result_data.get('cmd', []),
- pwd=result_data.get('pwd', str(self.output_dir)),
- start_ts=start_ts,
- end_ts=end_ts,
- )
- except Exception:
- # Skip malformed legacy records instead of aborting the whole reconciliation
- pass
- def write_index_json(self):
- """Write index.json in 0.9.x format (deprecated, use write_index_jsonl)."""
- import json
- index_path = Path(self.output_dir) / 'index.json'
- data = {
- 'url': self.url,
- 'timestamp': self.timestamp,
- 'title': self.title or '',
- 'tags': ','.join(sorted(self.tags.values_list('name', flat=True))),
- 'fs_version': self.fs_version,
- 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
- 'created_at': self.created_at.isoformat() if self.created_at else None,
- 'archive_results': [
- {
- 'plugin': ar.plugin,
- 'status': ar.status,
- 'start_ts': ar.start_ts.isoformat() if ar.start_ts else None,
- 'end_ts': ar.end_ts.isoformat() if ar.end_ts else None,
- 'output': ar.output_str or '',
- 'cmd': ar.cmd if isinstance(ar.cmd, list) else [],
- 'pwd': ar.pwd,
- }
- for ar in ArchiveResult.objects.filter(snapshot=self).order_by('start_ts')
- ],
- }
- index_path.parent.mkdir(parents=True, exist_ok=True)
- with open(index_path, 'w') as f:
- json.dump(data, f, indent=2, sort_keys=True)
- def write_index_jsonl(self):
- """
- Write index.jsonl in flat JSONL format.
- Each line is a JSON record with a 'type' field:
- - Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
- - ArchiveResult: extractor results (plugin, status, output, etc.)
- - Binary: binary info used for the extraction
- - Process: process execution details (cmd, exit_code, timing, etc.)
- """
- import json
- index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
- index_path.parent.mkdir(parents=True, exist_ok=True)
- # Track unique binaries and processes to avoid duplicates
- binaries_seen = set()
- processes_seen = set()
- with open(index_path, 'w') as f:
- # Write Snapshot record first (to_jsonl includes crawl_id, fs_version)
- f.write(json.dumps(self.to_jsonl()) + '\n')
- # Write ArchiveResult records with their associated Binary and Process
- # Use select_related to optimize queries
- for ar in self.archiveresult_set.select_related('process__binary').order_by('start_ts'):
- # Write Binary record if not already written
- if ar.process and ar.process.binary and ar.process.binary_id not in binaries_seen:
- binaries_seen.add(ar.process.binary_id)
- f.write(json.dumps(ar.process.binary.to_jsonl()) + '\n')
- # Write Process record if not already written
- if ar.process and ar.process_id not in processes_seen:
- processes_seen.add(ar.process_id)
- f.write(json.dumps(ar.process.to_jsonl()) + '\n')
- # Write ArchiveResult record
- f.write(json.dumps(ar.to_jsonl()) + '\n')
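- # Resulting index.jsonl layout (sketch; values are illustrative, field names come from
- # each model's to_jsonl() method):
- #
- #   {"type": "Snapshot", "id": "...", "crawl_id": "...", "url": "https://example.com", ...}
- #   {"type": "Binary", ...}
- #   {"type": "Process", ...}
- #   {"type": "ArchiveResult", "plugin": "wget", "status": "succeeded", ...}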
- def read_index_jsonl(self) -> dict:
- """
- Read index.jsonl and return parsed records grouped by type.
- Returns dict with keys: 'snapshot', 'archive_results', 'binaries', 'processes'
- """
- import json
- from archivebox.misc.jsonl import (
- TYPE_SNAPSHOT, TYPE_ARCHIVERESULT, TYPE_BINARY, TYPE_PROCESS,
- )
- index_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
- result = {
- 'snapshot': None,
- 'archive_results': [],
- 'binaries': [],
- 'processes': [],
- }
- if not index_path.exists():
- return result
- with open(index_path, 'r') as f:
- for line in f:
- line = line.strip()
- if not line or not line.startswith('{'):
- continue
- try:
- record = json.loads(line)
- record_type = record.get('type')
- if record_type == TYPE_SNAPSHOT:
- result['snapshot'] = record
- elif record_type == TYPE_ARCHIVERESULT:
- result['archive_results'].append(record)
- elif record_type == TYPE_BINARY:
- result['binaries'].append(record)
- elif record_type == TYPE_PROCESS:
- result['processes'].append(record)
- except json.JSONDecodeError:
- continue
- return result
- def convert_index_json_to_jsonl(self) -> bool:
- """
- Convert index.json to index.jsonl format.
- Reads existing index.json, creates index.jsonl, and removes index.json.
- Returns True if conversion was performed, False if no conversion needed.
- """
- import json
- json_path = Path(self.output_dir) / CONSTANTS.JSON_INDEX_FILENAME
- jsonl_path = Path(self.output_dir) / CONSTANTS.JSONL_INDEX_FILENAME
- # Skip if already converted or no json file exists
- if jsonl_path.exists() or not json_path.exists():
- return False
- try:
- with open(json_path, 'r') as f:
- data = json.load(f)
- except (json.JSONDecodeError, OSError):
- return False
- # Detect format version and extract records
- fs_version = data.get('fs_version', '0.7.0')
- jsonl_path.parent.mkdir(parents=True, exist_ok=True)
- with open(jsonl_path, 'w') as f:
- # Write Snapshot record
- snapshot_record = {
- 'type': 'Snapshot',
- 'id': str(self.id),
- 'crawl_id': str(self.crawl_id) if self.crawl_id else None,
- 'url': data.get('url', self.url),
- 'timestamp': data.get('timestamp', self.timestamp),
- 'title': data.get('title', self.title or ''),
- 'tags': data.get('tags', ''),
- 'fs_version': fs_version,
- 'bookmarked_at': data.get('bookmarked_at'),
- 'created_at': data.get('created_at'),
- }
- f.write(json.dumps(snapshot_record) + '\n')
- # Handle 0.8.x/0.9.x format (archive_results list)
- for result_data in data.get('archive_results', []):
- ar_record = {
- 'type': 'ArchiveResult',
- 'snapshot_id': str(self.id),
- 'plugin': result_data.get('plugin', ''),
- 'status': result_data.get('status', ''),
- 'output_str': result_data.get('output', ''),
- 'start_ts': result_data.get('start_ts'),
- 'end_ts': result_data.get('end_ts'),
- }
- if result_data.get('cmd'):
- ar_record['cmd'] = result_data['cmd']
- f.write(json.dumps(ar_record) + '\n')
- # Handle 0.7.x format (history dict)
- if 'history' in data and isinstance(data['history'], dict):
- for plugin, result_list in data['history'].items():
- if not isinstance(result_list, list):
- continue
- for result_data in result_list:
- ar_record = {
- 'type': 'ArchiveResult',
- 'snapshot_id': str(self.id),
- 'plugin': result_data.get('plugin') or result_data.get('extractor') or plugin,
- 'status': result_data.get('status', ''),
- 'output_str': result_data.get('output', ''),
- 'start_ts': result_data.get('start_ts'),
- 'end_ts': result_data.get('end_ts'),
- }
- if result_data.get('cmd'):
- ar_record['cmd'] = result_data['cmd']
- f.write(json.dumps(ar_record) + '\n')
- # Remove old index.json after successful conversion
- try:
- json_path.unlink()
- except OSError:
- pass
- return True
- # =========================================================================
- # Snapshot Utilities
- # =========================================================================
- @staticmethod
- def move_directory_to_invalid(snapshot_dir: Path):
- """
- Move invalid directory to data/invalid/YYYYMMDD/.
- Used by: archivebox update (when encountering invalid directories)
- """
- from datetime import datetime
- import shutil
- invalid_dir = CONSTANTS.DATA_DIR / 'invalid' / datetime.now().strftime('%Y%m%d')
- invalid_dir.mkdir(parents=True, exist_ok=True)
- dest = invalid_dir / snapshot_dir.name
- counter = 1
- while dest.exists():
- dest = invalid_dir / f"{snapshot_dir.name}_{counter}"
- counter += 1
- try:
- shutil.move(str(snapshot_dir), str(dest))
- except OSError:
- pass
- @classmethod
- def find_and_merge_duplicates(cls) -> int:
- """
- Find and merge snapshots with same url:timestamp.
- Returns count of duplicate sets merged.
- Used by: archivebox update (Phase 3: deduplication)
- """
- from django.db.models import Count
- duplicates = (
- cls.objects
- .values('url', 'timestamp')
- .annotate(count=Count('id'))
- .filter(count__gt=1)
- )
- merged = 0
- for dup in duplicates.iterator(chunk_size=500):
- snapshots = list(
- cls.objects
- .filter(url=dup['url'], timestamp=dup['timestamp'])
- .order_by('created_at') # Keep oldest
- )
- if len(snapshots) > 1:
- try:
- cls._merge_snapshots(snapshots)
- merged += 1
- except Exception:
- # Skip sets that fail to merge; they can be retried on the next update
- pass
- return merged
- @classmethod
- def _merge_snapshots(cls, snapshots: list['Snapshot']):
- """
- Merge exact duplicates.
- Keep oldest, union files + ArchiveResults.
- """
- import shutil
- keeper = snapshots[0]
- duplicates = snapshots[1:]
- keeper_dir = Path(keeper.output_dir)
- for dup in duplicates:
- dup_dir = Path(dup.output_dir)
- # Merge files
- if dup_dir.exists() and dup_dir != keeper_dir:
- for dup_file in dup_dir.rglob('*'):
- if not dup_file.is_file():
- continue
- rel = dup_file.relative_to(dup_dir)
- keeper_file = keeper_dir / rel
- if not keeper_file.exists():
- keeper_file.parent.mkdir(parents=True, exist_ok=True)
- shutil.copy2(dup_file, keeper_file)
- try:
- shutil.rmtree(dup_dir)
- except OSError:
- pass
- # Merge tags
- for tag in dup.tags.all():
- keeper.tags.add(tag)
- # Move ArchiveResults
- ArchiveResult.objects.filter(snapshot=dup).update(snapshot=keeper)
- # Delete
- dup.delete()
- # =========================================================================
- # Output Directory Properties
- # =========================================================================
- @property
- def output_dir_parent(self) -> str:
- return 'archive'
- @property
- def output_dir_name(self) -> str:
- return str(self.timestamp)
- def archive(self, overwrite=False, methods=None):
- return bg_archive_snapshot(self, overwrite=overwrite, methods=methods)
- @admin.display(description='Tags')
- def tags_str(self, nocache=True) -> str | None:
- calc_tags_str = lambda: ','.join(sorted(tag.name for tag in self.tags.all()))
- if hasattr(self, '_prefetched_objects_cache') and 'tags' in self._prefetched_objects_cache:
- return calc_tags_str()
- cache_key = f'{self.pk}-tags'
- return cache.get_or_set(cache_key, calc_tags_str) if not nocache else calc_tags_str()
- def icons(self) -> str:
- """Generate HTML icons showing which extractor plugins have succeeded for this snapshot"""
- from django.utils.html import format_html, mark_safe
- cache_key = f'result_icons:{self.pk}:{(self.downloaded_at or self.modified_at or self.created_at or self.bookmarked_at).timestamp()}'
- def calc_icons():
- if hasattr(self, '_prefetched_objects_cache') and 'archiveresult_set' in self._prefetched_objects_cache:
- archive_results = {r.plugin: r for r in self.archiveresult_set.all() if r.status == "succeeded" and (r.output_files or r.output_str)}
- else:
- # Filter for results that have either output_files or output_str
- from django.db.models import Q
- archive_results = {r.plugin: r for r in self.archiveresult_set.filter(
- Q(status="succeeded") & (Q(output_files__isnull=False) | ~Q(output_str=''))
- )}
- path = self.archive_path
- canon = self.canonical_outputs()
- output = ""
- output_template = '<a href="/{}/{}" class="exists-{}" title="{}">{}</a> '
- # Get all plugins from hooks system (sorted by numeric prefix)
- all_plugins = [get_plugin_name(e) for e in get_plugins()]
- for plugin in all_plugins:
- result = archive_results.get(plugin)
- existing = result and result.status == 'succeeded' and (result.output_files or result.output_str)
- icon = get_plugin_icon(plugin)
- # Skip plugins with empty icons that have no output
- # (e.g., staticfile only shows when there's actual output)
- if not icon.strip() and not existing:
- continue
- output += format_html(
- output_template,
- path,
- canon.get(plugin, plugin + '/'),
- str(bool(existing)),
- plugin,
- icon
- )
- return format_html('<span class="files-icons" style="font-size: 1.1em; opacity: 0.8; min-width: 240px; display: inline-block">{}</span>', mark_safe(output))
- cache_result = cache.get(cache_key)
- if cache_result:
- return cache_result
- fresh_result = calc_icons()
- cache.set(cache_key, fresh_result, timeout=60 * 60 * 24)
- return fresh_result
- @property
- def api_url(self) -> str:
- return reverse_lazy('api-1:get_snapshot', args=[self.id])
- def get_absolute_url(self):
- return f'/{self.archive_path}'
- @cached_property
- def domain(self) -> str:
- return url_domain(self.url)
- @cached_property
- def output_dir(self):
- """The filesystem path to the snapshot's output directory."""
- import os
- current_path = self.get_storage_path_for_version(self.fs_version)
- if current_path.exists():
- return str(current_path)
- # Check for backwards-compat symlink
- old_path = CONSTANTS.ARCHIVE_DIR / self.timestamp
- if old_path.is_symlink():
- return str(Path(os.readlink(old_path)).resolve())
- elif old_path.exists():
- return str(old_path)
- return str(current_path)
- @cached_property
- def archive_path(self):
- return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'
- @cached_property
- def archive_size(self):
- try:
- return get_dir_size(self.output_dir)[0]
- except Exception:
- return 0
- def save_tags(self, tags: Iterable[str] = ()) -> None:
- tags_id = [Tag.objects.get_or_create(name=tag)[0].pk for tag in tags if tag.strip()]
- self.tags.clear()
- self.tags.add(*tags_id)
- def pending_archiveresults(self) -> QuerySet['ArchiveResult']:
- return self.archiveresult_set.exclude(status__in=ArchiveResult.FINAL_OR_ACTIVE_STATES)
- def run(self) -> list['ArchiveResult']:
- """
- Execute snapshot by creating pending ArchiveResults for all enabled hooks.
- Called by: SnapshotMachine.enter_started()
- Hook Lifecycle:
- 1. discover_hooks('Snapshot') → finds all plugin hooks
- 2. For each hook:
- - Create ArchiveResult with status=QUEUED
- - Store hook_name (e.g., 'on_Snapshot__50_wget.py')
- 3. ArchiveResults execute independently via ArchiveResultMachine
- 4. Hook execution happens in ArchiveResult.run(), NOT here
- Returns:
- list[ArchiveResult]: Newly created pending results
- """
- return self.create_pending_archiveresults()
- def cleanup(self):
- """
- Clean up background ArchiveResult hooks.
- Called by the state machine when entering the 'sealed' state.
- Kills any background hooks and finalizes their ArchiveResults.
- """
- from archivebox.hooks import kill_process
- # Kill any background ArchiveResult hooks
- if not self.OUTPUT_DIR.exists():
- return
- # Find all .pid files in this snapshot's output directory
- for pid_file in self.OUTPUT_DIR.glob('**/*.pid'):
- kill_process(pid_file, validate=True)
- # Update all STARTED ArchiveResults from filesystem
- results = self.archiveresult_set.filter(status=ArchiveResult.StatusChoices.STARTED)
- for ar in results:
- ar.update_from_output()
- def has_running_background_hooks(self) -> bool:
- """
- Check if any ArchiveResult background hooks are still running.
- Used by state machine to determine if snapshot is finished.
- """
- from archivebox.hooks import process_is_alive
- if not self.OUTPUT_DIR.exists():
- return False
- for plugin_dir in self.OUTPUT_DIR.iterdir():
- if not plugin_dir.is_dir():
- continue
- pid_file = plugin_dir / 'hook.pid'
- if process_is_alive(pid_file):
- return True
- return False
- def to_jsonl(self) -> dict:
- """
- Convert Snapshot model instance to a JSONL record.
- Includes all fields needed to fully reconstruct/identify this snapshot.
- """
- from archivebox.config import VERSION
- return {
- 'type': 'Snapshot',
- 'schema_version': VERSION,
- 'id': str(self.id),
- 'crawl_id': str(self.crawl_id),
- 'url': self.url,
- 'title': self.title,
- 'tags': self.tags_str(),
- 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
- 'created_at': self.created_at.isoformat() if self.created_at else None,
- 'timestamp': self.timestamp,
- 'depth': self.depth,
- 'status': self.status,
- 'fs_version': self.fs_version,
- }
- @staticmethod
- def from_jsonl(record: Dict[str, Any], overrides: Optional[Dict[str, Any]] = None, queue_for_extraction: bool = True):
- """
- Create/update Snapshot from JSONL record or dict.
- Unified method that handles:
- - ID-based patching: {"id": "...", "title": "new title"}
- - URL-based create/update: {"url": "...", "title": "...", "tags": "..."}
- - Auto-creates Crawl if not provided
- - Optionally queues for extraction
- Args:
- record: Dict with 'url' (for create) or 'id' (for patch), plus other fields
- overrides: Dict with 'crawl', 'snapshot' (parent), 'created_by_id'
- queue_for_extraction: If True, sets status=QUEUED and retry_at (default: True)
- Returns:
- Snapshot instance or None
- """
- import re
- from django.utils import timezone
- from archivebox.misc.util import parse_date
- from archivebox.base_models.models import get_or_create_system_user_pk
- from archivebox.config.common import GENERAL_CONFIG
- overrides = overrides or {}
- # If 'id' is provided, lookup and patch that specific snapshot
- snapshot_id = record.get('id')
- if snapshot_id:
- try:
- snapshot = Snapshot.objects.get(id=snapshot_id)
- # Generically update all fields present in record
- update_fields = []
- for field_name, value in record.items():
- # Skip internal fields
- if field_name in ('id', 'type'):
- continue
- # Skip if field doesn't exist on model
- if not hasattr(snapshot, field_name):
- continue
- # Special parsing for date fields
- if field_name in ('bookmarked_at', 'retry_at', 'created_at', 'modified_at'):
- if value and isinstance(value, str):
- value = parse_date(value)
- # Update field if value is provided and different
- if value is not None and getattr(snapshot, field_name) != value:
- setattr(snapshot, field_name, value)
- update_fields.append(field_name)
- if update_fields:
- snapshot.save(update_fields=update_fields + ['modified_at'])
- return snapshot
- except Snapshot.DoesNotExist:
- # ID not found, fall through to create-by-URL logic
- pass
- url = record.get('url')
- if not url:
- return None
- # Determine or create crawl (every snapshot must have a crawl)
- crawl = overrides.get('crawl')
- parent_snapshot = overrides.get('snapshot') # Parent snapshot
- created_by_id = overrides.get('created_by_id') or (parent_snapshot.created_by.pk if parent_snapshot else get_or_create_system_user_pk())
- # If no crawl provided, inherit from parent or auto-create one
- if not crawl:
- if parent_snapshot:
- # Inherit crawl from parent snapshot
- crawl = parent_snapshot.crawl
- else:
- # Auto-create a single-URL crawl
- from archivebox.crawls.models import Crawl
- from archivebox.config import CONSTANTS
- timestamp_str = timezone.now().strftime("%Y-%m-%d__%H-%M-%S")
- sources_file = CONSTANTS.SOURCES_DIR / f'{timestamp_str}__auto_crawl.txt'
- sources_file.parent.mkdir(parents=True, exist_ok=True)
- sources_file.write_text(url)
- crawl = Crawl.objects.create(
- urls=url,
- max_depth=0,
- label=f'auto-created for {url[:50]}',
- created_by_id=created_by_id,
- )
- # Parse tags
- tags_str = record.get('tags', '')
- tag_list = []
- if tags_str:
- tag_list = list(dict.fromkeys(
- tag.strip() for tag in re.split(GENERAL_CONFIG.TAG_SEPARATOR_PATTERN, tags_str)
- if tag.strip()
- ))
- # Get most recent snapshot with this URL (URLs can exist in multiple crawls)
- snapshot = Snapshot.objects.filter(url=url).order_by('-created_at').first()
- title = record.get('title')
- timestamp = record.get('timestamp')
- if snapshot:
- # Update existing snapshot
- if title and (not snapshot.title or len(title) > len(snapshot.title or '')):
- snapshot.title = title
- snapshot.save(update_fields=['title', 'modified_at'])
- else:
- # Create new snapshot
- if timestamp:
- while Snapshot.objects.filter(timestamp=timestamp).exists():
- timestamp = str(float(timestamp) + 1.0)
- snapshot = Snapshot.objects.create(
- url=url,
- timestamp=timestamp,
- title=title,
- crawl=crawl,
- )
- # Update tags
- if tag_list:
- existing_tags = set(snapshot.tags.values_list('name', flat=True))
- new_tags = set(tag_list) | existing_tags
- snapshot.save_tags(new_tags)
- # Queue for extraction and update additional fields
- update_fields = []
- if queue_for_extraction:
- snapshot.status = Snapshot.StatusChoices.QUEUED
- snapshot.retry_at = timezone.now()
- update_fields.extend(['status', 'retry_at'])
- # Update additional fields if provided
- for field_name in ('depth', 'parent_snapshot_id', 'crawl_id', 'bookmarked_at'):
- value = record.get(field_name)
- if value is not None and getattr(snapshot, field_name) != value:
- setattr(snapshot, field_name, value)
- update_fields.append(field_name)
- if update_fields:
- snapshot.save(update_fields=update_fields + ['modified_at'])
- return snapshot
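- # Usage sketch covering both supported record shapes (values hypothetical):
- # >>> # 1. ID-based patch: only the provided fields are updated
- # >>> Snapshot.from_jsonl({'id': '01930a...', 'title': 'New Title'}, queue_for_extraction=False)
- # >>> # 2. URL-based create/update: auto-creates a single-URL Crawl if none is given
- # >>> Snapshot.from_jsonl({'url': 'https://example.com', 'tags': 'docs,example'})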
- def create_pending_archiveresults(self) -> list['ArchiveResult']:
- """
- Create ArchiveResult records for all enabled hooks.
- Uses the hooks system to discover available hooks from:
- - archivebox/plugins/*/on_Snapshot__*.{py,sh,js}
- - data/plugins/*/on_Snapshot__*.{py,sh,js}
- Creates one ArchiveResult per hook (not per plugin), with hook_name set.
- This enables step-based execution where all hooks in a step can run in parallel.
- """
- from archivebox.hooks import discover_hooks
- hooks = discover_hooks('Snapshot')
- archiveresults = []
- for hook_path in hooks:
- hook_name = hook_path.name # e.g., 'on_Snapshot__50_wget.py'
- plugin = hook_path.parent.name # e.g., 'wget'
- # Check if AR already exists for this specific hook
- if ArchiveResult.objects.filter(snapshot=self, hook_name=hook_name).exists():
- continue
- archiveresult, created = ArchiveResult.objects.get_or_create(
- snapshot=self,
- hook_name=hook_name,
- defaults={
- 'plugin': plugin,
- 'status': ArchiveResult.INITIAL_STATE,
- 'retry_at': timezone.now(),
- },
- )
- if archiveresult.status == ArchiveResult.INITIAL_STATE:
- archiveresults.append(archiveresult)
- return archiveresults
- def advance_step_if_ready(self) -> bool:
- """
- Advance current_step if all foreground hooks in current step are finished.
- Called by the state machine to check if step can advance.
- Background hooks (.bg) don't block step advancement.
- Step advancement rules:
- - All foreground ARs in current step must be finished (SUCCEEDED/FAILED/SKIPPED)
- - Background ARs (hook_name contains '.bg.') are ignored for advancement
- - When ready, increments current_step by 1 (up to 9)
- Returns:
- True if step was advanced, False if not ready or already at step 9.
- """
- from archivebox.hooks import extract_step, is_background_hook
- if self.current_step >= 9:
- return False # Already at final step
- # Get all ARs that have a hook_name; step and foreground/background
- # filtering happens per-result in the loop below
- current_step_ars = self.archiveresult_set.filter(
- hook_name__isnull=False
- ).exclude(hook_name='')
- # Check each AR in current step
- for ar in current_step_ars:
- ar_step = extract_step(ar.hook_name)
- if ar_step != self.current_step:
- continue # Not in current step
- if is_background_hook(ar.hook_name):
- continue # Background hooks don't block
- # Foreground hook in current step - check if finished
- if ar.status not in ArchiveResult.FINAL_OR_ACTIVE_STATES:
- # Still pending/queued - can't advance
- return False
- if ar.status == ArchiveResult.StatusChoices.STARTED:
- # Still running - can't advance
- return False
- # All foreground hooks in current step are finished - advance!
- self.current_step += 1
- self.save(update_fields=['current_step', 'modified_at'])
- return True
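- # Sketch of the assumed step semantics: extract_step() appears to map the
- # two-digit hook prefix onto a single step 0-9, and is_background_hook()
- # keys off the '.bg.' infix (the second hook name below is hypothetical):
- # >>> extract_step('on_Snapshot__50_wget.py')            # -> 5
- # >>> is_background_hook('on_Snapshot__90_media.bg.py')  # -> True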
- def is_finished_processing(self) -> bool:
- """
- Check if this snapshot has finished processing.
- Used by SnapshotMachine.is_finished() to determine if snapshot is complete.
- Returns:
- True if all archiveresults are finished (or no work to do), False otherwise.
- """
- # if no archiveresults exist yet, it's not finished
- if not self.archiveresult_set.exists():
- return False
- # Try to advance step if ready (handles step-based hook execution)
- # This will increment current_step when all foreground hooks in current step are done
- while self.advance_step_if_ready():
- pass # Keep advancing until we can't anymore
- # if archiveresults exist but are still pending, it's not finished
- if self.pending_archiveresults().exists():
- return False
- # Don't wait for background hooks - they'll be cleaned up on entering sealed state
- # Background hooks in STARTED state are excluded by pending_archiveresults()
- # (STARTED is in FINAL_OR_ACTIVE_STATES) so once all results are FINAL or ACTIVE,
- # we can transition to sealed and cleanup() will kill the background hooks
- # otherwise archiveresults exist and are all finished, so it's finished
- return True
- def retry_failed_archiveresults(self, retry_at: Optional['timezone.datetime'] = None) -> int:
- """
- Reset failed/skipped ArchiveResults to queued for retry.
- This enables seamless retry of the entire extraction pipeline:
- - Resets FAILED and SKIPPED results to QUEUED
- - Sets retry_at so workers pick them up
- - Plugins run in order (numeric prefix)
- - Each plugin checks its dependencies at runtime
- Dependency handling (e.g., chrome_session → screenshot):
- - Plugins check if required outputs exist before running
- - If dependency output missing → plugin returns 'skipped'
- - On retry, if dependency now succeeds → dependent can run
- Returns count of ArchiveResults reset.
- """
- retry_at = retry_at or timezone.now()
- count = self.archiveresult_set.filter(
- status__in=[
- ArchiveResult.StatusChoices.FAILED,
- ArchiveResult.StatusChoices.SKIPPED,
- ]
- ).update(
- status=ArchiveResult.StatusChoices.QUEUED,
- retry_at=retry_at,
- output=None,
- start_ts=None,
- end_ts=None,
- )
- # Also reset the snapshot and current_step so it gets re-checked from the beginning
- if count > 0:
- self.status = self.StatusChoices.STARTED
- self.retry_at = retry_at
- self.current_step = 0 # Reset to step 0 for retry
- self.save(update_fields=['status', 'retry_at', 'current_step', 'modified_at'])
- return count
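- # Usage sketch (illustrative): re-queue all failed/skipped results and let
- # workers re-run the pipeline from step 0:
- # >>> n = snapshot.retry_failed_archiveresults()
- # >>> print(f'{n} results re-queued, current_step={snapshot.current_step}')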
- # =========================================================================
- # URL Helper Properties (migrated from Link schema)
- # =========================================================================
- @cached_property
- def url_hash(self) -> str:
- from hashlib import sha256
- return sha256(self.url.encode()).hexdigest()[:8]
- @cached_property
- def scheme(self) -> str:
- return self.url.split('://')[0]
- @cached_property
- def path(self) -> str:
- parts = self.url.split('://', 1)
- return ('/' + parts[1].split('/', 1)[1]) if (len(parts) > 1 and '/' in parts[1]) else '/'
- @cached_property
- def basename(self) -> str:
- return self.path.split('/')[-1]
- @cached_property
- def extension(self) -> str:
- basename = self.basename
- return basename.split('.')[-1] if '.' in basename else ''
- @cached_property
- def base_url(self) -> str:
- return f'{self.scheme}://{self.domain}'
- @cached_property
- def is_static(self) -> bool:
- static_extensions = {'.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.mp4', '.mp3', '.wav', '.webm'}
- return any(self.url.lower().endswith(ext) for ext in static_extensions)
- @cached_property
- def is_archived(self) -> bool:
- output_paths = (
- self.domain,
- 'output.html',
- 'output.pdf',
- 'screenshot.png',
- 'singlefile.html',
- 'readability/content.html',
- 'mercury/content.html',
- 'htmltotext.txt',
- 'media',
- 'git',
- )
- return any((Path(self.output_dir) / path).exists() for path in output_paths)
- # =========================================================================
- # Date/Time Properties (migrated from Link schema)
- # =========================================================================
- @cached_property
- def bookmarked_date(self) -> Optional[str]:
- max_ts = (timezone.now() + timedelta(days=30)).timestamp()
- if self.timestamp and self.timestamp.replace('.', '').isdigit():
- if 0 < float(self.timestamp) < max_ts:
- return self._ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
- return str(self.timestamp)
- return None
- @cached_property
- def downloaded_datestr(self) -> Optional[str]:
- return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None
- @cached_property
- def archive_dates(self) -> List[datetime]:
- return [
- result.start_ts
- for result in self.archiveresult_set.all()
- if result.start_ts
- ]
- @cached_property
- def oldest_archive_date(self) -> Optional[datetime]:
- dates = self.archive_dates
- return min(dates) if dates else None
- @cached_property
- def newest_archive_date(self) -> Optional[datetime]:
- dates = self.archive_dates
- return max(dates) if dates else None
- @cached_property
- def num_outputs(self) -> int:
- return self.archiveresult_set.filter(status='succeeded').count()
- @cached_property
- def num_failures(self) -> int:
- return self.archiveresult_set.filter(status='failed').count()
- # =========================================================================
- # Output Path Methods (migrated from Link schema)
- # =========================================================================
- def canonical_outputs(self) -> Dict[str, Optional[str]]:
- """
- Intelligently discover the best output file for each plugin.
- Uses actual ArchiveResult data and filesystem scanning with smart heuristics.
- """
- FAVICON_PROVIDER = 'https://www.google.com/s2/favicons?domain={}'
- # Mimetypes that can be embedded/previewed in an iframe
- IFRAME_EMBEDDABLE_EXTENSIONS = {
- 'html', 'htm', 'pdf', 'txt', 'md', 'json', 'jsonl',
- 'png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico',
- 'mp4', 'webm', 'mp3', 'opus', 'ogg', 'wav',
- }
- MIN_DISPLAY_SIZE = 15_000 # 15KB - filter out tiny files
- MAX_SCAN_FILES = 50 # Don't scan massive directories
- def find_best_output_in_dir(dir_path: Path, plugin_name: str) -> Optional[str]:
- """Find the best representative file in a plugin's output directory"""
- if not dir_path.exists() or not dir_path.is_dir():
- return None
- candidates = []
- file_count = 0
- # Special handling for media plugin - look for thumbnails
- is_media_dir = plugin_name == 'media'
- # Scan for suitable files
- for file_path in dir_path.rglob('*'):
- file_count += 1
- if file_count > MAX_SCAN_FILES:
- break
- if file_path.is_dir() or file_path.name.startswith('.'):
- continue
- ext = file_path.suffix.lstrip('.').lower()
- if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
- continue
- try:
- size = file_path.stat().st_size
- except OSError:
- continue
- # For media dir, allow smaller image files (thumbnails are often < 15KB)
- min_size = 5_000 if (is_media_dir and ext in ('png', 'jpg', 'jpeg', 'webp', 'gif')) else MIN_DISPLAY_SIZE
- if size < min_size:
- continue
- # Prefer main files: index.html, output.*, content.*, etc.
- priority = 0
- name_lower = file_path.name.lower()
- if is_media_dir:
- # Special prioritization for media directories
- if any(keyword in name_lower for keyword in ('thumb', 'thumbnail', 'cover', 'poster')):
- priority = 200 # Highest priority for thumbnails
- elif ext in ('png', 'jpg', 'jpeg', 'webp', 'gif'):
- priority = 150 # High priority for any image
- elif ext in ('mp4', 'webm', 'mp3', 'opus', 'ogg'):
- priority = 100 # Lower priority for actual media files
- else:
- priority = 50
- elif 'index' in name_lower:
- priority = 100
- elif name_lower.startswith(('output', 'content', plugin_name)):
- priority = 50
- elif ext in ('html', 'htm', 'pdf'):
- priority = 30
- elif ext in ('png', 'jpg', 'jpeg', 'webp'):
- priority = 20
- else:
- priority = 10
- candidates.append((priority, size, file_path))
- if not candidates:
- return None
- # Sort by priority (desc), then size (desc)
- candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
- best_file = candidates[0][2]
- return str(best_file.relative_to(Path(self.output_dir)))
- canonical = {
- 'index_path': 'index.html',
- 'google_favicon_path': FAVICON_PROVIDER.format(self.domain),
- 'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
- }
- # Scan each ArchiveResult's output directory for the best file
- snap_dir = Path(self.output_dir)
- for result in self.archiveresult_set.filter(status='succeeded'):
- if not result.output_files and not result.output_str:
- continue
- # Try to find the best output file for this plugin
- plugin_dir = snap_dir / result.plugin
- best_output = None
- # Check output_files first (new field)
- if result.output_files:
- first_file = next(iter(result.output_files.keys()), None)
- if first_file and (plugin_dir / first_file).exists():
- best_output = f'{result.plugin}/{first_file}'
- # Fallback to output_str if it looks like a path
- if not best_output and result.output_str and (snap_dir / result.output_str).exists():
- best_output = result.output_str
- if not best_output and plugin_dir.exists():
- # Intelligently find the best file in the plugin's directory
- best_output = find_best_output_in_dir(plugin_dir, result.plugin)
- if best_output:
- canonical[f'{result.plugin}_path'] = best_output
- # Also scan top-level for legacy outputs (backwards compatibility)
- for file_path in snap_dir.glob('*'):
- if file_path.is_dir() or file_path.name in ('index.html', 'index.json'):
- continue
- ext = file_path.suffix.lstrip('.').lower()
- if ext not in IFRAME_EMBEDDABLE_EXTENSIONS:
- continue
- try:
- size = file_path.stat().st_size
- if size >= MIN_DISPLAY_SIZE:
- # Add as generic output with stem as key
- key = f'{file_path.stem}_path'
- if key not in canonical:
- canonical[key] = file_path.name
- except OSError:
- continue
- if self.is_static:
- static_path = f'warc/{self.timestamp}'
- canonical.update({
- 'title': self.basename,
- 'wget_path': static_path,
- })
- return canonical
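- # Example return value (plugin keys and file paths are illustrative and
- # depend on which plugins succeeded and what files they left on disk):
- # {'index_path': 'index.html',
- #  'google_favicon_path': 'https://www.google.com/s2/favicons?domain=example.com',
- #  'archive_org_path': 'https://web.archive.org/web/https://example.com',
- #  'wget_path': 'wget/example.com/index.html',
- #  'screenshot_path': 'screenshot/screenshot.png'}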
- def latest_outputs(self, status: Optional[str] = None) -> Dict[str, Any]:
- """Get the latest output that each plugin produced"""
- from archivebox.hooks import get_plugins
- from django.db.models import Q
- latest: Dict[str, Any] = {}
- for plugin in get_plugins():
- results = self.archiveresult_set.filter(plugin=plugin)
- if status is not None:
- results = results.filter(status=status)
- # Filter for results with output_files or output_str
- results = results.filter(Q(output_files__isnull=False) | ~Q(output_str='')).order_by('-start_ts')
- result = results.first()
- # Return embed_path() for backwards compatibility
- latest[plugin] = result.embed_path() if result else None
- return latest
- # =========================================================================
- # Serialization Methods
- # =========================================================================
- def to_dict(self, extended: bool = False) -> Dict[str, Any]:
- """Convert Snapshot to a dictionary (replacement for Link._asdict())"""
- from archivebox.misc.util import ts_to_date_str
- result = {
- 'TYPE': 'core.models.Snapshot',
- 'id': str(self.id),
- 'url': self.url,
- 'timestamp': self.timestamp,
- 'title': self.title,
- 'tags': self.tags_str(),
- 'downloaded_at': self.downloaded_at.isoformat() if self.downloaded_at else None,
- 'bookmarked_at': self.bookmarked_at.isoformat() if self.bookmarked_at else None,
- 'created_at': self.created_at.isoformat() if self.created_at else None,
- # Computed properties
- 'domain': self.domain,
- 'scheme': self.scheme,
- 'base_url': self.base_url,
- 'path': self.path,
- 'basename': self.basename,
- 'extension': self.extension,
- 'is_static': self.is_static,
- 'is_archived': self.is_archived,
- 'archive_path': self.archive_path,
- 'output_dir': self.output_dir,
- 'link_dir': self.output_dir, # backwards compatibility alias
- 'archive_size': self.archive_size,
- 'bookmarked_date': self.bookmarked_date,
- 'downloaded_datestr': self.downloaded_datestr,
- 'num_outputs': self.num_outputs,
- 'num_failures': self.num_failures,
- }
- if extended:
- result['canonical'] = self.canonical_outputs()
- return result
- def to_json(self, indent: int = 4) -> str:
- """Convert to JSON string"""
- return to_json(self.to_dict(extended=True), indent=indent)
- def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
- """Convert to CSV string"""
- data = self.to_dict()
- cols = cols or ['timestamp', 'is_archived', 'url']
- return separator.join(to_json(data.get(col, ''), indent=None).ljust(ljust) for col in cols)
- def write_json_details(self, out_dir: Optional[str] = None) -> None:
- """Write JSON index file for this snapshot to its output directory"""
- out_dir = out_dir or self.output_dir
- path = Path(out_dir) / CONSTANTS.JSON_INDEX_FILENAME
- atomic_write(str(path), self.to_dict(extended=True))
- def write_html_details(self, out_dir: Optional[str] = None) -> None:
- """Write HTML detail page for this snapshot to its output directory"""
- from django.template.loader import render_to_string
- from archivebox.config.common import SERVER_CONFIG
- from archivebox.config.configset import get_config
- from archivebox.misc.logging_util import printable_filesize
- out_dir = out_dir or self.output_dir
- config = get_config()
- SAVE_ARCHIVE_DOT_ORG = config.get('SAVE_ARCHIVE_DOT_ORG', True)
- TITLE_LOADING_MSG = 'Not yet archived...'
- canonical = self.canonical_outputs()
- context = {
- **self.to_dict(extended=True),
- **{f'{k}_path': v for k, v in canonical.items()},
- 'canonical': {f'{k}_path': v for k, v in canonical.items()},
- 'title': htmlencode(self.title or (self.base_url if self.is_archived else TITLE_LOADING_MSG)),
- 'url_str': htmlencode(urldecode(self.base_url)),
- 'archive_url': urlencode(f'warc/{self.timestamp}') or 'about:blank',
- 'extension': self.extension or 'html',
- 'tags': self.tags_str() or 'untagged',
- 'size': printable_filesize(self.archive_size) if self.archive_size else 'pending',
- 'status': 'archived' if self.is_archived else 'not yet archived',
- 'status_color': 'success' if self.is_archived else 'danger',
- 'oldest_archive_date': ts_to_date_str(self.oldest_archive_date),
- 'SAVE_ARCHIVE_DOT_ORG': SAVE_ARCHIVE_DOT_ORG,
- 'PREVIEW_ORIGINALS': SERVER_CONFIG.PREVIEW_ORIGINALS,
- }
- rendered_html = render_to_string('snapshot.html', context)
- atomic_write(str(Path(out_dir) / CONSTANTS.HTML_INDEX_FILENAME), rendered_html)
- # =========================================================================
- # Helper Methods
- # =========================================================================
- @staticmethod
- def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
- return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None
- # =============================================================================
- # Snapshot State Machine
- # =============================================================================
- class SnapshotMachine(BaseStateMachine, strict_states=True):
- """
- State machine for managing Snapshot lifecycle.
- Hook Lifecycle:
- ┌─────────────────────────────────────────────────────────────┐
- │ QUEUED State │
- │ • Waiting for snapshot to be ready │
- └─────────────────────────────────────────────────────────────┘
- ↓ tick() when can_start()
- ┌─────────────────────────────────────────────────────────────┐
- │ STARTED State → enter_started() │
- │ 1. snapshot.run() │
- │ • discover_hooks('Snapshot') → finds all plugin hooks │
- │ • create_pending_archiveresults() → creates ONE │
- │ ArchiveResult per hook (NO execution yet) │
- │ 2. ArchiveResults process independently with their own │
- │ state machines (see ArchiveResultMachine) │
- │ 3. Advance through steps 0-9 as foreground hooks complete │
- └─────────────────────────────────────────────────────────────┘
- ↓ tick() when is_finished()
- ┌─────────────────────────────────────────────────────────────┐
- │ SEALED State → enter_sealed() │
- │ • cleanup() → kills any background hooks still running │
- │ • Set retry_at=None (no more processing) │
- └─────────────────────────────────────────────────────────────┘
- https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
- """
- model_attr_name = 'snapshot'
- # States
- queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
- started = State(value=Snapshot.StatusChoices.STARTED)
- sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)
- # Tick Event
- tick = (
- queued.to.itself(unless='can_start') |
- queued.to(started, cond='can_start') |
- started.to.itself(unless='is_finished') |
- started.to(sealed, cond='is_finished')
- )
- def can_start(self) -> bool:
- can_start = bool(self.snapshot.url)
- return can_start
- def is_finished(self) -> bool:
- """Check if snapshot processing is complete - delegates to model method."""
- return self.snapshot.is_finished_processing()
- @queued.enter
- def enter_queued(self):
- self.snapshot.update_and_requeue(
- retry_at=timezone.now(),
- status=Snapshot.StatusChoices.QUEUED,
- )
- @started.enter
- def enter_started(self):
- # lock the snapshot while we create the pending archiveresults
- self.snapshot.update_and_requeue(
- retry_at=timezone.now() + timedelta(seconds=30), # if failed, wait 30s before retrying
- )
- # Run the snapshot - creates pending archiveresults for all enabled plugins
- self.snapshot.run()
- # unlock the snapshot after we're done + set status = started
- self.snapshot.update_and_requeue(
- retry_at=timezone.now() + timedelta(seconds=5), # check again in 5s
- status=Snapshot.StatusChoices.STARTED,
- )
- @sealed.enter
- def enter_sealed(self):
- # Clean up background hooks
- self.snapshot.cleanup()
- self.snapshot.update_and_requeue(
- retry_at=None,
- status=Snapshot.StatusChoices.SEALED,
- )
- class ArchiveResult(ModelWithOutputDir, ModelWithConfig, ModelWithNotes, ModelWithHealthStats, ModelWithStateMachine):
- class StatusChoices(models.TextChoices):
- QUEUED = 'queued', 'Queued'
- STARTED = 'started', 'Started'
- BACKOFF = 'backoff', 'Waiting to retry'
- SUCCEEDED = 'succeeded', 'Succeeded'
- FAILED = 'failed', 'Failed'
- SKIPPED = 'skipped', 'Skipped'
- @classmethod
- def get_plugin_choices(cls):
- """Get plugin choices from discovered hooks (for forms/admin)."""
- plugins = [get_plugin_name(e) for e in get_plugins()]
- return tuple((e, e) for e in plugins)
- # Keep AutoField for backward compatibility with 0.7.x databases
- # UUID field is added separately by migration for new records
- id = models.AutoField(primary_key=True, editable=False)
- # Note: unique constraint is added by migration 0027 - don't set unique=True here
- # or SQLite table recreation in earlier migrations will fail
- uuid = models.UUIDField(default=uuid7, null=True, blank=True, db_index=True)
- created_at = models.DateTimeField(default=timezone.now, db_index=True)
- modified_at = models.DateTimeField(auto_now=True)
- snapshot: Snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE) # type: ignore
- # No choices= constraint - plugin names come from plugin system and can be any string
- plugin = models.CharField(max_length=32, blank=False, null=False, db_index=True, default='')
- hook_name = models.CharField(max_length=255, blank=True, default='', db_index=True, help_text='Full filename of the hook that executed (e.g., on_Snapshot__50_wget.py)')
- # Process FK - tracks execution details (cmd, pwd, stdout, stderr, etc.)
- # Required - every ArchiveResult must have a Process
- process = models.OneToOneField(
- 'machine.Process',
- on_delete=models.PROTECT,
- null=False, # Required after migration 4
- related_name='archiveresult',
- help_text='Process execution details for this archive result'
- )
- # New output fields (replacing old 'output' field)
- output_str = models.TextField(blank=True, default='', help_text='Human-readable output summary')
- output_json = models.JSONField(null=True, blank=True, default=None, help_text='Structured metadata (headers, redirects, etc.)')
- output_files = models.JSONField(default=dict, help_text='Dict of {relative_path: {metadata}}')
- output_size = models.BigIntegerField(default=0, help_text='Total bytes of all output files')
- output_mimetypes = models.CharField(max_length=512, blank=True, default='', help_text='CSV of mimetypes sorted by size')
- start_ts = models.DateTimeField(default=None, null=True, blank=True)
- end_ts = models.DateTimeField(default=None, null=True, blank=True)
- status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
- retry_at = ModelWithStateMachine.RetryAtField(default=timezone.now)
- notes = models.TextField(blank=True, null=False, default='')
- # output_dir is computed via @property from snapshot.output_dir / plugin
- state_machine_name = 'archivebox.core.models.ArchiveResultMachine'
- retry_at_field_name = 'retry_at'
- state_field_name = 'status'
- active_state = StatusChoices.STARTED
- class Meta(TypedModelMeta):
- app_label = 'core'
- verbose_name = 'Archive Result'
- verbose_name_plural = 'Archive Results Log'
- def __str__(self):
- return f'[{self.id}] {self.snapshot.url[:64]} -> {self.plugin}'
- @property
- def created_by(self):
- """Convenience property to access the user who created this archive result via its snapshot's crawl."""
- return self.snapshot.crawl.created_by
- def to_jsonl(self) -> dict:
- """
- Convert ArchiveResult model instance to a JSONL record.
- """
- from archivebox.config import VERSION
- record = {
- 'type': 'ArchiveResult',
- 'schema_version': VERSION,
- 'id': str(self.id),
- 'snapshot_id': str(self.snapshot_id),
- 'plugin': self.plugin,
- 'hook_name': self.hook_name,
- 'status': self.status,
- 'output_str': self.output_str,
- 'start_ts': self.start_ts.isoformat() if self.start_ts else None,
- 'end_ts': self.end_ts.isoformat() if self.end_ts else None,
- }
- # Include optional fields if set
- if self.output_json:
- record['output_json'] = self.output_json
- if self.output_files:
- record['output_files'] = self.output_files
- if self.output_size:
- record['output_size'] = self.output_size
- if self.output_mimetypes:
- record['output_mimetypes'] = self.output_mimetypes
- if self.cmd:
- record['cmd'] = self.cmd
- if self.cmd_version:
- record['cmd_version'] = self.cmd_version
- if self.process_id:
- record['process_id'] = str(self.process_id)
- return record
- def save(self, *args, **kwargs):
- is_new = self._state.adding
- # Create Process record if this is a new ArchiveResult and no process exists yet
- if is_new and not self.process_id:
- from archivebox.machine.models import Process, Machine
- process = Process.objects.create(
- machine=Machine.current(),
- pwd=str(Path(self.snapshot.output_dir) / self.plugin),
- cmd=[], # Will be set by run()
- status='queued',
- timeout=120,
- env={},
- )
- self.process = process
- # Skip ModelWithOutputDir.save() to avoid creating index.json in plugin directories
- # Call the Django Model.save() directly instead
- models.Model.save(self, *args, **kwargs)
- if is_new:
- from archivebox.misc.logging_util import log_worker_event
- log_worker_event(
- worker_type='DB',
- event='Created ArchiveResult',
- indent_level=3,
- plugin=self.plugin,
- metadata={
- 'id': str(self.id),
- 'snapshot_id': str(self.snapshot_id),
- 'snapshot_url': str(self.snapshot.url)[:64],
- 'status': self.status,
- },
- )
- @cached_property
- def snapshot_dir(self):
- return Path(self.snapshot.output_dir)
- @cached_property
- def url(self):
- return self.snapshot.url
- @property
- def api_url(self) -> str:
- return reverse_lazy('api-1:get_archiveresult', args=[self.id])
- def get_absolute_url(self):
- return f'/{self.snapshot.archive_path}/{self.plugin}'
- @property
- def plugin_module(self) -> Any | None:
- # Hook scripts are now used instead of Python plugin modules
- # The plugin name maps to hooks in archivebox/plugins/{plugin}/
- return None
- def output_exists(self) -> bool:
- return os.path.exists(Path(self.snapshot_dir) / self.plugin)
- def embed_path(self) -> Optional[str]:
- """
- Get the relative path to the embeddable output file for this result.
- Returns the first file from output_files if set, otherwise tries to
- find a reasonable default based on the plugin type.
- """
- # Check output_files dict for primary output
- if self.output_files:
- # Return first file from output_files (dict preserves insertion order)
- first_file = next(iter(self.output_files.keys()), None)
- if first_file:
- return f'{self.plugin}/{first_file}'
- # Fallback: check output_str if it looks like a file path
- if self.output_str and ('/' in self.output_str or '.' in self.output_str):
- return self.output_str
- # Try to find output file based on plugin's canonical output path
- canonical = self.snapshot.canonical_outputs()
- plugin_key = f'{self.plugin}_path'
- if plugin_key in canonical:
- return canonical[plugin_key]
- # Fallback to plugin directory
- return f'{self.plugin}/'
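- # Resolution order sketch (illustrative values): for a 'wget' result with
- # output_files={'example.com/index.html': {}} this returns
- # 'wget/example.com/index.html'; with only output_str='screenshot.png' it
- # returns that string; otherwise it falls back to the snapshot's
- # canonical_outputs() entry or the bare 'wget/' directory.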
- def create_output_dir(self):
- output_dir = Path(self.snapshot_dir) / self.plugin
- output_dir.mkdir(parents=True, exist_ok=True)
- return output_dir
- @property
- def output_dir_name(self) -> str:
- return self.plugin
- @property
- def output_dir_parent(self) -> str:
- return str(self.snapshot.OUTPUT_DIR.relative_to(CONSTANTS.DATA_DIR))
- # Properties that delegate to the Process model (for backwards compatibility).
- # Migration 3 creates Process records and migration 5 removes the old direct
- # fields; these properties let existing code keep using archiveresult.pwd,
- # .cmd, .cmd_version, etc. seamlessly through the related Process.
- @property
- def pwd(self) -> str:
- """Working directory (from Process)."""
- return self.process.pwd if self.process_id else ''
- @property
- def cmd(self) -> list:
- """Command array (from Process)."""
- return self.process.cmd if self.process_id else []
- @property
- def cmd_version(self) -> str:
- """Command version (from Process.binary)."""
- return self.process.cmd_version if self.process_id else ''
- @property
- def binary(self):
- """Binary FK (from Process)."""
- return self.process.binary if self.process_id else None
- @property
- def iface(self):
- """Network interface FK (from Process)."""
- return self.process.iface if self.process_id else None
- @property
- def machine(self):
- """Machine FK (from Process)."""
- return self.process.machine if self.process_id else None
- @property
- def timeout(self) -> int:
- """Timeout in seconds (from Process)."""
- return self.process.timeout if self.process_id else 120
- def save_search_index(self):
- pass
- def cascade_health_update(self, success: bool):
- """Update health stats for self, parent Snapshot, and grandparent Crawl."""
- self.increment_health_stats(success)
- self.snapshot.increment_health_stats(success)
- self.snapshot.crawl.increment_health_stats(success)
- def run(self):
- """
- Execute this ArchiveResult's hook and update status.
- If self.hook_name is set, runs only that specific hook.
- If self.hook_name is empty, discovers and runs all hooks for self.plugin (backwards compat).
- Updates status/output fields, queues discovered URLs, and triggers indexing.
- """
- from django.utils import timezone
- from archivebox.hooks import BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR, run_hook, is_background_hook
- from archivebox.config.configset import get_config
- # Get merged config with proper context
- config = get_config(
- crawl=self.snapshot.crawl,
- snapshot=self.snapshot,
- )
- # Determine which hook(s) to run
- hooks = []
- if self.hook_name:
- # SPECIFIC HOOK MODE: Find the specific hook by name
- for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
- if not base_dir.exists():
- continue
- plugin_dir = base_dir / self.plugin
- if plugin_dir.exists():
- hook_path = plugin_dir / self.hook_name
- if hook_path.exists():
- hooks.append(hook_path)
- break
- else:
- # LEGACY MODE: Discover all hooks for this plugin (backwards compatibility)
- for base_dir in (BUILTIN_PLUGINS_DIR, USER_PLUGINS_DIR):
- if not base_dir.exists():
- continue
- plugin_dir = base_dir / self.plugin
- if plugin_dir.exists():
- matches = list(plugin_dir.glob('on_Snapshot__*.*'))
- if matches:
- hooks.extend(sorted(matches))
- if not hooks:
- self.status = self.StatusChoices.FAILED
- if self.hook_name:
- self.output_str = f'Hook not found: {self.plugin}/{self.hook_name}'
- else:
- self.output_str = f'No hooks found for plugin: {self.plugin}'
- self.retry_at = None
- self.save()
- return
- # Output directory is plugin_dir for the hook output
- plugin_dir = Path(self.snapshot.output_dir) / self.plugin
- start_ts = timezone.now()
- is_bg_hook = False
- for hook in hooks:
- # Check if this is a background hook
- is_bg_hook = is_background_hook(hook.name)
- result = run_hook(
- hook,
- output_dir=plugin_dir,
- config=config,
- url=self.snapshot.url,
- snapshot_id=str(self.snapshot.id),
- crawl_id=str(self.snapshot.crawl.id),
- depth=self.snapshot.depth,
- )
- # Background hooks return None
- if result is None:
- is_bg_hook = True
- # Update status based on hook execution
- if is_bg_hook:
- # BACKGROUND HOOK - still running, return immediately
- # Status stays STARTED, will be finalized by Snapshot.cleanup()
- self.status = self.StatusChoices.STARTED
- self.start_ts = start_ts
- if self.process_id:
- self.process.pwd = str(plugin_dir)
- self.process.save()
- self.save()
- return
- # FOREGROUND HOOK - completed, update from filesystem
- self.start_ts = start_ts
- if self.process_id:
- self.process.pwd = str(plugin_dir)
- self.process.save()
- self.update_from_output()
- # Clean up empty output directory if no files were created
- if plugin_dir.exists() and not self.output_files:
- try:
- if not any(plugin_dir.iterdir()):
- plugin_dir.rmdir()
- except (OSError, RuntimeError):
- pass
- def update_from_output(self):
- """
- Update this ArchiveResult from filesystem logs and output files.
- Used for:
- - Foreground hooks that completed (called from ArchiveResult.run())
- - Background hooks that completed (called from Snapshot.cleanup())
- Updates:
- - status, output_str, output_json from ArchiveResult JSONL record
- - output_files, output_size, output_mimetypes by walking filesystem
- - end_ts, retry_at, cmd, cmd_version, binary FK
- - Processes side-effect records (Snapshot, Tag, etc.) via process_hook_records()
- """
- import json
- import mimetypes
- from collections import defaultdict
- from pathlib import Path
- from django.utils import timezone
- from archivebox.hooks import process_hook_records
- plugin_dir = Path(self.pwd) if self.pwd else None
- if not plugin_dir or not plugin_dir.exists():
- self.status = self.StatusChoices.FAILED
- self.output_str = 'Output directory not found'
- self.end_ts = timezone.now()
- self.retry_at = None
- self.save()
- return
- # Read and parse JSONL output from stdout.log
- stdout_file = plugin_dir / 'stdout.log'
- stdout = stdout_file.read_text() if stdout_file.exists() else ''
- records = []
- for line in stdout.splitlines():
- if line.strip() and line.strip().startswith('{'):
- try:
- records.append(json.loads(line))
- except json.JSONDecodeError:
- continue
- # Find ArchiveResult record and update status/output from it
- ar_records = [r for r in records if r.get('type') == 'ArchiveResult']
- if ar_records:
- hook_data = ar_records[0]
- # Update status
- status_map = {
- 'succeeded': self.StatusChoices.SUCCEEDED,
- 'failed': self.StatusChoices.FAILED,
- 'skipped': self.StatusChoices.SKIPPED,
- }
- self.status = status_map.get(hook_data.get('status', 'failed'), self.StatusChoices.FAILED)
- # Update output fields
- self.output_str = hook_data.get('output_str') or hook_data.get('output') or ''
- self.output_json = hook_data.get('output_json')
- # Update cmd fields
- if hook_data.get('cmd'):
- if self.process_id:
- self.process.cmd = hook_data['cmd']
- self.process.save()
- self._set_binary_from_cmd(hook_data['cmd'])
- # Note: cmd_version is derived from binary.version, not stored on Process
- else:
- # No ArchiveResult record = failed
- self.status = self.StatusChoices.FAILED
- self.output_str = 'Hook did not output ArchiveResult record'
- # Walk filesystem and populate output_files, output_size, output_mimetypes
- exclude_names = {'stdout.log', 'stderr.log', 'hook.pid', 'listener.pid'}
- mime_sizes = defaultdict(int)
- total_size = 0
- output_files = {}
- for file_path in plugin_dir.rglob('*'):
- if not file_path.is_file():
- continue
- if file_path.name in exclude_names:
- continue
- try:
- stat = file_path.stat()
- mime_type, _ = mimetypes.guess_type(str(file_path))
- mime_type = mime_type or 'application/octet-stream'
- relative_path = str(file_path.relative_to(plugin_dir))
- output_files[relative_path] = {}
- mime_sizes[mime_type] += stat.st_size
- total_size += stat.st_size
- except OSError:  # IOError is an alias of OSError on Python 3
- continue
- self.output_files = output_files
- self.output_size = total_size
- sorted_mimes = sorted(mime_sizes.items(), key=lambda x: x[1], reverse=True)
- self.output_mimetypes = ','.join(mime for mime, _ in sorted_mimes)
- # Update timestamps
- self.end_ts = timezone.now()
- self.retry_at = None
- self.save()
- # Process side-effect records (filter Snapshots for depth/URL)
- filtered_records = []
- for record in records:
- record_type = record.get('type')
- # Skip ArchiveResult records (already processed above)
- if record_type == 'ArchiveResult':
- continue
- # Filter Snapshot records for depth/URL constraints
- if record_type == 'Snapshot':
- url = record.get('url')
- if not url:
- continue
- depth = record.get('depth', self.snapshot.depth + 1)
- if depth > self.snapshot.crawl.max_depth:
- continue
- if not self._url_passes_filters(url):
- continue
- filtered_records.append(record)
- # Process filtered records with unified dispatcher
- overrides = {
- 'snapshot': self.snapshot,
- 'crawl': self.snapshot.crawl,
- 'created_by_id': self.created_by.pk,
- }
- process_hook_records(filtered_records, overrides=overrides)
- # Cleanup PID files and empty logs
- pid_file = plugin_dir / 'hook.pid'
- pid_file.unlink(missing_ok=True)
- stderr_file = plugin_dir / 'stderr.log'
- if stdout_file.exists() and stdout_file.stat().st_size == 0:
- stdout_file.unlink()
- if stderr_file.exists() and stderr_file.stat().st_size == 0:
- stderr_file.unlink()
- def _set_binary_from_cmd(self, cmd: list) -> None:
- """
- Find Binary for command and set binary FK.
- Tries matching by absolute path first, then by binary name.
- Only matches binaries on the current machine.
- """
- if not cmd:
- return
- from archivebox.machine.models import Machine
- bin_path_or_name = cmd[0] if isinstance(cmd, list) else cmd
- machine = Machine.current()
- # Try matching by absolute path first
- binary = Binary.objects.filter(
- abspath=bin_path_or_name,
- machine=machine
- ).first()
- if binary:
- if self.process_id:
- self.process.binary = binary
- self.process.save()
- return
- # Fallback: match by binary name
- bin_name = Path(bin_path_or_name).name
- binary = Binary.objects.filter(
- name=bin_name,
- machine=machine
- ).first()
- if binary:
- if self.process_id:
- self.process.binary = binary
- self.process.save()
- def _url_passes_filters(self, url: str) -> bool:
- """Check if URL passes URL_ALLOWLIST and URL_DENYLIST config filters.
- Uses proper config hierarchy: defaults -> file -> env -> machine -> user -> crawl -> snapshot
- """
- import re
- from archivebox.config.configset import get_config
- # Get merged config with proper hierarchy
- config = get_config(
- user=self.created_by,
- crawl=self.snapshot.crawl,
- snapshot=self.snapshot,
- )
- # Get allowlist/denylist (can be string or list)
- allowlist_raw = config.get('URL_ALLOWLIST', '')
- denylist_raw = config.get('URL_DENYLIST', '')
- # Normalize to list of patterns
- def to_pattern_list(value):
- if isinstance(value, list):
- return value
- if isinstance(value, str):
- return [p.strip() for p in value.split(',') if p.strip()]
- return []
- allowlist = to_pattern_list(allowlist_raw)
- denylist = to_pattern_list(denylist_raw)
- # Denylist takes precedence
- if denylist:
- for pattern in denylist:
- try:
- if re.search(pattern, url):
- return False
- except re.error:
- continue # Skip invalid regex patterns
- # If allowlist exists, URL must match at least one pattern
- if allowlist:
- for pattern in allowlist:
- try:
- if re.search(pattern, url):
- return True
- except re.error:
- continue # Skip invalid regex patterns
- return False # No allowlist patterns matched
- return True # No filters or passed filters
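- # Evaluation sketch (hypothetical patterns): with URL_DENYLIST='.*\.exe$'
- # and URL_ALLOWLIST='https://example\.com/.*':
- # >>> self._url_passes_filters('https://example.com/docs')   # True (allowlist match)
- # >>> self._url_passes_filters('https://example.com/a.exe')  # False (denylist wins)
- # >>> self._url_passes_filters('https://other.org/')         # False (allowlist miss)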
- @property
- def output_dir(self) -> Path:
- """Get the output directory for this plugin's results."""
- return Path(self.snapshot.output_dir) / self.plugin
- def is_background_hook(self) -> bool:
- """Check if this ArchiveResult is for a background hook."""
- plugin_dir = Path(self.pwd) if self.pwd else None
- if not plugin_dir:
- return False
- pid_file = plugin_dir / 'hook.pid'
- return pid_file.exists()
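- # Detection sketch (illustrative path): a background hook leaves a hook.pid
- # file in its output dir while running, so e.g. with
- # pwd='archive/1704067200.0/media' this returns True only while
- # media/hook.pid still exists on disk.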
- # =============================================================================
- # ArchiveResult State Machine
- # =============================================================================
- class ArchiveResultMachine(BaseStateMachine, strict_states=True):
- """
- State machine for managing ArchiveResult (single plugin execution) lifecycle.
- Hook Lifecycle:
- ┌─────────────────────────────────────────────────────────────┐
- │ QUEUED State │
- │ • Waiting for its turn to run │
- └─────────────────────────────────────────────────────────────┘
- ↓ tick() when can_start()
- ┌─────────────────────────────────────────────────────────────┐
- │ STARTED State → enter_started() │
- │ 1. archiveresult.run() │
- │ • Find specific hook by hook_name │
- │ • run_hook(script, output_dir, ...) → subprocess │
- │ │
- │ 2a. FOREGROUND hook (returns HookResult): │
- │ • update_from_output() immediately │
- │ - Read stdout.log │
- │ - Parse JSONL records │
- │ - Extract 'ArchiveResult' record → update status │
- │ - Walk output_dir → populate output_files │
- │ - Call process_hook_records() for side effects │
- │ │
- │ 2b. BACKGROUND hook (returns None): │
- │ • Status stays STARTED │
- │ • Continues running in background │
- │ • Killed by Snapshot.cleanup() when sealed │
- └─────────────────────────────────────────────────────────────┘
- ↓ tick() checks status
- ┌─────────────────────────────────────────────────────────────┐
- │ SUCCEEDED / FAILED / SKIPPED / BACKOFF │
- │ • Set by hook's JSONL output during update_from_output() │
- │ • Health stats incremented (num_uses_succeeded/failed) │
- │ • Parent Snapshot health stats also updated │
- └─────────────────────────────────────────────────────────────┘
- https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
- """
- model_attr_name = 'archiveresult'
- # States
- queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
- started = State(value=ArchiveResult.StatusChoices.STARTED)
- backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
- succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
- failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
- skipped = State(value=ArchiveResult.StatusChoices.SKIPPED, final=True)
- # Tick Event - transitions based on conditions
- tick = (
- queued.to.itself(unless='can_start') |
- queued.to(started, cond='can_start') |
- started.to.itself(unless='is_finished') |
- started.to(succeeded, cond='is_succeeded') |
- started.to(failed, cond='is_failed') |
- started.to(skipped, cond='is_skipped') |
- started.to(backoff, cond='is_backoff') |
- backoff.to.itself(unless='can_start') |
- backoff.to(started, cond='can_start') |
- backoff.to(succeeded, cond='is_succeeded') |
- backoff.to(failed, cond='is_failed') |
- backoff.to(skipped, cond='is_skipped')
- )
- def can_start(self) -> bool:
- if not self.archiveresult.snapshot.url:
- return False
- # Check if snapshot has exceeded MAX_URL_ATTEMPTS failed results
- from archivebox.config.configset import get_config
- config = get_config(
- crawl=self.archiveresult.snapshot.crawl,
- snapshot=self.archiveresult.snapshot,
- )
- max_attempts = config.get('MAX_URL_ATTEMPTS', 50)
- # Count failed ArchiveResults for this snapshot (any plugin type)
- failed_count = self.archiveresult.snapshot.archiveresult_set.filter(
- status=ArchiveResult.StatusChoices.FAILED
- ).count()
- if failed_count >= max_attempts:
- # Mark this result as skipped since we've hit the limit
- self.archiveresult.status = ArchiveResult.StatusChoices.SKIPPED
- self.archiveresult.output_str = f'Skipped: snapshot exceeded MAX_URL_ATTEMPTS ({max_attempts} failures)'
- self.archiveresult.retry_at = None
- self.archiveresult.save()
- return False
- return True
- def is_succeeded(self) -> bool:
- """Check if extractor plugin succeeded (status was set by run())."""
- return self.archiveresult.status == ArchiveResult.StatusChoices.SUCCEEDED
- def is_failed(self) -> bool:
- """Check if extractor plugin failed (status was set by run())."""
- return self.archiveresult.status == ArchiveResult.StatusChoices.FAILED
- def is_skipped(self) -> bool:
- """Check if extractor plugin was skipped (status was set by run())."""
- return self.archiveresult.status == ArchiveResult.StatusChoices.SKIPPED
- def is_backoff(self) -> bool:
- """Check if we should backoff and retry later."""
- # Backoff if status is still started (plugin didn't complete) and output_str is empty
- return (
- self.archiveresult.status == ArchiveResult.StatusChoices.STARTED and
- not self.archiveresult.output_str
- )
- def is_finished(self) -> bool:
- """Check if extraction has completed (success, failure, or skipped)."""
- return self.archiveresult.status in (
- ArchiveResult.StatusChoices.SUCCEEDED,
- ArchiveResult.StatusChoices.FAILED,
- ArchiveResult.StatusChoices.SKIPPED,
- )
- @queued.enter
- def enter_queued(self):
- self.archiveresult.update_and_requeue(
- retry_at=timezone.now(),
- status=ArchiveResult.StatusChoices.QUEUED,
- start_ts=None,
- ) # bump this result's retry_at so workers pick up any new changes promptly
- @started.enter
- def enter_started(self):
- from archivebox.machine.models import NetworkInterface
- # Update Process with network interface
- if self.archiveresult.process_id:
- self.archiveresult.process.iface = NetworkInterface.current()
- self.archiveresult.process.save()
- # Lock the object and mark start time
- self.archiveresult.update_and_requeue(
- retry_at=timezone.now() + timedelta(seconds=120), # 2 min timeout for plugin
- status=ArchiveResult.StatusChoices.STARTED,
- start_ts=timezone.now(),
- )
- # Run the plugin - this updates status, output, timestamps, etc.
- self.archiveresult.run()
- # Save the updated result
- self.archiveresult.save()
- @backoff.enter
- def enter_backoff(self):
- self.archiveresult.update_and_requeue(
- retry_at=timezone.now() + timedelta(seconds=60),
- status=ArchiveResult.StatusChoices.BACKOFF,
- end_ts=None,
- )
- @succeeded.enter
- def enter_succeeded(self):
- self.archiveresult.update_and_requeue(
- retry_at=None,
- status=ArchiveResult.StatusChoices.SUCCEEDED,
- end_ts=timezone.now(),
- )
- # Update health stats for ArchiveResult, Snapshot, and Crawl cascade
- self.archiveresult.cascade_health_update(success=True)
- @failed.enter
- def enter_failed(self):
- self.archiveresult.update_and_requeue(
- retry_at=None,
- status=ArchiveResult.StatusChoices.FAILED,
- end_ts=timezone.now(),
- )
- # Update health stats for ArchiveResult, Snapshot, and Crawl cascade
- self.archiveresult.cascade_health_update(success=False)
- @skipped.enter
- def enter_skipped(self):
- self.archiveresult.update_and_requeue(
- retry_at=None,
- status=ArchiveResult.StatusChoices.SKIPPED,
- end_ts=timezone.now(),
- )
- def after_transition(self, event: str, source: State, target: State):
- self.archiveresult.snapshot.update_and_requeue() # bump snapshot retry time so it picks up all the new changes
- # =============================================================================
- # State Machine Registration
- # =============================================================================
- # Manually register state machines with python-statemachine registry
- # (normally auto-discovered from statemachines.py, but we define them here for clarity)
- registry.register(SnapshotMachine)
- registry.register(ArchiveResultMachine)
|