| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531 |
- __package__ = 'archivebox.core'
- from typing import Optional, List, Dict, Iterable
- from django_stubs_ext.db.models import TypedModelMeta
- import json
- import random
- import uuid
- from uuid import uuid4
- from pathlib import Path
- from django.db import models
- from django.utils.functional import cached_property
- from django.utils.text import slugify
- from django.core.cache import cache
- from django.urls import reverse, reverse_lazy
- from django.db.models import Case, When, Value, IntegerField
- from django.conf import settings
- from abid_utils.models import ABIDModel, ABIDField
- from ..system import get_dir_size
- from ..util import parse_date, base_url
- from ..index.schema import Link
- from ..index.html import snapshot_icons
- from ..extractors import ARCHIVE_METHODS_INDEXING_PRECEDENCE, EXTRACTORS
- def rand_int_id():
- return random.getrandbits(32)
- # class BaseModel(models.Model):
- # # TODO: migrate all models to a shared base class with all our standard fields and helpers:
- # # ulid/created/modified/owner/is_deleted/as_json/from_json/etc.
- # #
- # # id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
- # # ulid = models.CharField(max_length=26, null=True, blank=True, db_index=True, unique=True)
- # class Meta(TypedModelMeta):
- # abstract = True
- class Tag(ABIDModel):
- """
- Based on django-taggit model + ABID base.
- """
- abid_prefix = 'tag_'
- abid_ts_src = 'self.created' # TODO: add created/modified time
- abid_uri_src = 'self.slug'
- abid_subtype_src = '"03"'
- abid_rand_src = 'self.old_id'
- old_id = models.BigIntegerField(unique=True, default=rand_int_id, serialize=False, verbose_name='Old ID') # legacy PK
- id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True)
- abid = ABIDField(prefix=abid_prefix)
- name = models.CharField(unique=True, blank=False, max_length=100)
- slug = models.SlugField(unique=True, blank=False, max_length=100, editable=False)
- # slug is autoset on save from name, never set it manually
- snapshot_set: models.Manager['Snapshot']
- class Meta(TypedModelMeta):
- verbose_name = "Tag"
- verbose_name_plural = "Tags"
- def __str__(self):
- return self.name
- # @property
- # def old_id(self):
- # return self.id
- def slugify(self, tag, i=None):
- slug = slugify(tag)
- if i is not None:
- slug += "_%d" % i
- return slug
- def save(self, *args, **kwargs):
- if self._state.adding and not self.slug:
- self.slug = self.slugify(self.name)
- # if name is different but slug conficts with another tags slug, append a counter
- # with transaction.atomic():
- slugs = set(
- type(self)
- ._default_manager.filter(slug__startswith=self.slug)
- .values_list("slug", flat=True)
- )
- i = None
- while True:
- slug = self.slugify(self.name, i)
- if slug not in slugs:
- self.slug = slug
- return super().save(*args, **kwargs)
- i = 1 if i is None else i+1
- else:
- return super().save(*args, **kwargs)
-
- @property
- def api_url(self) -> str:
- # /api/v1/core/snapshot/{uulid}
- return reverse_lazy('api-1:get_tag', args=[self.abid])
- @property
- def api_docs_url(self) -> str:
- return f'/api/v1/docs#/Core%20Models/api_v1_core_get_tag'
- class SnapshotTag(models.Model):
- id = models.AutoField(primary_key=True)
- snapshot = models.ForeignKey('Snapshot', db_column='snapshot_id', on_delete=models.CASCADE, to_field='id')
- tag = models.ForeignKey(Tag, db_column='tag_id', on_delete=models.CASCADE, to_field='id')
- class Meta:
- db_table = 'core_snapshot_tags'
- unique_together = [('snapshot', 'tag')]
- class SnapshotManager(models.Manager):
- def get_queryset(self):
- return super().get_queryset().prefetch_related('tags', 'archiveresult_set') # .annotate(archiveresult_count=models.Count('archiveresult')).distinct()
- class Snapshot(ABIDModel):
- abid_prefix = 'snp_'
- abid_ts_src = 'self.added'
- abid_uri_src = 'self.url'
- abid_subtype_src = '"01"'
- abid_rand_src = 'self.old_id'
- old_id = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) # legacy pk
- id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True, unique=True)
- abid = ABIDField(prefix=abid_prefix)
- url = models.URLField(unique=True, db_index=True)
- timestamp = models.CharField(max_length=32, unique=True, db_index=True, editable=False)
- title = models.CharField(max_length=512, null=True, blank=True, db_index=True)
-
- tags = models.ManyToManyField(Tag, blank=True, through=SnapshotTag, related_name='snapshot_set', through_fields=('snapshot', 'tag'))
- added = models.DateTimeField(auto_now_add=True, db_index=True)
- updated = models.DateTimeField(auto_now=True, blank=True, null=True, db_index=True)
- keys = ('url', 'timestamp', 'title', 'tags', 'updated')
- archiveresult_set: models.Manager['ArchiveResult']
- objects = SnapshotManager()
- def __repr__(self) -> str:
- title = (self.title_stripped or '-')[:64]
- return f'[{self.timestamp}] {self.url[:64]} ({title})'
- def __str__(self) -> str:
- title = (self.title_stripped or '-')[:64]
- return f'[{self.timestamp}] {self.url[:64]} ({title})'
- @classmethod
- def from_json(cls, info: dict):
- info = {k: v for k, v in info.items() if k in cls.keys}
- return cls(**info)
- def as_json(self, *args) -> dict:
- args = args or self.keys
- return {
- key: getattr(self, key) if key != 'tags' else self.tags_str(nocache=False)
- for key in args
- }
- def as_link(self) -> Link:
- return Link.from_json(self.as_json())
- def as_link_with_details(self) -> Link:
- from ..index import load_link_details
- return load_link_details(self.as_link())
- def tags_str(self, nocache=True) -> str | None:
- calc_tags_str = lambda: ','.join(sorted(tag.name for tag in self.tags.all()))
- cache_key = f'{self.pk}-{(self.updated or self.added).timestamp()}-tags'
-
- if hasattr(self, '_prefetched_objects_cache') and 'tags' in self._prefetched_objects_cache:
- # tags are pre-fetched already, use them directly (best because db is always freshest)
- tags_str = calc_tags_str()
- return tags_str
-
- if nocache:
- tags_str = calc_tags_str()
- cache.set(cache_key, tags_str)
- return tags_str
- return cache.get_or_set(cache_key, calc_tags_str)
- def icons(self) -> str:
- return snapshot_icons(self)
-
- @property
- def api_url(self) -> str:
- # /api/v1/core/snapshot/{uulid}
- return reverse_lazy('api-1:get_snapshot', args=[self.abid])
-
- @property
- def api_docs_url(self) -> str:
- return f'/api/v1/docs#/Core%20Models/api_v1_core_get_snapshot'
-
- @cached_property
- def title_stripped(self) -> str:
- return (self.title or '').replace("\n", " ").replace("\r", "")
- @cached_property
- def extension(self) -> str:
- from ..util import extension
- return extension(self.url)
- @cached_property
- def bookmarked(self):
- return parse_date(self.timestamp)
- @cached_property
- def bookmarked_date(self):
- # TODO: remove this
- return self.bookmarked
- @cached_property
- def is_archived(self):
- return self.as_link().is_archived
- @cached_property
- def num_outputs(self) -> int:
- # DONT DO THIS: it will trigger a separate query for every snapshot
- # return self.archiveresult_set.filter(status='succeeded').count()
- # this is better:
- return sum((1 for result in self.archiveresult_set.all() if result.status == 'succeeded'))
- @cached_property
- def base_url(self):
- return base_url(self.url)
- @cached_property
- def link_dir(self):
- return str(settings.CONFIG.ARCHIVE_DIR / self.timestamp)
- @cached_property
- def archive_path(self):
- return '{}/{}'.format(settings.CONFIG.ARCHIVE_DIR_NAME, self.timestamp)
- @cached_property
- def archive_size(self):
- cache_key = f'{str(self.pk)[:12]}-{(self.updated or self.added).timestamp()}-size'
- def calc_dir_size():
- try:
- return get_dir_size(self.link_dir)[0]
- except Exception:
- return 0
- return cache.get_or_set(cache_key, calc_dir_size)
- @cached_property
- def thumbnail_url(self) -> Optional[str]:
- if hasattr(self, '_prefetched_objects_cache') and 'archiveresult_set' in self._prefetched_objects_cache:
- result = (sorted(
- (
- result
- for result in self.archiveresult_set.all()
- if result.extractor == 'screenshot' and result.status =='succeeded' and result.output
- ),
- key=lambda result: result.created,
- ) or [None])[-1]
- else:
- result = self.archiveresult_set.filter(
- extractor='screenshot',
- status='succeeded'
- ).only('output').last()
- if result:
- return reverse('Snapshot', args=[f'{str(self.timestamp)}/{result.output}'])
- return None
- @cached_property
- def headers(self) -> Optional[Dict[str, str]]:
- try:
- return json.loads((Path(self.link_dir) / 'headers.json').read_text(encoding='utf-8').strip())
- except Exception:
- pass
- return None
- @cached_property
- def status_code(self) -> Optional[str]:
- return self.headers.get('Status-Code') if self.headers else None
- @cached_property
- def history(self) -> dict:
- # TODO: use ArchiveResult for this instead of json
- return self.as_link_with_details().history
- @cached_property
- def latest_title(self) -> Optional[str]:
- if self.title:
- return self.title # whoopdedoo that was easy
-
- # check if ArchiveResult set has already been prefetched, if so use it instead of fetching it from db again
- if hasattr(self, '_prefetched_objects_cache') and 'archiveresult_set' in self._prefetched_objects_cache:
- try:
- return (sorted(
- (
- result.output.strip()
- for result in self.archiveresult_set.all()
- if result.extractor == 'title' and result.status =='succeeded' and result.output
- ),
- key=lambda title: len(title),
- ) or [None])[-1]
- except IndexError:
- pass
-
- try:
- # take longest successful title from ArchiveResult db history
- return sorted(
- self.archiveresult_set\
- .filter(extractor='title', status='succeeded', output__isnull=False)\
- .values_list('output', flat=True),
- key=lambda r: len(r),
- )[-1]
- except IndexError:
- pass
- try:
- # take longest successful title from Link json index file history
- return sorted(
- (
- result.output.strip()
- for result in self.history['title']
- if result.status == 'succeeded' and result.output.strip()
- ),
- key=lambda r: len(r),
- )[-1]
- except (KeyError, IndexError):
- pass
- return None
- def save_tags(self, tags: Iterable[str]=()) -> None:
- tags_id = []
- for tag in tags:
- if tag.strip():
- tags_id.append(Tag.objects.get_or_create(name=tag)[0].pk)
- self.tags.clear()
- self.tags.add(*tags_id)
- # def get_storage_dir(self, create=True, symlink=True) -> Path:
- # date_str = self.added.strftime('%Y%m%d')
- # domain_str = domain(self.url)
- # abs_storage_dir = Path(settings.CONFIG.ARCHIVE_DIR) / 'snapshots' / date_str / domain_str / str(self.ulid)
- # if create and not abs_storage_dir.is_dir():
- # abs_storage_dir.mkdir(parents=True, exist_ok=True)
- # if symlink:
- # LINK_PATHS = [
- # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'all_by_id' / str(self.ulid),
- # # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'snapshots_by_id' / str(self.ulid),
- # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'snapshots_by_date' / date_str / domain_str / str(self.ulid),
- # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'snapshots_by_domain' / domain_str / date_str / str(self.ulid),
- # ]
- # for link_path in LINK_PATHS:
- # link_path.parent.mkdir(parents=True, exist_ok=True)
- # try:
- # link_path.symlink_to(abs_storage_dir)
- # except FileExistsError:
- # link_path.unlink()
- # link_path.symlink_to(abs_storage_dir)
- # return abs_storage_dir
- class ArchiveResultManager(models.Manager):
- def indexable(self, sorted: bool = True):
- """Return only ArchiveResults containing text suitable for full-text search (sorted in order of typical result quality)"""
-
- INDEXABLE_METHODS = [ r[0] for r in ARCHIVE_METHODS_INDEXING_PRECEDENCE ]
- qs = self.get_queryset().filter(extractor__in=INDEXABLE_METHODS, status='succeeded')
- if sorted:
- precedence = [
- When(extractor=method, then=Value(precedence))
- for method, precedence in ARCHIVE_METHODS_INDEXING_PRECEDENCE
- ]
- qs = qs.annotate(
- indexing_precedence=Case(
- *precedence,
- default=Value(1000),
- output_field=IntegerField()
- )
- ).order_by('indexing_precedence')
- return qs
- class ArchiveResult(ABIDModel):
- abid_prefix = 'res_'
- abid_ts_src = 'self.snapshot.added'
- abid_uri_src = 'self.snapshot.url'
- abid_subtype_src = 'self.extractor'
- abid_rand_src = 'self.old_id'
- EXTRACTOR_CHOICES = (
- ('htmltotext', 'htmltotext'),
- ('git', 'git'),
- ('singlefile', 'singlefile'),
- ('media', 'media'),
- ('archive_org', 'archive_org'),
- ('readability', 'readability'),
- ('mercury', 'mercury'),
- ('favicon', 'favicon'),
- ('pdf', 'pdf'),
- ('headers', 'headers'),
- ('screenshot', 'screenshot'),
- ('dom', 'dom'),
- ('title', 'title'),
- ('wget', 'wget'),
- )
- STATUS_CHOICES = [
- ("succeeded", "succeeded"),
- ("failed", "failed"),
- ("skipped", "skipped")
- ]
- old_id = models.BigIntegerField(default=rand_int_id, serialize=False, verbose_name='Old ID')
- id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True, unique=True, verbose_name='ID')
- abid = ABIDField(prefix=abid_prefix)
- snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE, to_field='id', db_column='snapshot_id')
- extractor = models.CharField(choices=EXTRACTOR_CHOICES, max_length=32)
- cmd = models.JSONField()
- pwd = models.CharField(max_length=256)
- cmd_version = models.CharField(max_length=128, default=None, null=True, blank=True)
- output = models.CharField(max_length=1024)
- start_ts = models.DateTimeField(db_index=True)
- end_ts = models.DateTimeField()
- status = models.CharField(max_length=16, choices=STATUS_CHOICES)
- objects = ArchiveResultManager()
- class Meta(TypedModelMeta):
- verbose_name = 'Archive Result'
- verbose_name_plural = 'Archive Results Log'
-
- def __str__(self):
- return self.extractor
- @cached_property
- def snapshot_dir(self):
- return Path(self.snapshot.link_dir)
- @property
- def api_url(self) -> str:
- # /api/v1/core/archiveresult/{uulid}
- return reverse_lazy('api-1:get_archiveresult', args=[self.abid])
-
- @property
- def api_docs_url(self) -> str:
- return f'/api/v1/docs#/Core%20Models/api_v1_core_get_archiveresult'
- @property
- def extractor_module(self):
- return EXTRACTORS[self.extractor]
- def output_path(self) -> str:
- """return the canonical output filename or directory name within the snapshot dir"""
- return self.extractor_module.get_output_path()
- def embed_path(self) -> str:
- """
- return the actual runtime-calculated path to the file on-disk that
- should be used for user-facing iframe embeds of this result
- """
- if get_embed_path_func := getattr(self.extractor_module, 'get_embed_path', None):
- return get_embed_path_func(self)
- return self.extractor_module.get_output_path()
- def legacy_output_path(self):
- link = self.snapshot.as_link()
- return link.canonical_outputs().get(f'{self.extractor}_path')
- def output_exists(self) -> bool:
- return Path(self.output_path()).exists()
- # def get_storage_dir(self, create=True, symlink=True):
- # date_str = self.snapshot.added.strftime('%Y%m%d')
- # domain_str = domain(self.snapshot.url)
- # abs_storage_dir = Path(settings.CONFIG.ARCHIVE_DIR) / 'results' / date_str / domain_str / self.extractor / str(self.ulid)
- # if create and not abs_storage_dir.is_dir():
- # abs_storage_dir.mkdir(parents=True, exist_ok=True)
- # if symlink:
- # LINK_PATHS = [
- # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'all_by_id' / str(self.ulid),
- # # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'results_by_id' / str(self.ulid),
- # # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'results_by_date' / date_str / domain_str / self.extractor / str(self.ulid),
- # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'results_by_domain' / domain_str / date_str / self.extractor / str(self.ulid),
- # Path(settings.CONFIG.ARCHIVE_DIR).parent / 'index' / 'results_by_type' / self.extractor / date_str / domain_str / str(self.ulid),
- # ]
- # for link_path in LINK_PATHS:
- # link_path.parent.mkdir(parents=True, exist_ok=True)
- # try:
- # link_path.symlink_to(abs_storage_dir)
- # except FileExistsError:
- # link_path.unlink()
- # link_path.symlink_to(abs_storage_dir)
- # return abs_storage_dir
- # def symlink_index(self, create=True):
- # abs_result_dir = self.get_storage_dir(create=create)
|