schema.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. """
  2. WARNING: THIS FILE IS ALL LEGACY CODE TO BE REMOVED.
  3. DO NOT ADD ANY NEW FEATURES TO THIS FILE, NEW CODE GOES HERE: core/models.py
  4. """
  5. __package__ = 'archivebox.index'
  6. from pathlib import Path
  7. from datetime import datetime, timedelta
  8. from typing import List, Dict, Any, Optional, Union
  9. from dataclasses import dataclass, asdict, field, fields
  10. from ..system import get_dir_size
  11. from ..config import OUTPUT_DIR, ARCHIVE_DIR_NAME
  12. class ArchiveError(Exception):
  13. def __init__(self, message, hints=None):
  14. super().__init__(message)
  15. self.hints = hints
# Type aliases used throughout the legacy index code:
# a raw JSON-ish dict representation of a Link
LinkDict = Dict[str, Any]
# the output of one archive method: a path/URL string on success,
# the Exception on failure, or None if nothing was produced
ArchiveOutput = Union[str, Exception, None]
  18. @dataclass(frozen=True)
  19. class ArchiveResult:
  20. cmd: List[str]
  21. pwd: Optional[str]
  22. cmd_version: Optional[str]
  23. output: ArchiveOutput
  24. status: str
  25. start_ts: datetime
  26. end_ts: datetime
  27. index_texts: Union[List[str], None] = None
  28. schema: str = 'ArchiveResult'
  29. def __post_init__(self):
  30. self.typecheck()
  31. def _asdict(self):
  32. return asdict(self)
  33. def typecheck(self) -> None:
  34. assert self.schema == self.__class__.__name__
  35. assert isinstance(self.status, str) and self.status
  36. assert isinstance(self.start_ts, datetime)
  37. assert isinstance(self.end_ts, datetime)
  38. assert isinstance(self.cmd, list)
  39. assert all(isinstance(arg, str) and arg for arg in self.cmd)
  40. # TODO: replace emptystrings in these three with None / remove them from the DB
  41. assert self.pwd is None or isinstance(self.pwd, str)
  42. assert self.cmd_version is None or isinstance(self.cmd_version, str)
  43. assert self.output is None or isinstance(self.output, (str, Exception))
  44. @classmethod
  45. def guess_ts(_cls, dict_info):
  46. from ..util import parse_date
  47. parsed_timestamp = parse_date(dict_info["timestamp"])
  48. start_ts = parsed_timestamp
  49. end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"]))
  50. return start_ts, end_ts
  51. @classmethod
  52. def from_json(cls, json_info, guess=False):
  53. from ..util import parse_date
  54. info = {
  55. key: val
  56. for key, val in json_info.items()
  57. if key in cls.field_names()
  58. }
  59. if guess:
  60. keys = info.keys()
  61. if "start_ts" not in keys:
  62. info["start_ts"], info["end_ts"] = cls.guess_ts(json_info)
  63. else:
  64. info['start_ts'] = parse_date(info['start_ts'])
  65. info['end_ts'] = parse_date(info['end_ts'])
  66. if "pwd" not in keys:
  67. info["pwd"] = str(Path(OUTPUT_DIR) / ARCHIVE_DIR_NAME / json_info["timestamp"])
  68. if "cmd_version" not in keys:
  69. info["cmd_version"] = "Undefined"
  70. if "cmd" not in keys:
  71. info["cmd"] = []
  72. else:
  73. info['start_ts'] = parse_date(info['start_ts'])
  74. info['end_ts'] = parse_date(info['end_ts'])
  75. info['cmd_version'] = info.get('cmd_version')
  76. if type(info["cmd"]) is str:
  77. info["cmd"] = [info["cmd"]]
  78. return cls(**info)
  79. def to_dict(self, *keys) -> dict:
  80. if keys:
  81. return {k: v for k, v in asdict(self).items() if k in keys}
  82. return asdict(self)
  83. def to_json(self, indent=4, sort_keys=True) -> str:
  84. from .json import to_json
  85. return to_json(self, indent=indent, sort_keys=sort_keys)
  86. def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
  87. from .csv import to_csv
  88. return to_csv(self, csv_col=cols or self.field_names(), separator=separator, ljust=ljust)
  89. @classmethod
  90. def field_names(cls):
  91. return [f.name for f in fields(cls)]
  92. @property
  93. def duration(self) -> int:
  94. return (self.end_ts - self.start_ts).seconds
@dataclass(frozen=True)
class Link:
    """
    Legacy in-memory representation of one archived URL (a "snapshot").

    Immutable; use .overwrite(**kwargs) to derive a modified copy.
    `history` maps archive-method name -> list of ArchiveResult runs.
    NOTE(review): superseded by core.models.Snapshot (see warning at top of file).
    """
    timestamp: str   # unix-epoch string; also doubles as the archive subdir name
    url: str
    title: Optional[str]
    tags: Optional[str]   # presumably a comma-separated tag string — verify against callers
    sources: List[str]    # files/URLs this link was imported from
    history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
    updated: Optional[datetime] = None
    schema: str = 'Link'

    def __str__(self) -> str:
        return f'[{self.timestamp}] {self.url} "{self.title}"'

    def __post_init__(self):
        # validate eagerly so corrupt index data fails at construction time
        self.typecheck()

    def overwrite(self, **kwargs):
        """pure functional version of dict.update that returns a new instance"""
        return Link(**{**self._asdict(), **kwargs})

    def __eq__(self, other):
        # two links are equal iff they point at the same URL,
        # regardless of timestamp/title/history
        if not isinstance(other, Link):
            return NotImplemented
        return self.url == other.url

    def __gt__(self, other):
        # order links by their numeric bookmark timestamp
        if not isinstance(other, Link):
            return NotImplemented
        if not self.timestamp or not other.timestamp:
            # NOTE(review): bare return yields None (not NotImplemented) when
            # a timestamp is missing — callers see a falsy comparison result
            return
        return float(self.timestamp) > float(other.timestamp)

    def typecheck(self) -> None:
        """Assert basic schema invariants, printing a red error line before re-raising."""
        from ..config import stderr, ANSI
        try:
            assert self.schema == self.__class__.__name__
            assert isinstance(self.timestamp, str) and self.timestamp
            # timestamp must be a (possibly fractional) numeric string
            assert self.timestamp.replace('.', '').isdigit()
            assert isinstance(self.url, str) and '://' in self.url
            assert self.updated is None or isinstance(self.updated, datetime)
            assert self.title is None or (isinstance(self.title, str) and self.title)
            assert self.tags is None or isinstance(self.tags, str)
            assert isinstance(self.sources, list)
            assert all(isinstance(source, str) and source for source in self.sources)
            assert isinstance(self.history, dict)
            for method, results in self.history.items():
                assert isinstance(method, str) and method
                assert isinstance(results, list)
                assert all(isinstance(result, ArchiveResult) for result in results)
        except Exception:
            stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI))
            raise

    def _asdict(self, extended=False):
        """Serialize to a plain dict; extended=True adds derived/computed fields."""
        info = {
            'schema': 'Link',
            'url': self.url,
            'title': self.title or None,
            'timestamp': self.timestamp,
            'updated': self.updated or None,
            'tags': self.tags or None,
            'sources': self.sources or [],
            'history': self.history or {},
        }
        if extended:
            info.update({
                'link_dir': self.link_dir,
                'archive_path': self.archive_path,
                'hash': self.url_hash,
                'base_url': self.base_url,
                'scheme': self.scheme,
                'domain': self.domain,
                'path': self.path,
                'basename': self.basename,
                'extension': self.extension,
                'is_static': self.is_static,
                'tags_str': self.tags,  # only used to render static index in index/html.py, remove if no longer needed there
                'icons': None,          # only used to render static index in index/html.py, remove if no longer needed there
                'bookmarked_date': self.bookmarked_date,
                'updated_date': self.updated_date,
                'oldest_archive_date': self.oldest_archive_date,
                'newest_archive_date': self.newest_archive_date,
                'is_archived': self.is_archived,
                'num_outputs': self.num_outputs,
                'num_failures': self.num_failures,
                'latest': self.latest_outputs(),
                'canonical': self.canonical_outputs(),
            })
        return info

    def as_snapshot(self):
        """Look up the Django ORM Snapshot row matching this link's exact URL."""
        from core.models import Snapshot
        return Snapshot.objects.get(url=self.url)

    @classmethod
    def from_json(cls, json_info, guess=False):
        """Deserialize from a dict loaded out of a JSON index.

        guess is forwarded to ArchiveResult.from_json to tolerate very old
        index formats with missing fields.
        """
        from ..util import parse_date

        # drop unknown keys so older/newer index formats don't break construction
        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        info['updated'] = parse_date(info.get('updated'))
        info['sources'] = info.get('sources') or []

        # rebuild each history entry into a typed ArchiveResult
        json_history = info.get('history') or {}
        cast_history = {}
        for method, method_history in json_history.items():
            cast_history[method] = []
            for json_result in method_history:
                assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
                cast_result = ArchiveResult.from_json(json_result, guess)
                cast_history[method].append(cast_result)
        info['history'] = cast_history
        return cls(**info)

    def to_json(self, indent=4, sort_keys=True) -> str:
        from .json import to_json
        return to_json(self, indent=indent, sort_keys=sort_keys)

    def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
        from .csv import to_csv
        return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)

    @classmethod
    def field_names(cls):
        """Names of all dataclass fields, used to filter (de)serialization."""
        return [f.name for f in fields(cls)]

    @property
    def link_dir(self) -> str:
        # absolute path of this link's archive output directory
        from ..config import CONFIG
        return str(Path(CONFIG['ARCHIVE_DIR']) / self.timestamp)

    @property
    def archive_path(self) -> str:
        # path relative to OUTPUT_DIR, e.g. 'archive/<timestamp>'
        from ..config import ARCHIVE_DIR_NAME
        return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)

    @property
    def archive_size(self) -> float:
        """Total on-disk size of the archive dir in bytes, or 0 on any error."""
        # NOTE(review): passes the *relative* archive_path to get_dir_size,
        # so this presumably assumes cwd == OUTPUT_DIR — verify at call sites
        try:
            return get_dir_size(self.archive_path)[0]
        except Exception:
            return 0

    ### URL Helpers

    @property
    def url_hash(self):
        from ..util import hashurl
        return hashurl(self.url)

    @property
    def scheme(self) -> str:
        from ..util import scheme
        return scheme(self.url)

    @property
    def extension(self) -> str:
        from ..util import extension
        return extension(self.url)

    @property
    def domain(self) -> str:
        from ..util import domain
        return domain(self.url)

    @property
    def path(self) -> str:
        from ..util import path
        return path(self.url)

    @property
    def basename(self) -> str:
        from ..util import basename
        return basename(self.url)

    @property
    def base_url(self) -> str:
        from ..util import base_url
        return base_url(self.url)

    ### Pretty Printing Helpers

    @property
    def bookmarked_date(self) -> Optional[str]:
        """Human-readable bookmark date, or the raw timestamp if it looks bogus."""
        from ..util import ts_to_date
        # sanity window: timestamps more than 30 days in the future are shown raw
        max_ts = (datetime.now() + timedelta(days=30)).timestamp()

        if self.timestamp and self.timestamp.replace('.', '').isdigit():
            if 0 < float(self.timestamp) < max_ts:
                return ts_to_date(datetime.fromtimestamp(float(self.timestamp)))
            else:
                return str(self.timestamp)
        return None

    @property
    def updated_date(self) -> Optional[str]:
        from ..util import ts_to_date
        return ts_to_date(self.updated) if self.updated else None

    @property
    def archive_dates(self) -> List[datetime]:
        # start times of every archive attempt across all methods
        return [
            result.start_ts
            for method in self.history.keys()
            for result in self.history[method]
        ]

    @property
    def oldest_archive_date(self) -> Optional[datetime]:
        return min(self.archive_dates, default=None)

    @property
    def newest_archive_date(self) -> Optional[datetime]:
        return max(self.archive_dates, default=None)

    ### Archive Status Helpers

    @property
    def num_outputs(self) -> int:
        # delegates to the ORM Snapshot row rather than counting history here
        return self.as_snapshot().num_outputs

    @property
    def num_failures(self) -> int:
        return sum(1
                   for method in self.history.keys()
                   for result in self.history[method]
                   if result.status == 'failed')

    @property
    def is_static(self) -> bool:
        from ..util import is_static_file
        return is_static_file(self.url)

    @property
    def is_archived(self) -> bool:
        """True if any known extractor output file exists in this link's archive dir."""
        from ..config import ARCHIVE_DIR
        from ..util import domain

        output_paths = (
            domain(self.url),
            'output.pdf',
            'screenshot.png',
            'output.html',
            'media',
            'singlefile.html'
        )
        return any(
            (Path(ARCHIVE_DIR) / self.timestamp / path).exists()
            for path in output_paths
        )

    def latest_outputs(self, status: Optional[str]=None) -> Dict[str, ArchiveOutput]:
        """get the latest output that each archive method produced for link"""
        ARCHIVE_METHODS = (
            'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf',
            'screenshot', 'dom', 'git', 'media', 'archive_org',
        )
        latest: Dict[str, ArchiveOutput] = {}
        for archive_method in ARCHIVE_METHODS:
            # get most recent succesful result in history for each archive method
            history = self.history.get(archive_method) or []
            history = list(filter(lambda result: result.output, reversed(history)))
            if status is not None:
                history = list(filter(lambda result: result.status == status, history))

            history = list(history)
            if history:
                latest[archive_method] = history[0].output
            else:
                latest[archive_method] = None
        return latest

    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """predict the expected output paths that should be present after archiving"""
        from ..extractors.wget import wget_output_path

        # TODO: banish this awful duplication from the codebase and import these
        # from their respective extractor files
        canonical = {
            'index_path': 'index.html',
            'favicon_path': 'favicon.ico',
            'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
            'wget_path': wget_output_path(self),
            'warc_path': 'warc/',
            'singlefile_path': 'singlefile.html',
            'readability_path': 'readability/content.html',
            'mercury_path': 'mercury/content.html',
            'pdf_path': 'output.pdf',
            'screenshot_path': 'screenshot.png',
            'dom_path': 'output.html',
            'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url),
            'git_path': 'git/',
            'media_path': 'media/',
            'headers_path': 'headers.json',
        }
        if self.is_static:
            # static binary files like PDF and images are handled slightly differently.
            # they're just downloaded once and aren't archived separately multiple times,
            # so the wget, screenshot, & pdf urls should all point to the same file
            static_path = wget_output_path(self)
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
                'pdf_path': static_path,
                'screenshot_path': static_path,
                'dom_path': static_path,
                'singlefile_path': static_path,
                'readability_path': static_path,
                'mercury_path': static_path,
            })
        return canonical