__package__ = 'archivebox.index'

import os

from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass, asdict, field, fields

from ..system import get_dir_size


class ArchiveError(Exception):
    def __init__(self, message, hints=None):
        super().__init__(message)
        self.hints = hints


LinkDict = Dict[str, Any]

ArchiveOutput = Union[str, Exception, None]


@dataclass(frozen=True)
class ArchiveResult:
    cmd: List[str]
    pwd: Optional[str]
    cmd_version: Optional[str]
    output: ArchiveOutput
    status: str
    start_ts: datetime
    end_ts: datetime
    schema: str = 'ArchiveResult'

    def __post_init__(self):
        self.typecheck()

    def _asdict(self):
        return asdict(self)

    def typecheck(self) -> None:
        assert self.schema == self.__class__.__name__
        assert isinstance(self.status, str) and self.status
        assert isinstance(self.start_ts, datetime)
        assert isinstance(self.end_ts, datetime)
        assert isinstance(self.cmd, list)
        assert all(isinstance(arg, str) and arg for arg in self.cmd)
        assert self.pwd is None or isinstance(self.pwd, str) and self.pwd
        assert self.cmd_version is None or isinstance(self.cmd_version, str) and self.cmd_version
        assert self.output is None or isinstance(self.output, (str, Exception))
        if isinstance(self.output, str):
            assert self.output

    @classmethod
    def from_json(cls, json_info):
        from ..util import parse_date

        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        info['start_ts'] = parse_date(info['start_ts'])
        info['end_ts'] = parse_date(info['end_ts'])
        info['cmd_version'] = info.get('cmd_version')
        return cls(**info)

    def to_dict(self, *keys) -> dict:
        if keys:
            return {k: v for k, v in asdict(self).items() if k in keys}
        return asdict(self)

    def to_json(self, indent=4, sort_keys=True) -> str:
        from .json import to_json
        return to_json(self, indent=indent, sort_keys=sort_keys)

    def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
        from .csv import to_csv
        return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)

    @classmethod
    def field_names(cls):
        return [f.name for f in fields(cls)]

    @property
    def duration(self) -> int:
        return (self.end_ts - self.start_ts).seconds
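
# Illustrative sketch (not part of the original module): roughly how an ArchiveResult
# round-trips through from_json()/to_json(). The field values below are hypothetical,
# and parse_date() is assumed to accept ISO-8601 timestamp strings.
#
#   result = ArchiveResult.from_json({
#       'cmd': ['wget', '--mirror', 'https://example.com'],
#       'pwd': '/data/archive/1556862226',
#       'cmd_version': '1.20.1',
#       'output': 'example.com/index.html',
#       'status': 'succeeded',
#       'start_ts': '2019-05-02T12:23:46',
#       'end_ts': '2019-05-02T12:23:49',
#   })
#   assert result.duration == 3          # (end_ts - start_ts).seconds
#   json_str = result.to_json(indent=4)  # serialized via index.json.to_json()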


@dataclass(frozen=True)
class Link:
    timestamp: str
    url: str
    title: Optional[str]
    tags: Optional[str]
    sources: List[str]
    history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
    updated: Optional[datetime] = None
    schema: str = 'Link'

    def __str__(self) -> str:
        return f'[{self.timestamp}] {self.base_url} "{self.title}"'

    def __post_init__(self):
        self.typecheck()

    def overwrite(self, **kwargs):
        """pure functional version of dict.update that returns a new instance"""
        return Link(**{**self._asdict(), **kwargs})
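
    # Illustrative sketch (not part of the original module): overwrite() never mutates
    # the frozen instance; it returns a new Link with the given fields replaced.
    # The url/title/sources values here are hypothetical.
    #
    #   link = Link(timestamp='1556862226', url='https://example.com', title=None,
    #               tags=None, sources=['cli'])
    #   renamed = link.overwrite(title='Example Domain')
    #   assert link.title is None and renamed.title == 'Example Domain'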

    def __eq__(self, other):
        if not isinstance(other, Link):
            return NotImplemented
        return self.url == other.url

    def __gt__(self, other):
        if not isinstance(other, Link):
            return NotImplemented
        if not self.timestamp or not other.timestamp:
            return
        return float(self.timestamp) > float(other.timestamp)

    def typecheck(self) -> None:
        from ..config import stderr, ANSI
        try:
            assert self.schema == self.__class__.__name__
            assert isinstance(self.timestamp, str) and self.timestamp
            assert self.timestamp.replace('.', '').isdigit()
            assert isinstance(self.url, str) and '://' in self.url
            assert self.updated is None or isinstance(self.updated, datetime)
            assert self.title is None or (isinstance(self.title, str) and self.title)
            assert self.tags is None or isinstance(self.tags, str)
            assert isinstance(self.sources, list)
            assert all(isinstance(source, str) and source for source in self.sources)
            assert isinstance(self.history, dict)
            for method, results in self.history.items():
                assert isinstance(method, str) and method
                assert isinstance(results, list)
                assert all(isinstance(result, ArchiveResult) for result in results)
        except Exception:
            stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI))
            raise

    def _asdict(self, extended=False):
        info = {
            'schema': 'Link',
            'url': self.url,
            'title': self.title or None,
            'timestamp': self.timestamp,
            'updated': self.updated or None,
            'tags': self.tags or None,
            'sources': self.sources or [],
            'history': self.history or {},
        }
        if extended:
            info.update({
                'link_dir': self.link_dir,
                'archive_path': self.archive_path,
                'hash': self.url_hash,
                'base_url': self.base_url,
                'scheme': self.scheme,
                'domain': self.domain,
                'path': self.path,
                'basename': self.basename,
                'extension': self.extension,
                'is_static': self.is_static,
                'bookmarked_date': self.bookmarked_date,
                'updated_date': self.updated_date,
                'oldest_archive_date': self.oldest_archive_date,
                'newest_archive_date': self.newest_archive_date,
                'is_archived': self.is_archived,
                'num_outputs': self.num_outputs,
                'num_failures': self.num_failures,
                'latest': self.latest_outputs(),
                'canonical': self.canonical_outputs(),
            })
        return info

    @classmethod
    def from_json(cls, json_info):
        from ..util import parse_date

        info = {
            key: val
            for key, val in json_info.items()
            if key in cls.field_names()
        }
        info['updated'] = parse_date(info.get('updated'))
        info['sources'] = info.get('sources') or []

        json_history = info.get('history') or {}
        cast_history = {}
        for method, method_history in json_history.items():
            cast_history[method] = []
            for json_result in method_history:
                assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
                cast_result = ArchiveResult.from_json(json_result)
                cast_history[method].append(cast_result)

        info['history'] = cast_history
        return cls(**info)
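
    # Illustrative sketch (not part of the original module): the shape from_json()
    # expects for the 'history' key -- a dict mapping each archive method name to a
    # list of ArchiveResult dicts. The values below are hypothetical, and parse_date()
    # is assumed to tolerate a missing/None 'updated' value.
    #
    #   link = Link.from_json({
    #       'url': 'https://example.com',
    #       'timestamp': '1556862226',
    #       'title': 'Example Domain',
    #       'tags': None,
    #       'sources': ['path/to/bookmarks.html'],
    #       'history': {
    #           'wget': [{'cmd': ['wget', 'https://example.com'], 'pwd': None,
    #                     'cmd_version': None, 'output': 'example.com/index.html',
    #                     'status': 'succeeded', 'start_ts': '2019-05-02T12:23:46',
    #                     'end_ts': '2019-05-02T12:23:49'}],
    #       },
    #   })
    #   assert isinstance(link.history['wget'][0], ArchiveResult)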

    def to_json(self, indent=4, sort_keys=True) -> str:
        from .json import to_json
        return to_json(self, indent=indent, sort_keys=sort_keys)

    def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str:
        from .csv import to_csv
        return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust)

    @classmethod
    def field_names(cls):
        return [f.name for f in fields(cls)]

    @property
    def link_dir(self) -> str:
        from ..config import CONFIG
        return os.path.join(CONFIG['ARCHIVE_DIR'], self.timestamp)

    @property
    def archive_path(self) -> str:
        from ..config import ARCHIVE_DIR_NAME
        return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)

    @property
    def archive_size(self) -> float:
        try:
            return get_dir_size(self.archive_path)[0]
        except Exception:
            return 0

    ### URL Helpers

    @property
    def url_hash(self):
        from ..util import hashurl
        return hashurl(self.url)

    @property
    def scheme(self) -> str:
        from ..util import scheme
        return scheme(self.url)

    @property
    def extension(self) -> str:
        from ..util import extension
        return extension(self.url)

    @property
    def domain(self) -> str:
        from ..util import domain
        return domain(self.url)

    @property
    def path(self) -> str:
        from ..util import path
        return path(self.url)

    @property
    def basename(self) -> str:
        from ..util import basename
        return basename(self.url)

    @property
    def base_url(self) -> str:
        from ..util import base_url
        return base_url(self.url)

    ### Pretty Printing Helpers

    @property
    def bookmarked_date(self) -> Optional[str]:
        from ..util import ts_to_date

        max_ts = (datetime.now() + timedelta(days=30)).timestamp()

        if self.timestamp and self.timestamp.replace('.', '').isdigit():
            if 0 < float(self.timestamp) < max_ts:
                return ts_to_date(datetime.fromtimestamp(float(self.timestamp)))
            else:
                return str(self.timestamp)
        return None

    @property
    def updated_date(self) -> Optional[str]:
        from ..util import ts_to_date
        return ts_to_date(self.updated) if self.updated else None

    @property
    def archive_dates(self) -> List[datetime]:
        return [
            result.start_ts
            for method in self.history.keys()
            for result in self.history[method]
        ]

    @property
    def oldest_archive_date(self) -> Optional[datetime]:
        return min(self.archive_dates, default=None)

    @property
    def newest_archive_date(self) -> Optional[datetime]:
        return max(self.archive_dates, default=None)

    ### Archive Status Helpers

    @property
    def num_outputs(self) -> int:
        return len(tuple(filter(None, self.latest_outputs().values())))

    @property
    def num_failures(self) -> int:
        return sum(1
                   for method in self.history.keys()
                   for result in self.history[method]
                   if result.status == 'failed')

    @property
    def is_static(self) -> bool:
        from ..util import is_static_file
        return is_static_file(self.url)

    @property
    def is_archived(self) -> bool:
        from ..config import ARCHIVE_DIR
        from ..util import domain

        output_paths = (
            domain(self.url),
            'output.pdf',
            'screenshot.png',
            'output.html',
            'media',
        )
        return any(
            os.path.exists(os.path.join(ARCHIVE_DIR, self.timestamp, path))
            for path in output_paths
        )

    def latest_outputs(self, status: Optional[str]=None) -> Dict[str, ArchiveOutput]:
        """get the latest output that each archive method produced for link"""

        ARCHIVE_METHODS = (
            'title', 'favicon', 'wget', 'warc', 'pdf',
            'screenshot', 'dom', 'git', 'media', 'archive_org',
        )
        latest: Dict[str, ArchiveOutput] = {}
        for archive_method in ARCHIVE_METHODS:
            # get the most recent successful result in history for each archive method
            history = self.history.get(archive_method) or []
            history = list(filter(lambda result: result.output, reversed(history)))
            if status is not None:
                history = list(filter(lambda result: result.status == status, history))

            if history:
                latest[archive_method] = history[0].output
            else:
                latest[archive_method] = None
        return latest
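
    # Illustrative sketch (not part of the original module): latest_outputs() always
    # returns one entry per archive method, with None for methods that have not
    # produced an output yet. The keys are the real ARCHIVE_METHODS names; the values
    # shown are hypothetical.
    #
    #   link.latest_outputs()
    #   # -> {'title': 'Example Domain', 'favicon': 'favicon.ico',
    #   #     'wget': 'example.com/index.html', 'warc': None, 'pdf': 'output.pdf',
    #   #     'screenshot': None, 'dom': None, 'git': None, 'media': None,
    #   #     'archive_org': None}
    #
    #   link.latest_outputs(status='failed')  # keep only results whose status == 'failed'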

    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """predict the expected output paths that should be present after archiving"""

        from ..extractors.wget import wget_output_path

        canonical = {
            'index_path': 'index.html',
            'favicon_path': 'favicon.ico',
            'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
            'wget_path': wget_output_path(self),
            'warc_path': 'warc',
            'pdf_path': 'output.pdf',
            'screenshot_path': 'screenshot.png',
            'dom_path': 'output.html',
            'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url),
            'git_path': 'git',
            'media_path': 'media',
        }
        if self.is_static:
            # static binary files like PDF and images are handled slightly differently.
            # they're just downloaded once and aren't archived separately multiple times,
            # so the wget, screenshot, & pdf urls should all point to the same file
            static_path = wget_output_path(self)
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
                'pdf_path': static_path,
                'screenshot_path': static_path,
                'dom_path': static_path,
            })
        return canonical
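
# Illustrative sketch (not part of the original module): canonical_outputs() predicts
# where each extractor's output should live under link_dir, whether or not archiving
# has actually run yet. The literal paths come from the defaults above; the wget_path
# value is hypothetical (it depends on wget_output_path()), and base_url() is assumed
# to return the URL without its scheme.
#
#   link.canonical_outputs()
#   # -> {'index_path': 'index.html',
#   #     'favicon_path': 'favicon.ico',
#   #     'google_favicon_path': 'https://www.google.com/s2/favicons?domain=example.com',
#   #     'wget_path': 'example.com/index.html',
#   #     'warc_path': 'warc',
#   #     'pdf_path': 'output.pdf',
#   #     'screenshot_path': 'screenshot.png',
#   #     'dom_path': 'output.html',
#   #     'archive_org_path': 'https://web.archive.org/web/example.com',
#   #     'git_path': 'git',
#   #     'media_path': 'media'}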