schema.py

__package__ = 'archivebox.index'

import os

from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass, asdict, field, fields

from ..system import get_dir_size
from ..config import OUTPUT_DIR, ARCHIVE_DIR_NAME


class ArchiveError(Exception):
    def __init__(self, message, hints=None):
        super().__init__(message)
        self.hints = hints


LinkDict = Dict[str, Any]

ArchiveOutput = Union[str, Exception, None]

@dataclass(frozen=True)
class ArchiveResult:
    cmd: List[str]
    pwd: Optional[str]
    cmd_version: Optional[str]
    output: ArchiveOutput
    status: str
    start_ts: datetime
    end_ts: datetime
    schema: str = 'ArchiveResult'

    def __post_init__(self):
        self.typecheck()

    def _asdict(self):
        return asdict(self)

    def typecheck(self) -> None:
        assert self.schema == self.__class__.__name__
        assert isinstance(self.status, str) and self.status
        assert isinstance(self.start_ts, datetime)
        assert isinstance(self.end_ts, datetime)
        assert isinstance(self.cmd, list)
        assert all(isinstance(arg, str) and arg for arg in self.cmd)
        assert self.pwd is None or isinstance(self.pwd, str) and self.pwd
        assert self.cmd_version is None or isinstance(self.cmd_version, str) and self.cmd_version
        assert self.output is None or isinstance(self.output, (str, Exception))
        if isinstance(self.output, str):
            assert self.output

    @classmethod
    def guess_ts(_cls, dict_info):
        from ..util import parse_date
        parsed_timestamp = parse_date(dict_info["timestamp"])
        start_ts = parsed_timestamp
        end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"]))
        return start_ts, end_ts
    @classmethod
    def from_json(cls, json_info, guess=False):
        from ..util import parse_date

        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        if guess:
            keys = info.keys()
            if "start_ts" not in keys:
                info["start_ts"], info["end_ts"] = cls.guess_ts(json_info)
            else:
                info['start_ts'] = parse_date(info['start_ts'])
                info['end_ts'] = parse_date(info['end_ts'])
            if "pwd" not in keys:
                info["pwd"] = str(Path(OUTPUT_DIR) / ARCHIVE_DIR_NAME / json_info["timestamp"])
            if "cmd_version" not in keys:
                info["cmd_version"] = "Undefined"
            if "cmd" not in keys:
                info["cmd"] = []
        else:
            info['start_ts'] = parse_date(info['start_ts'])
            info['end_ts'] = parse_date(info['end_ts'])
            info['cmd_version'] = info.get('cmd_version')
        if type(info["cmd"]) is str:
            info["cmd"] = [info["cmd"]]
        return cls(**info)

    def to_dict(self, *keys) -> dict:
        if keys:
            return {k: v for k, v in asdict(self).items() if k in keys}
        return asdict(self)

    def to_json(self, indent=4, sort_keys=True) -> str:
        from .json import to_json
        return to_json(self, indent=indent, sort_keys=sort_keys)

    def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
        from .csv import to_csv
        return to_csv(self, csv_col=cols or self.field_names(), separator=separator, ljust=ljust)

    @classmethod
    def field_names(cls):
        return [f.name for f in fields(cls)]

    @property
    def duration(self) -> int:
        return (self.end_ts - self.start_ts).seconds
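
# Illustrative sketch (not part of the original module): what a single ArchiveResult
# looks like when loaded via from_json(). The field values below are hypothetical.
#
#   result = ArchiveResult.from_json({
#       'cmd': ['wget', '--mirror', 'https://example.com'],
#       'pwd': '/data/archive/1556862152',
#       'cmd_version': '1.20.3',
#       'output': 'example.com/index.html',
#       'status': 'succeeded',
#       'start_ts': '2019-05-03 04:22:32',
#       'end_ts': '2019-05-03 04:22:35',
#   })
#   result.duration   # -> 3, seconds between start_ts and end_ts
#   result.to_json()  # serialized back out via .json.to_json()
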

@dataclass(frozen=True)
class Link:
    timestamp: str
    url: str
    title: Optional[str]
    tags: Optional[str]
    sources: List[str]
    history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
    updated: Optional[datetime] = None
    schema: str = 'Link'

    def __str__(self) -> str:
        return f'[{self.timestamp}] {self.base_url} "{self.title}"'

    def __post_init__(self):
        self.typecheck()

    def overwrite(self, **kwargs):
        """pure functional version of dict.update that returns a new instance"""
        return Link(**{**self._asdict(), **kwargs})
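
    # Example use of overwrite() (hypothetical values): returns a new frozen Link
    # with the given fields replaced, leaving the original instance untouched:
    #   updated_link = link.overwrite(title='New Title', updated=datetime.now())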

    def __eq__(self, other):
        if not isinstance(other, Link):
            return NotImplemented
        return self.url == other.url

    def __gt__(self, other):
        if not isinstance(other, Link):
            return NotImplemented
        if not self.timestamp or not other.timestamp:
            return
        return float(self.timestamp) > float(other.timestamp)

    def typecheck(self) -> None:
        from ..config import stderr, ANSI
        try:
            assert self.schema == self.__class__.__name__
            assert isinstance(self.timestamp, str) and self.timestamp
            assert self.timestamp.replace('.', '').isdigit()
            assert isinstance(self.url, str) and '://' in self.url
            assert self.updated is None or isinstance(self.updated, datetime)
            assert self.title is None or (isinstance(self.title, str) and self.title)
            assert self.tags is None or isinstance(self.tags, str)
            assert isinstance(self.sources, list)
            assert all(isinstance(source, str) and source for source in self.sources)
            assert isinstance(self.history, dict)
            for method, results in self.history.items():
                assert isinstance(method, str) and method
                assert isinstance(results, list)
                assert all(isinstance(result, ArchiveResult) for result in results)
        except Exception:
            stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI))
            raise

    def _asdict(self, extended=False):
        info = {
            'schema': 'Link',
            'url': self.url,
            'title': self.title or None,
            'timestamp': self.timestamp,
            'updated': self.updated or None,
            'tags': self.tags or None,
            'sources': self.sources or [],
            'history': self.history or {},
        }
        if extended:
            info.update({
                'link_dir': self.link_dir,
                'archive_path': self.archive_path,

                'hash': self.url_hash,
                'base_url': self.base_url,
                'scheme': self.scheme,
                'domain': self.domain,
                'path': self.path,
                'basename': self.basename,
                'extension': self.extension,
                'is_static': self.is_static,

                'bookmarked_date': self.bookmarked_date,
                'updated_date': self.updated_date,
                'oldest_archive_date': self.oldest_archive_date,
                'newest_archive_date': self.newest_archive_date,

                'is_archived': self.is_archived,
                'num_outputs': self.num_outputs,
                'num_failures': self.num_failures,

                'latest': self.latest_outputs(),
                'canonical': self.canonical_outputs(),
            })
        return info

    @classmethod
    def from_json(cls, json_info, guess=False):
        from ..util import parse_date

        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        info['updated'] = parse_date(info.get('updated'))
        info['sources'] = info.get('sources') or []

        json_history = info.get('history') or {}
        cast_history = {}

        for method, method_history in json_history.items():
            cast_history[method] = []
            for json_result in method_history:
                assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
                cast_result = ArchiveResult.from_json(json_result, guess)
                cast_history[method].append(cast_result)

        info['history'] = cast_history
        return cls(**info)

    def to_json(self, indent=4, sort_keys=True) -> str:
        from .json import to_json
        return to_json(self, indent=indent, sort_keys=sort_keys)

    def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
        from .csv import to_csv
        return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)

    @classmethod
    def field_names(cls):
        return [f.name for f in fields(cls)]

    @property
    def link_dir(self) -> str:
        from ..config import CONFIG
        return os.path.join(CONFIG['ARCHIVE_DIR'], self.timestamp)

    @property
    def archive_path(self) -> str:
        from ..config import ARCHIVE_DIR_NAME
        return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)

    @property
    def archive_size(self) -> float:
        try:
            return get_dir_size(self.archive_path)[0]
        except Exception:
            return 0

    ### URL Helpers

    @property
    def url_hash(self):
        from ..util import hashurl
        return hashurl(self.url)

    @property
    def scheme(self) -> str:
        from ..util import scheme
        return scheme(self.url)

    @property
    def extension(self) -> str:
        from ..util import extension
        return extension(self.url)

    @property
    def domain(self) -> str:
        from ..util import domain
        return domain(self.url)

    @property
    def path(self) -> str:
        from ..util import path
        return path(self.url)

    @property
    def basename(self) -> str:
        from ..util import basename
        return basename(self.url)

    @property
    def base_url(self) -> str:
        from ..util import base_url
        return base_url(self.url)

    ### Pretty Printing Helpers

    @property
    def bookmarked_date(self) -> Optional[str]:
        from ..util import ts_to_date

        max_ts = (datetime.now() + timedelta(days=30)).timestamp()

        if self.timestamp and self.timestamp.replace('.', '').isdigit():
            if 0 < float(self.timestamp) < max_ts:
                return ts_to_date(datetime.fromtimestamp(float(self.timestamp)))
            else:
                return str(self.timestamp)
        return None

    @property
    def updated_date(self) -> Optional[str]:
        from ..util import ts_to_date
        return ts_to_date(self.updated) if self.updated else None

    @property
    def archive_dates(self) -> List[datetime]:
        return [
            result.start_ts
            for method in self.history.keys()
            for result in self.history[method]
        ]

    @property
    def oldest_archive_date(self) -> Optional[datetime]:
        return min(self.archive_dates, default=None)

    @property
    def newest_archive_date(self) -> Optional[datetime]:
        return max(self.archive_dates, default=None)

    ### Archive Status Helpers

    @property
    def num_outputs(self) -> int:
        return len(tuple(filter(None, self.latest_outputs().values())))

    @property
    def num_failures(self) -> int:
        return sum(1
                   for method in self.history.keys()
                   for result in self.history[method]
                   if result.status == 'failed')

    @property
    def is_static(self) -> bool:
        from ..util import is_static_file
        return is_static_file(self.url)

    @property
    def is_archived(self) -> bool:
        from ..config import ARCHIVE_DIR
        from ..util import domain

        output_paths = (
            domain(self.url),
            'output.pdf',
            'screenshot.png',
            'output.html',
            'media',
        )

        return any(
            os.path.exists(os.path.join(ARCHIVE_DIR, self.timestamp, path))
            for path in output_paths
        )

    def latest_outputs(self, status: str=None) -> Dict[str, ArchiveOutput]:
        """get the latest output that each archive method produced for link"""

        ARCHIVE_METHODS = (
            'title', 'favicon', 'wget', 'warc', 'pdf',
            'screenshot', 'dom', 'git', 'media', 'archive_org',
        )
        latest: Dict[str, ArchiveOutput] = {}
        for archive_method in ARCHIVE_METHODS:
            # get the most recent successful result in history for each archive method
            history = self.history.get(archive_method) or []
            history = list(filter(lambda result: result.output, reversed(history)))
            if status is not None:
                history = list(filter(lambda result: result.status == status, history))

            history = list(history)
            if history:
                latest[archive_method] = history[0].output
            else:
                latest[archive_method] = None
        return latest
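
    # Illustrative return value of latest_outputs() (paths are hypothetical; each
    # value is the most recent output for that method, or None if there is none):
    #   {'title': 'Example Domain', 'favicon': 'favicon.ico', 'wget': 'example.com/index.html',
    #    'warc': None, 'pdf': 'output.pdf', 'screenshot': 'screenshot.png',
    #    'dom': 'output.html', 'git': None, 'media': None, 'archive_org': None}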

    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """predict the expected output paths that should be present after archiving"""

        from ..extractors.wget import wget_output_path
        canonical = {
            'index_path': 'index.html',
            'favicon_path': 'favicon.ico',
            'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
            'wget_path': wget_output_path(self),
            'warc_path': 'warc',
            'pdf_path': 'output.pdf',
            'screenshot_path': 'screenshot.png',
            'dom_path': 'output.html',
            'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url),
            'git_path': 'git',
            'media_path': 'media',
        }
        if self.is_static:
            # static binary files like PDF and images are handled slightly differently.
            # they're just downloaded once and aren't archived separately multiple times,
            # so the wget, screenshot, & pdf urls should all point to the same file
            static_path = wget_output_path(self)
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
                'pdf_path': static_path,
                'screenshot_path': static_path,
                'dom_path': static_path,
            })
        return canonical
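
# Illustrative usage sketch (not part of the original module): loading a Link from a
# JSON index entry and inspecting it. All values below are hypothetical and assume a
# configured ArchiveBox data directory.
#
#   link = Link.from_json({
#       'url': 'https://example.com',
#       'timestamp': '1556862152.0',
#       'title': 'Example Domain',
#       'tags': 'docs,example',
#       'sources': ['import.html'],
#       'history': {},
#   })
#   link.is_archived          # True if any expected output file exists under ARCHIVE_DIR/<timestamp>/
#   link.canonical_outputs()  # predicted output path for each archive method
#   link.to_json()            # JSON-serialized form of the link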