schema.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. """
  2. WARNING: THIS FILE IS ALL LEGACY CODE TO BE REMOVED.
  3. DO NOT ADD ANY NEW FEATURES TO THIS FILE, NEW CODE GOES HERE: core/models.py
  4. These are the old types we used to use before ArchiveBox v0.4 (before we switched to Django).
  5. """
# Pin the package name explicitly.
# NOTE(review): presumably so package-relative imports resolve to
# 'archivebox.index' however this module is loaded -- confirm before removing.
__package__ = 'archivebox.index'
  7. from pathlib import Path
  8. from datetime import datetime, timezone, timedelta
  9. from typing import List, Dict, Any, Optional, Union, ClassVar
  10. from pydantic import BaseModel, ConfigDict, Field, field_validator, computed_field
  11. from benedict import benedict
  12. from archivebox.config import ARCHIVE_DIR, CONSTANTS
  13. from archivebox.misc.util import parse_date
  14. class ArchiveError(Exception):
  15. def __init__(self, message, hints=None):
  16. super().__init__(message)
  17. self.hints = hints
# Type aliases
# LinkDict: a mapping with string keys and arbitrary values
# (presumably the plain-dict form of a Link -- confirm against callers).
LinkDict = Dict[str, Any]
# ArchiveOutput: the value kept in ArchiveResult.output -- a string,
# an Exception instance, or None.
ArchiveOutput = Union[str, Exception, None]
  21. class ArchiveResult(BaseModel):
  22. model_config = ConfigDict(arbitrary_types_allowed=True)
  23. TYPE: str = 'index.schema.ArchiveResult'
  24. cmd: list[str]
  25. pwd: str | None = None
  26. cmd_version: str | None = None
  27. output: ArchiveOutput | None = None
  28. status: str
  29. start_ts: datetime
  30. end_ts: datetime
  31. index_texts: list[str] | None = None
  32. # Class variables for compatibility
  33. _field_names: ClassVar[list[str] | None] = None
  34. @field_validator('status')
  35. @classmethod
  36. def validate_status(cls, v: str) -> str:
  37. if not v:
  38. raise ValueError('status must be a non-empty string')
  39. return v
  40. @field_validator('cmd')
  41. @classmethod
  42. def validate_cmd(cls, v: List[str]) -> List[str]:
  43. if not all(isinstance(arg, str) and arg for arg in v):
  44. raise ValueError('all command arguments must be non-empty strings')
  45. return v
  46. @field_validator('pwd')
  47. @classmethod
  48. def validate_pwd(cls, v: Optional[str]) -> Optional[str]:
  49. if v == '': # Convert empty string to None for consistency
  50. return None
  51. return v
  52. @field_validator('cmd_version')
  53. @classmethod
  54. def validate_cmd_version(cls, v: Optional[str]) -> Optional[str]:
  55. if v == '': # Convert empty string to None for consistency
  56. return None
  57. return v
  58. def model_dump(self, **kwargs) -> dict:
  59. """Backwards compatible with _asdict()"""
  60. return super().model_dump(**kwargs)
  61. @classmethod
  62. def field_names(cls) -> List[str]:
  63. """Get all field names of the model"""
  64. if cls._field_names is None:
  65. cls._field_names = list(cls.model_fields.keys())
  66. return cls._field_names
  67. @classmethod
  68. def guess_ts(cls, dict_info: dict) -> tuple[datetime, datetime]:
  69. """Guess timestamps from dictionary info"""
  70. parsed_timestamp = parse_date(dict_info["timestamp"])
  71. start_ts = parsed_timestamp
  72. end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"]))
  73. return start_ts, end_ts
  74. @classmethod
  75. def from_json(cls, json_info: dict, guess: bool = False) -> 'ArchiveResult':
  76. """Create instance from JSON data"""
  77. info = {
  78. key: val
  79. for key, val in json_info.items()
  80. if key in cls.field_names()
  81. }
  82. if guess:
  83. if "start_ts" not in info:
  84. info["start_ts"], info["end_ts"] = cls.guess_ts(json_info)
  85. else:
  86. info['start_ts'] = parse_date(info['start_ts'])
  87. info['end_ts'] = parse_date(info['end_ts'])
  88. if "pwd" not in info:
  89. info["pwd"] = str(ARCHIVE_DIR / json_info["timestamp"])
  90. if "cmd_version" not in info:
  91. info["cmd_version"] = "Undefined"
  92. if "cmd" not in info:
  93. info["cmd"] = []
  94. else:
  95. info['start_ts'] = parse_date(info['start_ts'])
  96. info['end_ts'] = parse_date(info['end_ts'])
  97. info['cmd_version'] = info.get('cmd_version')
  98. # Handle string command as list
  99. if isinstance(info.get("cmd"), str):
  100. info["cmd"] = [info["cmd"]]
  101. return cls(**info)
  102. def to_dict(self, *keys: str) -> dict:
  103. """Convert to dictionary, optionally filtering by keys"""
  104. data = self.model_dump()
  105. if keys:
  106. return {k: v for k, v in data.items() if k in keys}
  107. return data
  108. def to_json(self, indent: int = 4, sort_keys: bool = True) -> str:
  109. """Convert to JSON string"""
  110. return self.model_dump_json(indent=indent, exclude_none=True)
  111. def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
  112. """Convert to CSV string"""
  113. data = self.model_dump()
  114. cols = cols or self.field_names()
  115. return separator.join(str(data.get(col, '')).ljust(ljust) for col in cols)
  116. @computed_field
  117. def duration(self) -> int:
  118. """Calculate duration in seconds between start and end timestamps"""
  119. return int((self.end_ts - self.start_ts).total_seconds())
  120. class Link(BaseModel):
  121. model_config = ConfigDict(arbitrary_types_allowed=True)
  122. TYPE: str = 'index.schema.Link'
  123. timestamp: str
  124. url: str
  125. title: str | None = None
  126. tags: str | None = None
  127. sources: list[str] = Field(default_factory=list)
  128. history: dict[str, list[ArchiveResult]] = Field(default_factory=dict)
  129. downloaded_at: datetime | None = None
  130. # Class variables for compatibility
  131. _field_names: ClassVar[list[str] | None] = None
  132. def __str__(self) -> str:
  133. return f'[{self.timestamp}] {self.url} "{self.title}"'
  134. def __eq__(self, other: Any) -> bool:
  135. if not isinstance(other, Link):
  136. return NotImplemented
  137. return self.url == other.url
  138. def __gt__(self, other: Any) -> bool:
  139. if not isinstance(other, Link):
  140. return NotImplemented
  141. if not self.timestamp or not other.timestamp:
  142. return NotImplemented
  143. return float(self.timestamp) > float(other.timestamp)
  144. @field_validator('timestamp')
  145. @classmethod
  146. def validate_timestamp(cls, v: str) -> str:
  147. if not v:
  148. raise ValueError('timestamp must be a non-empty string')
  149. if not v.replace('.', '').isdigit():
  150. raise ValueError('timestamp must be a float str')
  151. return v
  152. @field_validator('url')
  153. @classmethod
  154. def validate_url(cls, v: str) -> str:
  155. if not v or '://' not in v:
  156. raise ValueError('url must be a valid URL string')
  157. return v
  158. @field_validator('title')
  159. @classmethod
  160. def validate_title(cls, v: Optional[str]) -> Optional[str]:
  161. if v is not None and not v:
  162. raise ValueError('title must be a non-empty string if provided')
  163. return v
  164. @field_validator('sources')
  165. @classmethod
  166. def validate_sources(cls, v: List[str]) -> List[str]:
  167. if not all(isinstance(source, str) and source for source in v):
  168. raise ValueError('all sources must be non-empty strings')
  169. return v
  170. # Backwards compatibility methods
  171. def _asdict(self, extended: bool = False) -> dict:
  172. return benedict(self)
  173. def overwrite(self, **kwargs) -> 'Link':
  174. """Pure functional version of dict.update that returns a new instance"""
  175. current_data = self.model_dump()
  176. current_data.update(kwargs)
  177. return Link(**current_data)
  178. @classmethod
  179. def field_names(cls) -> list[str]:
  180. if cls._field_names is None:
  181. cls._field_names = list(cls.model_fields.keys())
  182. return cls._field_names
  183. @classmethod
  184. def from_json(cls, json_info: dict, guess: bool = False) -> 'Link':
  185. info = {
  186. key: val
  187. for key, val in json_info.items()
  188. if key in cls.field_names()
  189. }
  190. # Handle downloaded_at
  191. info['downloaded_at'] = cls._parse_date(info.get('updated') or info.get('downloaded_at'))
  192. info['sources'] = info.get('sources') or []
  193. # Handle history
  194. json_history = info.get('history') or {}
  195. cast_history = {}
  196. for method, method_history in json_history.items():
  197. cast_history[method] = []
  198. for json_result in method_history:
  199. assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
  200. cast_result = ArchiveResult.from_json(json_result, guess)
  201. cast_history[method].append(cast_result)
  202. info['history'] = cast_history
  203. return cls(**info)
  204. def to_json(self, indent: int = 4, sort_keys: bool = True) -> str:
  205. return self.model_dump_json(indent=indent)
  206. def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
  207. data = self.model_dump()
  208. cols = cols or self.field_names()
  209. return separator.join(str(data.get(col, '')).ljust(ljust) for col in cols)
  210. # Properties for compatibility
  211. @property
  212. def link_dir(self) -> str:
  213. return str(ARCHIVE_DIR / self.timestamp)
  214. @property
  215. def archive_path(self) -> str:
  216. return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'
  217. @computed_field
  218. def bookmarked_date(self) -> Optional[str]:
  219. max_ts = (datetime.now(timezone.utc) + timedelta(days=30)).timestamp()
  220. if self.timestamp and self.timestamp.replace('.', '').isdigit():
  221. if 0 < float(self.timestamp) < max_ts:
  222. return self._ts_to_date_str(datetime.fromtimestamp(float(self.timestamp)))
  223. return str(self.timestamp)
  224. return None
  225. @computed_field
  226. def downloaded_datestr(self) -> Optional[str]:
  227. return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None
  228. @property
  229. def archive_dates(self) -> list[datetime]:
  230. return [
  231. self._parse_date(result.start_ts) # type: ignore
  232. for results in self.history.values()
  233. for result in results
  234. ]
  235. @property
  236. def oldest_archive_date(self) -> Optional[datetime]:
  237. dates = self.archive_dates
  238. return min(dates) if dates else None
  239. @property
  240. def newest_archive_date(self) -> Optional[datetime]:
  241. dates = self.archive_dates
  242. return max(dates) if dates else None
  243. @property
  244. def num_outputs(self) -> int:
  245. try:
  246. return self.as_snapshot().num_outputs
  247. except Exception:
  248. return 0
  249. @property
  250. def num_failures(self) -> int:
  251. return sum(
  252. 1 for results in self.history.values()
  253. for result in results
  254. if result.status == 'failed')
  255. def latest_outputs(self, status: Optional[str] = None) -> dict[str, Any]:
  256. """Get the latest output that each archive method produced for link"""
  257. ARCHIVE_METHODS = (
  258. 'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf',
  259. 'screenshot', 'dom', 'git', 'media', 'archive_org',
  260. )
  261. latest: Dict[str, Any] = {}
  262. for archive_method in ARCHIVE_METHODS:
  263. # get most recent succesful result in history for each archive method
  264. history = self.history.get(archive_method) or []
  265. history = list(filter(lambda result: result.output, reversed(history)))
  266. if status is not None:
  267. history = list(filter(lambda result: result.status == status, history))
  268. history = list(history)
  269. latest[archive_method] = history[0].output if history else None
  270. return latest
  271. def canonical_outputs(self) -> Dict[str, Optional[str]]:
  272. """Predict the expected output paths that should be present after archiving"""
  273. # You'll need to implement the actual logic based on your requirements
  274. # TODO: banish this awful duplication from the codebase and import these
  275. # from their respective extractor files
  276. from abx_plugin_favicon.config import FAVICON_CONFIG
  277. canonical = {
  278. 'index_path': 'index.html',
  279. 'favicon_path': 'favicon.ico',
  280. 'google_favicon_path': FAVICON_CONFIG.FAVICON_PROVIDER.format(self.domain),
  281. 'wget_path': f'warc/{self.timestamp}',
  282. 'warc_path': 'warc/',
  283. 'singlefile_path': 'singlefile.html',
  284. 'readability_path': 'readability/content.html',
  285. 'mercury_path': 'mercury/content.html',
  286. 'htmltotext_path': 'htmltotext.txt',
  287. 'pdf_path': 'output.pdf',
  288. 'screenshot_path': 'screenshot.png',
  289. 'dom_path': 'output.html',
  290. 'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
  291. 'git_path': 'git/',
  292. 'media_path': 'media/',
  293. 'headers_path': 'headers.json',
  294. }
  295. if self.is_static:
  296. static_path = f'warc/{self.timestamp}'
  297. canonical.update({
  298. 'title': self.basename,
  299. 'wget_path': static_path,
  300. 'pdf_path': static_path,
  301. 'screenshot_path': static_path,
  302. 'dom_path': static_path,
  303. 'singlefile_path': static_path,
  304. 'readability_path': static_path,
  305. 'mercury_path': static_path,
  306. 'htmltotext_path': static_path,
  307. })
  308. return canonical
  309. # URL helper properties
  310. @property
  311. def url_hash(self) -> str:
  312. # Implement your URL hashing logic here
  313. from hashlib import sha256
  314. return sha256(self.url.encode()).hexdigest()[:8]
  315. @property
  316. def scheme(self) -> str:
  317. return self.url.split('://')[0]
  318. @property
  319. def domain(self) -> str:
  320. return self.url.split('://')[1].split('/')[0]
  321. @property
  322. def path(self) -> str:
  323. parts = self.url.split('://', 1)
  324. return '/' + parts[1].split('/', 1)[1] if len(parts) > 1 and '/' in parts[1] else '/'
  325. @property
  326. def basename(self) -> str:
  327. return self.path.split('/')[-1]
  328. @property
  329. def extension(self) -> str:
  330. basename = self.basename
  331. return basename.split('.')[-1] if '.' in basename else ''
  332. @property
  333. def base_url(self) -> str:
  334. return f'{self.scheme}://{self.domain}'
  335. @property
  336. def is_static(self) -> bool:
  337. static_extensions = {'.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.mp4', '.mp3', '.wav', '.webm'}
  338. return any(self.url.lower().endswith(ext) for ext in static_extensions)
  339. @property
  340. def is_archived(self) -> bool:
  341. output_paths = (
  342. self.domain,
  343. 'output.html',
  344. 'output.pdf',
  345. 'screenshot.png',
  346. 'singlefile.html',
  347. 'readability/content.html',
  348. 'mercury/content.html',
  349. 'htmltotext.txt',
  350. 'media',
  351. 'git',
  352. )
  353. return any((Path(ARCHIVE_DIR) / self.timestamp / path).exists() for path in output_paths)
  354. def as_snapshot(self):
  355. """Implement this based on your Django model requirements"""
  356. from core.models import Snapshot
  357. return Snapshot.objects.get(url=self.url)
  358. # Helper methods
  359. @staticmethod
  360. def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
  361. return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None
  362. @staticmethod
  363. def _parse_date(date_str: Optional[str]) -> Optional[datetime]:
  364. if not date_str:
  365. return None
  366. try:
  367. return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
  368. except ValueError:
  369. try:
  370. return datetime.fromtimestamp(float(date_str))
  371. except (ValueError, TypeError):
  372. return None