schema.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
"""
WARNING: THIS FILE IS ALL LEGACY CODE TO BE REMOVED.

DO NOT ADD ANY NEW FEATURES TO THIS FILE. NEW CODE GOES IN: core/models.py

These are the old types that were used before ArchiveBox v0.4 (before the switch to Django).
"""
  6. __package__ = 'archivebox.index'
  7. from pathlib import Path
  8. from datetime import datetime, timezone, timedelta
  9. from typing import List, Dict, Any, Optional, Union
  10. from dataclasses import dataclass, asdict, field, fields
  11. from django.utils.functional import cached_property
  12. from ..system import get_dir_size
  13. from ..util import ts_to_date_str, parse_date
  14. from ..config import OUTPUT_DIR, ARCHIVE_DIR_NAME, FAVICON_PROVIDER
  15. class ArchiveError(Exception):
  16. def __init__(self, message, hints=None):
  17. super().__init__(message)
  18. self.hints = hints
  19. LinkDict = Dict[str, Any]
  20. ArchiveOutput = Union[str, Exception, None]
  21. @dataclass(frozen=True)
  22. class ArchiveResult:
  23. cmd: List[str]
  24. pwd: Optional[str]
  25. cmd_version: Optional[str]
  26. output: ArchiveOutput
  27. status: str
  28. start_ts: datetime
  29. end_ts: datetime
  30. index_texts: Union[List[str], None] = None
  31. schema: str = 'ArchiveResult'
  32. def __post_init__(self):
  33. self.typecheck()
  34. def _asdict(self):
  35. return asdict(self)
  36. def typecheck(self) -> None:
  37. assert self.schema == self.__class__.__name__
  38. assert isinstance(self.status, str) and self.status
  39. assert isinstance(self.start_ts, datetime)
  40. assert isinstance(self.end_ts, datetime)
  41. assert isinstance(self.cmd, list)
  42. assert all(isinstance(arg, str) and arg for arg in self.cmd)
  43. # TODO: replace emptystrings in these three with None / remove them from the DB
  44. assert self.pwd is None or isinstance(self.pwd, str)
  45. assert self.cmd_version is None or isinstance(self.cmd_version, str)
  46. assert self.output is None or isinstance(self.output, (str, Exception))
  47. @classmethod
  48. def guess_ts(_cls, dict_info):
  49. from ..util import parse_date
  50. parsed_timestamp = parse_date(dict_info["timestamp"])
  51. start_ts = parsed_timestamp
  52. end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"]))
  53. return start_ts, end_ts
  54. @classmethod
  55. def from_json(cls, json_info, guess=False):
  56. from ..util import parse_date
  57. info = {
  58. key: val
  59. for key, val in json_info.items()
  60. if key in cls.field_names()
  61. }
  62. if guess:
  63. keys = info.keys()
  64. if "start_ts" not in keys:
  65. info["start_ts"], info["end_ts"] = cls.guess_ts(json_info)
  66. else:
  67. info['start_ts'] = parse_date(info['start_ts'])
  68. info['end_ts'] = parse_date(info['end_ts'])
  69. if "pwd" not in keys:
  70. info["pwd"] = str(Path(OUTPUT_DIR) / ARCHIVE_DIR_NAME / json_info["timestamp"])
  71. if "cmd_version" not in keys:
  72. info["cmd_version"] = "Undefined"
  73. if "cmd" not in keys:
  74. info["cmd"] = []
  75. else:
  76. info['start_ts'] = parse_date(info['start_ts'])
  77. info['end_ts'] = parse_date(info['end_ts'])
  78. info['cmd_version'] = info.get('cmd_version')
  79. if type(info["cmd"]) is str:
  80. info["cmd"] = [info["cmd"]]
  81. return cls(**info)
  82. def to_dict(self, *keys) -> dict:
  83. if keys:
  84. return {k: v for k, v in asdict(self).items() if k in keys}
  85. return asdict(self)
  86. def to_json(self, indent=4, sort_keys=True) -> str:
  87. from .json import to_json
  88. return to_json(self, indent=indent, sort_keys=sort_keys)
  89. def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
  90. from .csv import to_csv
  91. return to_csv(self, csv_col=cols or self.field_names(), separator=separator, ljust=ljust)
  92. @classmethod
  93. def field_names(cls):
  94. return [f.name for f in fields(cls)]
  95. @property
  96. def duration(self) -> int:
  97. return (self.end_ts - self.start_ts).seconds
  98. @dataclass(frozen=True)
  99. class Link:
  100. timestamp: str
  101. url: str
  102. title: Optional[str]
  103. tags: Optional[str]
  104. sources: List[str]
  105. history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
  106. updated: Optional[datetime] = None
  107. schema: str = 'Link'
  108. def __str__(self) -> str:
  109. return f'[{self.timestamp}] {self.url} "{self.title}"'
  110. def __post_init__(self):
  111. self.typecheck()
  112. def overwrite(self, **kwargs):
  113. """pure functional version of dict.update that returns a new instance"""
  114. return Link(**{**self._asdict(), **kwargs})
  115. def __eq__(self, other):
  116. if not isinstance(other, Link):
  117. return NotImplemented
  118. return self.url == other.url
  119. def __gt__(self, other):
  120. if not isinstance(other, Link):
  121. return NotImplemented
  122. if not self.timestamp or not other.timestamp:
  123. return
  124. return float(self.timestamp) > float(other.timestamp)
  125. def typecheck(self) -> None:
  126. from ..config import stderr, ANSI
  127. try:
  128. assert self.schema == self.__class__.__name__
  129. assert isinstance(self.timestamp, str) and self.timestamp
  130. assert self.timestamp.replace('.', '').isdigit()
  131. assert isinstance(self.url, str) and '://' in self.url
  132. assert self.updated is None or isinstance(self.updated, datetime)
  133. assert self.title is None or (isinstance(self.title, str) and self.title)
  134. assert self.tags is None or isinstance(self.tags, str)
  135. assert isinstance(self.sources, list)
  136. assert all(isinstance(source, str) and source for source in self.sources)
  137. assert isinstance(self.history, dict)
  138. for method, results in self.history.items():
  139. assert isinstance(method, str) and method
  140. assert isinstance(results, list)
  141. assert all(isinstance(result, ArchiveResult) for result in results)
  142. except Exception:
  143. stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI))
  144. raise
  145. def _asdict(self, extended=False):
  146. info = {
  147. 'schema': 'Link',
  148. 'url': self.url,
  149. 'title': self.title or None,
  150. 'timestamp': self.timestamp,
  151. 'updated': self.updated or None,
  152. 'tags': self.tags or None,
  153. 'sources': self.sources or [],
  154. 'history': self.history or {},
  155. }
  156. if extended:
  157. info.update({
  158. 'snapshot_id': self.snapshot_id,
  159. 'snapshot_old_id': self.snapshot_old_id,
  160. 'snapshot_abid': self.snapshot_abid,
  161. 'link_dir': self.link_dir,
  162. 'archive_path': self.archive_path,
  163. 'hash': self.url_hash,
  164. 'base_url': self.base_url,
  165. 'scheme': self.scheme,
  166. 'domain': self.domain,
  167. 'path': self.path,
  168. 'basename': self.basename,
  169. 'extension': self.extension,
  170. 'is_static': self.is_static,
  171. 'tags_str': (self.tags or '').strip(','), # only used to render static index in index/html.py, remove if no longer needed there
  172. 'icons': None, # only used to render static index in index/html.py, remove if no longer needed there
  173. 'bookmarked_date': self.bookmarked_date,
  174. 'updated_date': self.updated_date,
  175. 'oldest_archive_date': self.oldest_archive_date,
  176. 'newest_archive_date': self.newest_archive_date,
  177. 'is_archived': self.is_archived,
  178. 'num_outputs': self.num_outputs,
  179. 'num_failures': self.num_failures,
  180. 'latest': self.latest_outputs(),
  181. 'canonical': self.canonical_outputs(),
  182. })
  183. return info
  184. def as_snapshot(self):
  185. from core.models import Snapshot
  186. return Snapshot.objects.get(url=self.url)
  187. @classmethod
  188. def from_json(cls, json_info, guess=False):
  189. from ..util import parse_date
  190. info = {
  191. key: val
  192. for key, val in json_info.items()
  193. if key in cls.field_names()
  194. }
  195. info['updated'] = parse_date(info.get('updated'))
  196. info['sources'] = info.get('sources') or []
  197. json_history = info.get('history') or {}
  198. cast_history = {}
  199. for method, method_history in json_history.items():
  200. cast_history[method] = []
  201. for json_result in method_history:
  202. assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
  203. cast_result = ArchiveResult.from_json(json_result, guess)
  204. cast_history[method].append(cast_result)
  205. info['history'] = cast_history
  206. return cls(**info)
  207. def to_json(self, indent=4, sort_keys=True) -> str:
  208. from .json import to_json
  209. return to_json(self, indent=indent, sort_keys=sort_keys)
  210. def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
  211. from .csv import to_csv
  212. return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)
  213. @cached_property
  214. def snapshot(self):
  215. from core.models import Snapshot
  216. return Snapshot.objects.only('id', 'old_id', 'abid').get(url=self.url)
  217. @cached_property
  218. def snapshot_id(self):
  219. return str(self.snapshot.pk)
  220. @cached_property
  221. def snapshot_old_id(self):
  222. return str(self.snapshot.old_id)
  223. @cached_property
  224. def snapshot_abid(self):
  225. return str(self.snapshot.ABID)
  226. @classmethod
  227. def field_names(cls):
  228. return [f.name for f in fields(cls)]
  229. @property
  230. def link_dir(self) -> str:
  231. from ..config import CONFIG
  232. return str(Path(CONFIG['ARCHIVE_DIR']) / self.timestamp)
  233. @property
  234. def archive_path(self) -> str:
  235. from ..config import ARCHIVE_DIR_NAME
  236. return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)
  237. @property
  238. def archive_size(self) -> float:
  239. try:
  240. return get_dir_size(self.archive_path)[0]
  241. except Exception:
  242. return 0
  243. ### URL Helpers
  244. @property
  245. def url_hash(self):
  246. from ..util import hashurl
  247. return hashurl(self.url)
  248. @property
  249. def scheme(self) -> str:
  250. from ..util import scheme
  251. return scheme(self.url)
  252. @property
  253. def extension(self) -> str:
  254. from ..util import extension
  255. return extension(self.url)
  256. @property
  257. def domain(self) -> str:
  258. from ..util import domain
  259. return domain(self.url)
  260. @property
  261. def path(self) -> str:
  262. from ..util import path
  263. return path(self.url)
  264. @property
  265. def basename(self) -> str:
  266. from ..util import basename
  267. return basename(self.url)
  268. @property
  269. def base_url(self) -> str:
  270. from ..util import base_url
  271. return base_url(self.url)
  272. ### Pretty Printing Helpers
  273. @property
  274. def bookmarked_date(self) -> Optional[str]:
  275. max_ts = (datetime.now(timezone.utc) + timedelta(days=30)).timestamp()
  276. if self.timestamp and self.timestamp.replace('.', '').isdigit():
  277. if 0 < float(self.timestamp) < max_ts:
  278. return ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
  279. else:
  280. return str(self.timestamp)
  281. return None
  282. @property
  283. def updated_date(self) -> Optional[str]:
  284. return ts_to_date_str(self.updated) if self.updated else None
  285. @property
  286. def archive_dates(self) -> List[datetime]:
  287. return [
  288. parse_date(result.start_ts)
  289. for method in self.history.keys()
  290. for result in self.history[method]
  291. ]
  292. @property
  293. def oldest_archive_date(self) -> Optional[datetime]:
  294. return min(self.archive_dates, default=None)
  295. @property
  296. def newest_archive_date(self) -> Optional[datetime]:
  297. return max(self.archive_dates, default=None)
  298. ### Archive Status Helpers
  299. @property
  300. def num_outputs(self) -> int:
  301. return self.as_snapshot().num_outputs
  302. @property
  303. def num_failures(self) -> int:
  304. return sum(1
  305. for method in self.history.keys()
  306. for result in self.history[method]
  307. if result.status == 'failed')
  308. @property
  309. def is_static(self) -> bool:
  310. from ..util import is_static_file
  311. return is_static_file(self.url)
  312. @property
  313. def is_archived(self) -> bool:
  314. from ..config import ARCHIVE_DIR
  315. from ..util import domain
  316. output_paths = (
  317. domain(self.url),
  318. 'output.html',
  319. 'output.pdf',
  320. 'screenshot.png',
  321. 'singlefile.html',
  322. 'readability/content.html',
  323. 'mercury/content.html',
  324. 'htmltotext.txt',
  325. 'media',
  326. 'git',
  327. )
  328. return any(
  329. (Path(ARCHIVE_DIR) / self.timestamp / path).exists()
  330. for path in output_paths
  331. )
  332. def latest_outputs(self, status: str=None) -> Dict[str, ArchiveOutput]:
  333. """get the latest output that each archive method produced for link"""
  334. ARCHIVE_METHODS = (
  335. 'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf',
  336. 'screenshot', 'dom', 'git', 'media', 'archive_org',
  337. )
  338. latest: Dict[str, ArchiveOutput] = {}
  339. for archive_method in ARCHIVE_METHODS:
  340. # get most recent succesful result in history for each archive method
  341. history = self.history.get(archive_method) or []
  342. history = list(filter(lambda result: result.output, reversed(history)))
  343. if status is not None:
  344. history = list(filter(lambda result: result.status == status, history))
  345. history = list(history)
  346. if history:
  347. latest[archive_method] = history[0].output
  348. else:
  349. latest[archive_method] = None
  350. return latest
  351. def canonical_outputs(self) -> Dict[str, Optional[str]]:
  352. """predict the expected output paths that should be present after archiving"""
  353. from ..extractors.wget import wget_output_path
  354. # TODO: banish this awful duplication from the codebase and import these
  355. # from their respective extractor files
  356. canonical = {
  357. 'index_path': 'index.html',
  358. 'favicon_path': 'favicon.ico',
  359. 'google_favicon_path': FAVICON_PROVIDER.format(self.domain),
  360. 'wget_path': wget_output_path(self),
  361. 'warc_path': 'warc/',
  362. 'singlefile_path': 'singlefile.html',
  363. 'readability_path': 'readability/content.html',
  364. 'mercury_path': 'mercury/content.html',
  365. 'htmltotext_path': 'htmltotext.txt',
  366. 'pdf_path': 'output.pdf',
  367. 'screenshot_path': 'screenshot.png',
  368. 'dom_path': 'output.html',
  369. 'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url),
  370. 'git_path': 'git/',
  371. 'media_path': 'media/',
  372. 'headers_path': 'headers.json',
  373. }
  374. if self.is_static:
  375. # static binary files like PDF and images are handled slightly differently.
  376. # they're just downloaded once and aren't archived separately multiple times,
  377. # so the wget, screenshot, & pdf urls should all point to the same file
  378. static_path = wget_output_path(self)
  379. canonical.update({
  380. 'title': self.basename,
  381. 'wget_path': static_path,
  382. 'pdf_path': static_path,
  383. 'screenshot_path': static_path,
  384. 'dom_path': static_path,
  385. 'singlefile_path': static_path,
  386. 'readability_path': static_path,
  387. 'mercury_path': static_path,
  388. 'htmltotext_path': static_path,
  389. })
  390. return canonical