| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- __package__ = 'archivebox.core'
- import uuid
- from pathlib import Path
- from typing import Dict, Optional, List
- from datetime import datetime, timedelta
- from collections import defaultdict
- from django.db import models, transaction
- from django.utils.functional import cached_property
- from django.utils.text import slugify
- from django.db.models import Case, When, Value, IntegerField
- from ..util import parse_date
- from ..index.schema import Link
- from ..config import CONFIG
- from ..system import get_dir_size
# EXTRACTORS = [(extractor[0], extractor[0]) for extractor in get_default_archive_methods()]

# Both lists are passed as Django field ``choices`` further down in this file,
# so every entry is a (value, label) pair; here value and label are the same.
EXTRACTORS = [(name, name) for name in ("title", "wget")]

STATUS_CHOICES = [
    (status, status)
    for status in ("succeeded", "failed", "skipped")
]
# Use the JSONField exposed by django.db.models when it exists; otherwise
# fall back to the standalone ``jsonfield`` package.
if hasattr(models, "JSONField"):
    JSONField = models.JSONField
else:
    import jsonfield
    JSONField = jsonfield.JSONField
class Tag(models.Model):
    """
    Based on django-taggit model

    A named label with a unique slug. When a new tag is saved without a
    slug, one is derived from the name and de-duplicated with a numeric
    suffix.
    """

    name = models.CharField(verbose_name="name", unique=True, blank=False, max_length=100)
    slug = models.SlugField(verbose_name="slug", unique=True, max_length=100)

    class Meta:
        verbose_name = "Tag"
        verbose_name_plural = "Tags"

    def __str__(self):
        return self.name

    def slugify(self, tag, i=None):
        """Return the slug for ``tag``, with ``_<i>`` appended when ``i`` is given."""
        # ``slugify`` here resolves to the module-level Django helper, not this method.
        base = slugify(tag)
        return base if i is None else base + "_%d" % i

    def save(self, *args, **kwargs):
        """
        Save the tag, generating a unique slug first when a new row has none.

        Existing rows, and new rows that already carry a slug, are saved as-is.
        """
        needs_slug = self._state.adding and not self.slug
        if not needs_slug:
            return super().save(*args, **kwargs)

        self.slug = self.slugify(self.name)

        with transaction.atomic():
            # Every stored slug sharing our prefix is a potential collision.
            taken = set(
                type(self)
                ._default_manager.filter(slug__startswith=self.slug)
                .values_list("slug", flat=True)
            )

            # Try the bare slug first, then "<slug>_1", "<slug>_2", ...
            suffix = None
            candidate = self.slugify(self.name, suffix)
            while candidate in taken:
                suffix = 1 if suffix is None else suffix + 1
                candidate = self.slugify(self.name, suffix)

            self.slug = candidate
            return super().save(*args, **kwargs)
class Snapshot(models.Model):
    """
    One archived URL, identified by a UUID primary key and a unique timestamp.

    Per-extractor results are reached through the reverse relation
    ``archiveresult_set``.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)

    url = models.URLField(unique=True)
    timestamp = models.CharField(max_length=32, unique=True, db_index=True)

    title = models.CharField(max_length=128, null=True, blank=True, db_index=True)

    added = models.DateTimeField(auto_now_add=True, db_index=True)
    updated = models.DateTimeField(null=True, blank=True, db_index=True)
    tags = models.ManyToManyField(Tag)

    # Attribute names accepted by from_json() and exported by default by as_json().
    keys = ('id', 'url', 'timestamp', 'title', 'tags', 'updated', 'base_url')

    def __repr__(self) -> str:
        title = self.title or '-'
        return f'[{self.timestamp}] {self.url[:64]} ({title[:64]})'

    def __str__(self) -> str:
        # Same text as __repr__; kept in one place so the two cannot drift apart.
        return self.__repr__()

    def field_names(self):
        """
        Return the names of this model's fields, leaving out relationships.

        Previously this was declared without ``self`` and called the
        non-existent ``_meta.get_field_names()``, so every call (including the
        default path of ``as_csv``) raised. It now uses ``_meta.get_fields()``.
        """
        exclude = ["tags", "archiveresult"]  # Exclude relationships for now
        return [
            field.name
            for field in self._meta.get_fields()
            if field.name not in exclude
        ]

    @classmethod
    def from_json(cls, info: dict):
        """Build an unsaved Snapshot from ``info``, keeping only entries named in ``keys``."""
        info = {k: v for k, v in info.items() if k in cls.keys}
        if "tags" in info:
            # TODO: Handle tags
            info.pop("tags")
        # base_url is derived from url (see the property below), not a constructor argument.
        info.pop("base_url", None)
        return cls(**info)

    def get_history(self) -> dict:
        """
        Generates the history dictionary out of the stored ArchiveResults

        Returns a plain dict mapping each extractor name to a list of result
        dicts, in the order the related queryset yields them.
        """
        history = defaultdict(list)
        for result in self.archiveresult_set.all():
            history[result.extractor].append(
                {
                    "cmd": result.cmd,
                    "cmd_version": result.cmd_version,
                    "end_ts": result.end_ts.isoformat(),
                    "start_ts": result.start_ts.isoformat(),
                    "pwd": result.pwd,
                    "output": result.output,
                    "schema": "ArchiveResult",
                    "status": result.status,
                }
            )
        return dict(history)

    def as_json(self, *args) -> dict:
        """
        Returns the snapshot in json format.
        id is converted to str
        history is extracted from ArchiveResult

        ``args`` selects which attributes to export; with no arguments
        ``self.keys`` is used. A ``"history"`` entry is always added.
        """
        args = args or self.keys
        output = {
            key: (self.tags_str() if key == 'tags' else getattr(self, key))
            for key in args
        }
        if "id" in output:
            output["id"] = str(output["id"])

        output["history"] = self.get_history()
        return output

    def as_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
        """Render the snapshot through ``index.csv.to_csv``; ``cols`` defaults to ``field_names()``."""
        from ..index.csv import to_csv
        return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)

    def as_link(self) -> Link:
        """Convert to a ``Link`` by round-tripping through ``as_json()``."""
        return Link.from_json(self.as_json())

    def as_link_with_details(self) -> Link:
        """Like ``as_link()``, then passed through ``index.load_link_details``."""
        from ..index import load_link_details
        return load_link_details(self.as_link())

    def tags_str(self) -> str:
        """Return the tag names joined by commas, ordered by name."""
        return ','.join(self.tags.order_by('name').values_list('name', flat=True))

    @cached_property
    def bookmarked(self):
        return parse_date(self.timestamp)

    @cached_property
    def bookmarked_date(self) -> Optional[str]:
        """
        Return a display string for the timestamp, or None.

        A numeric timestamp between 0 and 30 days from now is formatted via
        ``ts_to_date``; any other numeric timestamp is returned as a string;
        an empty or non-numeric timestamp gives None.
        """
        from ..util import ts_to_date

        max_ts = (datetime.now() + timedelta(days=30)).timestamp()

        if self.timestamp and self.timestamp.replace('.', '').isdigit():
            if 0 < float(self.timestamp) < max_ts:
                return ts_to_date(datetime.fromtimestamp(float(self.timestamp)))
            else:
                return str(self.timestamp)
        return None

    @cached_property
    def is_archived(self) -> bool:
        """True when at least one of the checked output paths exists in this snapshot's archive folder."""
        from ..config import ARCHIVE_DIR
        from ..util import domain

        output_paths = (
            domain(self.url),
            'output.pdf',
            'screenshot.png',
            'output.html',
            'media',
            'singlefile.html'
        )

        return any(
            (Path(ARCHIVE_DIR) / self.timestamp / path).exists()
            for path in output_paths
        )

    @cached_property
    def archive_dates(self) -> List[datetime]:
        """Start times of all related ArchiveResults."""
        return [
            result.start_ts
            for result in self.archiveresult_set.all()
        ]

    @cached_property
    def oldest_archive_date(self) -> Optional[datetime]:
        """
        Start time of the earliest related ArchiveResult, or None if there are none.

        The query previously sorted by ``-start_ts`` (descending), which
        returned the most recent result instead of the oldest.
        """
        oldest = self.archiveresult_set.all().order_by("start_ts")[:1]
        if len(oldest) > 0:
            return oldest[0].start_ts
        return None

    @cached_property
    def num_outputs(self):
        """Number of related ArchiveResults whose status is 'succeeded'."""
        return self.archiveresult_set.filter(status='succeeded').count()

    @cached_property
    def url_hash(self):
        return self.as_link().url_hash

    @cached_property
    def base_url(self) -> str:
        from ..util import base_url
        return base_url(self.url)

    @cached_property
    def snapshot_dir(self):
        """Path of this snapshot's folder: CONFIG['ARCHIVE_DIR'] / timestamp."""
        from ..config import CONFIG
        return Path(CONFIG['ARCHIVE_DIR']) / self.timestamp

    @cached_property
    def archive_path(self):
        """'<ARCHIVE_DIR_NAME>/<timestamp>' as a string."""
        from ..config import ARCHIVE_DIR_NAME
        return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)

    @cached_property
    def archive_size(self) -> float:
        """First element of ``get_dir_size(archive_path)``; 0 if that call fails for any reason."""
        try:
            return get_dir_size(self.archive_path)[0]
        except Exception:
            # Deliberate best-effort: any failure is reported as size 0.
            return 0

    @cached_property
    def history(self):
        # TODO: use ArchiveResult for this instead of json
        return self.as_link_with_details().history

    @cached_property
    def latest_title(self):
        """Stripped output of the last 'title' history entry if it succeeded and is non-blank, else None."""
        if ('title' in self.history
            and self.history['title']
            and (self.history['title'][-1].status == 'succeeded')
            and self.history['title'][-1].output.strip()):
            return self.history['title'][-1].output.strip()
        return None

    @cached_property
    def domain(self) -> str:
        from ..util import domain
        return domain(self.url)

    @cached_property
    def is_static(self) -> bool:
        from ..util import is_static_file
        return is_static_file(self.url)

    @cached_property
    def details(self) -> Dict:
        # TODO: Define what details are, and return them accordingly
        return {"history": {}}

    @property
    def extension(self) -> str:
        from ..util import extension
        return extension(self.url)

    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """predict the expected output paths that should be present after archiving"""
        from ..extractors.wget import wget_output_path

        canonical = {
            'index_path': 'index.html',
            'favicon_path': 'favicon.ico',
            'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
            'wget_path': wget_output_path(self),
            'warc_path': 'warc',
            'singlefile_path': 'singlefile.html',
            'readability_path': 'readability/content.html',
            'mercury_path': 'mercury/content.html',
            'pdf_path': 'output.pdf',
            'screenshot_path': 'screenshot.png',
            'dom_path': 'output.html',
            'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url),
            'git_path': 'git',
            'media_path': 'media',
        }

        if self.is_static:
            # static binary files like PDF and images are handled slightly differently.
            # they're just downloaded once and aren't archived separately multiple times,
            # so the wget, screenshot, & pdf urls should all point to the same file
            static_path = wget_output_path(self)
            canonical.update({
                # NOTE(review): no ``basename`` attribute is defined on this class
                # in the visible source, so this lookup looks like it will raise
                # AttributeError for static URLs -- confirm and add the property.
                'title': self.basename,
                'wget_path': static_path,
                'pdf_path': static_path,
                'screenshot_path': static_path,
                'dom_path': static_path,
                'singlefile_path': static_path,
                'readability_path': static_path,
                'mercury_path': static_path,
            })
        return canonical

    def save_tags(self, tags=()):
        """Replace this snapshot's tags with ``tags`` (names), creating missing Tag rows."""
        tags_id = [Tag.objects.get_or_create(name=tag)[0].id for tag in tags]
        self.tags.clear()
        self.tags.add(*tags_id)
class ArchiveResultManager(models.Manager):
    """Manager adding an ``indexable()`` query over succeeded results."""

    def indexable(self, sorted: bool = True):
        """
        Return succeeded results whose extractor appears in
        ``ARCHIVE_METHODS_INDEXING_PRECEDENCE``.

        When ``sorted`` is true the queryset is annotated with
        ``indexing_precedence`` (1000 for any extractor without a matching
        entry) and ordered by it.
        """
        from ..extractors import ARCHIVE_METHODS_INDEXING_PRECEDENCE

        method_names = [entry[0] for entry in ARCHIVE_METHODS_INDEXING_PRECEDENCE]
        queryset = self.get_queryset().filter(extractor__in=method_names, status='succeeded')
        if not sorted:
            return queryset

        # One WHEN clause per (extractor, precedence) pair.
        ranking = [
            When(extractor=name, then=Value(rank))
            for name, rank in ARCHIVE_METHODS_INDEXING_PRECEDENCE
        ]
        rank_expr = Case(*ranking, default=Value(1000), output_field=IntegerField())
        return queryset.annotate(indexing_precedence=rank_expr).order_by('indexing_precedence')
class ArchiveResult(models.Model):
    """
    The recorded outcome of one extractor run for a Snapshot.

    Rows are deleted together with their snapshot (CASCADE) and are ordered
    newest-first by ``start_ts`` by default.
    """

    snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE)

    # Command, working directory and tool version recorded for the run.
    cmd = JSONField()
    pwd = models.CharField(max_length=256)
    cmd_version = models.CharField(max_length=32, default=None, null=True, blank=True)

    output = models.CharField(max_length=512)

    start_ts = models.DateTimeField()
    end_ts = models.DateTimeField()

    status = models.CharField(max_length=16, choices=STATUS_CHOICES)
    extractor = models.CharField(choices=EXTRACTORS, max_length=32)

    objects = ArchiveResultManager()

    class Meta:
        ordering = ["-start_ts"]

    def __str__(self):
        return self.extractor
|