# archivebox/index/__init__.py

__package__ = 'archivebox.index'

import os
import shutil

from pathlib import Path
from itertools import chain
from typing import List, Tuple, Dict, Optional, Iterable
from collections import OrderedDict
from contextlib import contextmanager
from urllib.parse import urlparse

from django.db.models import QuerySet, Q

from archivebox.config import DATA_DIR, CONSTANTS
from archivebox.config.common import ARCHIVING_CONFIG, STORAGE_CONFIG, SEARCH_BACKEND_CONFIG
from archivebox.misc.util import scheme, enforce_types, ExtendedEncoder
from archivebox.misc.logging import stderr
from archivebox.misc.logging_util import (
    TimedProgress,
    log_indexing_process_started,
    log_indexing_process_finished,
    log_indexing_started,
    log_indexing_finished,
    log_parsing_finished,
    log_deduping_finished,
)

from .schema import Link, ArchiveResult
from .html import (
    write_html_link_details,
)
from .json import (
    pyjson,
    parse_json_link_details,
    write_json_link_details,
)
from .sql import (
    write_sql_main_index,
    write_sql_link_details,
)

### Link filtering and checking
@enforce_types
def merge_links(a: Link, b: Link) -> Link:
    """deterministically merge two links, favoring longer field values over shorter,
    and "cleaner" values over worse ones.
    """
    assert a.base_url == b.base_url, f'Cannot merge two links with different URLs ({a.base_url} != {b.base_url})'

    # longest url wins (because a fuzzy url will always be shorter)
    url = a.url if len(a.url) > len(b.url) else b.url

    # best title based on length and quality
    possible_titles = [
        title
        for title in (a.title, b.title)
        if title and title.strip() and '://' not in title
    ]
    title = None
    if len(possible_titles) == 2:
        title = max(possible_titles, key=lambda t: len(t))
    elif len(possible_titles) == 1:
        title = possible_titles[0]

    # earliest valid timestamp
    timestamp = (
        a.timestamp
        if float(a.timestamp or 0) < float(b.timestamp or 0) else
        b.timestamp
    )

    # all unique, truthy tags
    tags_set = (
        set(tag.strip() for tag in (a.tags or '').split(','))
        | set(tag.strip() for tag in (b.tags or '').split(','))
    )
    tags = ','.join(tags_set) or None

    # all unique source entries
    sources = list(set(a.sources + b.sources))

    # all unique history entries for the combined archive methods
    all_methods = set(list(a.history.keys()) + list(b.history.keys()))
    history = {
        method: (a.history.get(method) or []) + (b.history.get(method) or [])
        for method in all_methods
    }
    for method in all_methods:
        deduped_jsons = {
            pyjson.dumps(result, sort_keys=True, cls=ExtendedEncoder)
            for result in history[method]
        }
        history[method] = list(reversed(sorted(
            (ArchiveResult.from_json(pyjson.loads(result)) for result in deduped_jsons),
            key=lambda result: result.start_ts,
        )))

    return Link(
        url=url,
        timestamp=timestamp,
        title=title,
        tags=tags,
        sources=sources,
        history=history,
    )
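# Example (illustrative sketch only, not executed on import; assumes Link accepts the same
# fields the constructor call above passes, and that both urls share a base_url):
#   >>> a = Link(url='http://example.com/page', timestamp='1618348800.0',
#   ...          title='Example Page', tags='news', sources=['import.txt'], history={})
#   >>> b = Link(url='https://example.com/page', timestamp='1618348700.0',
#   ...          title=None, tags='tech', sources=['feed.xml'], history={})
#   >>> merged = merge_links(a, b)
#   # merged keeps the longer URL (https://...), the earlier timestamp ('1618348700.0'),
#   # the only usable title, and the union of tags and sources.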
@enforce_types
def validate_links(links: Iterable[Link]) -> List[Link]:
    timer = TimedProgress(ARCHIVING_CONFIG.TIMEOUT * 4)
    try:
        links = archivable_links(links)     # remove chrome://, about:, mailto: etc.
        links = sorted_links(links)         # deterministically sort the links based on timestamp, url
        links = fix_duplicate_links(links)  # merge/dedupe duplicate timestamps & urls
    finally:
        timer.end()

    return list(links)
@enforce_types
def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
    """remove chrome://, about://, or other schemed links that can't be archived"""
    for link in links:
        try:
            urlparse(link.url)
        except ValueError:
            continue
        if scheme(link.url) not in ('http', 'https', 'ftp'):
            continue
        if ARCHIVING_CONFIG.URL_DENYLIST_PTN and ARCHIVING_CONFIG.URL_DENYLIST_PTN.search(link.url):
            continue
        if ARCHIVING_CONFIG.URL_ALLOWLIST_PTN and (not ARCHIVING_CONFIG.URL_ALLOWLIST_PTN.search(link.url)):
            continue

        yield link
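# Example (illustrative): given links whose urls are
#   'https://example.com/a', 'chrome://settings', 'mailto:me@example.com', 'ftp://host/file'
# archivable_links() yields only the https:// and ftp:// entries, and then only if they
# also pass the URL_ALLOWLIST_PTN / URL_DENYLIST_PTN checks above.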
@enforce_types
def fix_duplicate_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
    """
    ensures that all non-duplicate links have monotonically increasing timestamps
    """
    # from core.models import Snapshot

    unique_urls: OrderedDict[str, Link] = OrderedDict()

    for link in sorted_links:
        if link.url in unique_urls:
            # merge with any other links that share the same url
            link = merge_links(unique_urls[link.url], link)
        unique_urls[link.url] = link

    return unique_urls.values()

@enforce_types
def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
    sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
    return sorted(links, key=sort_func, reverse=True)
@enforce_types
def links_after_timestamp(links: Iterable[Link], resume: Optional[float]=None) -> Iterable[Link]:
    if not resume:
        yield from links
        return

    for link in links:
        try:
            if float(link.timestamp) <= resume:
                yield link
        except (ValueError, TypeError):
            print('Resume value and all timestamp values must be valid numbers.')
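# Note (descriptive, derived from the code above): because sorted_links() sorts newest-first,
# passing e.g. resume=1618348800.0 yields only the links whose timestamp is <= 1618348800.0,
# i.e. the tail of the list that hasn't been processed yet.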
@enforce_types
def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
    """resolve duplicate timestamps by appending a decimal 1234, 1234 -> 1234.1, 1234.2"""

    timestamp = timestamp.split('.')[0]
    nonce = 0

    # first try 152323423 before 152323423.0
    if timestamp not in used_timestamps:
        return timestamp

    new_timestamp = '{}.{}'.format(timestamp, nonce)
    while new_timestamp in used_timestamps:
        nonce += 1
        new_timestamp = '{}.{}'.format(timestamp, nonce)

    return new_timestamp
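# Example (illustrative, hypothetical values):
#   lowest_uniq_timestamp(OrderedDict(), '1234')                               -> '1234'
#   lowest_uniq_timestamp(OrderedDict({'1234': ...}), '1234')                  -> '1234.0'
#   lowest_uniq_timestamp(OrderedDict({'1234': ..., '1234.0': ...}), '1234')   -> '1234.1'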
### Main Links Index

@contextmanager
@enforce_types
def timed_index_update(out_path: Path):
    log_indexing_started(out_path)
    timer = TimedProgress(ARCHIVING_CONFIG.TIMEOUT * 2, prefix=' ')
    try:
        yield
    finally:
        timer.end()

    assert out_path.exists(), f'Failed to write index file: {out_path}'
    log_indexing_finished(out_path)
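# Example (illustrative): this context manager wraps any index write with progress logging
# and a sanity check that the file exists afterwards, exactly as write_main_index() does below:
#   with timed_index_update(CONSTANTS.DATABASE_FILE):
#       write_sql_main_index(links, out_dir=out_dir)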
@enforce_types
def write_main_index(links: List[Link], out_dir: Path=DATA_DIR, created_by_id: int | None=None) -> None:
    """Writes links to sqlite3 file for a given list of links"""

    log_indexing_process_started(len(links))

    try:
        with timed_index_update(CONSTANTS.DATABASE_FILE):
            write_sql_main_index(links, out_dir=out_dir, created_by_id=created_by_id)
            os.chmod(CONSTANTS.DATABASE_FILE, int(STORAGE_CONFIG.OUTPUT_PERMISSIONS, base=8))  # set here because we don't write it with atomic writes

    except (KeyboardInterrupt, SystemExit):
        stderr('[!] Warning: Still writing index to disk...', color='lightyellow')
        stderr(' Run archivebox init to fix any inconsistencies from an ungraceful exit.')
        with timed_index_update(CONSTANTS.DATABASE_FILE):
            write_sql_main_index(links, out_dir=out_dir, created_by_id=created_by_id)
            os.chmod(CONSTANTS.DATABASE_FILE, int(STORAGE_CONFIG.OUTPUT_PERMISSIONS, base=8))  # set here because we don't write it with atomic writes
        raise SystemExit(0)

    log_indexing_process_finished()
@enforce_types
def load_main_index(out_dir: Path | str=DATA_DIR, warn: bool=True) -> List[Link]:
    """parse and load existing index with any new links from import_path merged in"""
    from core.models import Snapshot
    try:
        return Snapshot.objects.all().only('id')
    except (KeyboardInterrupt, SystemExit):
        raise SystemExit(0)

@enforce_types
def load_main_index_meta(out_dir: Path=DATA_DIR) -> Optional[dict]:
    index_path = out_dir / CONSTANTS.JSON_INDEX_FILENAME
    if os.access(index_path, os.F_OK):
        with open(index_path, 'r', encoding='utf-8') as f:
            meta_dict = pyjson.load(f)
            meta_dict.pop('links')
            return meta_dict

    return None
@enforce_types
def parse_links_from_source(source_path: str, root_url: Optional[str]=None, parser: str="auto") -> List[Link]:

    from ..parsers import parse_links

    new_links: List[Link] = []

    # parse and validate the import file
    raw_links, parser_name = parse_links(source_path, root_url=root_url, parser=parser)
    new_links = validate_links(raw_links)

    if parser_name:
        num_parsed = len(raw_links)
        log_parsing_finished(num_parsed, parser_name)

    return new_links

@enforce_types
def fix_duplicate_links_in_index(snapshots: QuerySet, links: Iterable[Link]) -> Iterable[Link]:
    """
    Given a list of in-memory Links, dedupe and merge them with any conflicting Snapshots in the DB.
    """
    unique_urls: OrderedDict[str, Link] = OrderedDict()

    for link in links:
        index_link = snapshots.filter(url=link.url)
        if index_link:
            link = merge_links(index_link[0].as_link(), link)

        unique_urls[link.url] = link

    return unique_urls.values()

@enforce_types
def dedupe_links(snapshots: QuerySet,
                 new_links: List[Link]) -> List[Link]:
    """
    The validation of links happened at a different stage. This method will
    focus on actual deduplication and timestamp fixing.
    """

    # merge existing links in out_dir and new links
    dedup_links = fix_duplicate_links_in_index(snapshots, new_links)

    new_links = [
        link for link in new_links
        if not snapshots.filter(url=link.url).exists()
    ]

    dedup_links_dict = {link.url: link for link in dedup_links}

    # Replace links in new_links with the dedup version
    for i in range(len(new_links)):
        if new_links[i].url in dedup_links_dict.keys():
            new_links[i] = dedup_links_dict[new_links[i].url]

    log_deduping_finished(len(new_links))

    return new_links
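# Example (hedged sketch of how the helpers above might be chained during an import;
# the source path is hypothetical):
#   from core.models import Snapshot
#   new_links = parse_links_from_source('/data/sources/bookmarks.html')
#   new_links = dedupe_links(Snapshot.objects.all(), new_links)  # drop URLs already in the DB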
### Link Details Index

@enforce_types
def write_link_details(link: Link, out_dir: Optional[str]=None, skip_sql_index: bool=False) -> None:
    out_dir = out_dir or link.link_dir

    write_json_link_details(link, out_dir=out_dir)
    write_html_link_details(link, out_dir=out_dir)
    if not skip_sql_index:
        write_sql_link_details(link)

@enforce_types
def load_link_details(link: Link, out_dir: Optional[str]=None) -> Link:
    """check for an existing link archive in the given directory,
    and load+merge it into the given link dict
    """
    out_dir = out_dir or link.link_dir

    existing_link = parse_json_link_details(out_dir)
    if existing_link:
        return merge_links(existing_link, link)

    return link
LINK_FILTERS = {
    'exact': lambda pattern: Q(url=pattern),
    'substring': lambda pattern: Q(url__icontains=pattern),
    'regex': lambda pattern: Q(url__iregex=pattern),
    'domain': lambda pattern: Q(url__istartswith=f"http://{pattern}") | Q(url__istartswith=f"https://{pattern}") | Q(url__istartswith=f"ftp://{pattern}"),
    'tag': lambda pattern: Q(tags__name=pattern),
    'timestamp': lambda pattern: Q(timestamp=pattern),
}
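# Example (illustrative): LINK_FILTERS['domain']('example.com') expands to
#   Q(url__istartswith='http://example.com')
#   | Q(url__istartswith='https://example.com')
#   | Q(url__istartswith='ftp://example.com')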
@enforce_types
def q_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='exact') -> QuerySet:
    q_filter = Q()
    for pattern in filter_patterns:
        try:
            q_filter = q_filter | LINK_FILTERS[filter_type](pattern)
        except KeyError:
            stderr()
            stderr(
                f'[X] Got invalid pattern for --filter-type={filter_type}:',
                color='red',
            )
            stderr(f' {pattern}')
            raise SystemExit(2)
    return snapshots.filter(q_filter)
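# Example (hedged sketch, assuming a Snapshot queryset is available):
#   matching = q_filter(Snapshot.objects.all(), ['example.com', 'example.org'], filter_type='domain')
#   # ORs one LINK_FILTERS['domain'] clause per pattern, then applies the combined Q to the queryset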
def search_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='search') -> QuerySet:

    from ..search import query_search_index

    if not SEARCH_BACKEND_CONFIG.USE_SEARCHING_BACKEND:
        stderr()
        stderr(
            '[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True',
            color='red',
        )
        raise SystemExit(2)

    from core.models import Snapshot

    qsearch = Snapshot.objects.none()
    for pattern in filter_patterns:
        try:
            qsearch |= query_search_index(pattern)
        except:
            raise SystemExit(2)

    return snapshots & qsearch
@enforce_types
def snapshot_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='exact') -> QuerySet:
    if filter_type != 'search':
        return q_filter(snapshots, filter_patterns, filter_type)
    else:
        return search_filter(snapshots, filter_patterns, filter_type)
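# Example (illustrative dispatch, hypothetical patterns):
#   snapshot_filter(qs, [r'^https://.*\.org/'], 'regex')  -> q_filter() via LINK_FILTERS['regex']
#   snapshot_filter(qs, ['climate report'], 'search')     -> search_filter() via the search backend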
def get_indexed_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """indexed links without checking archive status or data directory validity"""
    links = (snapshot.as_link() for snapshot in snapshots.iterator(chunk_size=500))
    return {
        link.link_dir: link
        for link in links
    }

def get_archived_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """indexed links that are archived with a valid data directory"""
    links = (snapshot.as_link() for snapshot in snapshots.iterator(chunk_size=500))
    return {
        link.link_dir: link
        for link in filter(is_archived, links)
    }

def get_unarchived_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """indexed links that are unarchived with no data directory or an empty data directory"""
    links = (snapshot.as_link() for snapshot in snapshots.iterator(chunk_size=500))
    return {
        link.link_dir: link
        for link in filter(is_unarchived, links)
    }
def get_present_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs that actually exist in the archive/ folder"""

    all_folders = {}

    for entry in (out_dir / CONSTANTS.ARCHIVE_DIR_NAME).iterdir():
        if entry.is_dir():
            link = None
            try:
                # iterdir() yields Path objects (no .path attribute), so pass the path itself
                link = parse_json_link_details(str(entry))
            except Exception:
                pass

            all_folders[entry.name] = link

    return all_folders
def get_valid_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs with a valid index matched to the main index and archived content"""
    links = [snapshot.as_link_with_details() for snapshot in snapshots.iterator(chunk_size=500)]
    return {
        link.link_dir: link
        for link in filter(is_valid, links)
    }

def get_invalid_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized"""
    duplicate = get_duplicate_folders(snapshots, out_dir=out_dir)
    orphaned = get_orphaned_folders(snapshots, out_dir=out_dir)
    corrupted = get_corrupted_folders(snapshots, out_dir=out_dir)
    unrecognized = get_unrecognized_folders(snapshots, out_dir=out_dir)
    return {**duplicate, **orphaned, **corrupted, **unrecognized}
def get_duplicate_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs that conflict with other directories that have the same link URL or timestamp"""
    by_url = {}
    by_timestamp = {}
    duplicate_folders = {}

    data_folders = (
        str(entry)
        for entry in CONSTANTS.ARCHIVE_DIR.iterdir()
        if entry.is_dir() and not snapshots.filter(timestamp=entry.name).exists()
    )

    for path in chain(snapshots.iterator(chunk_size=500), data_folders):
        link = None
        if type(path) is not str:
            path = path.as_link().link_dir

        try:
            link = parse_json_link_details(path)
        except Exception:
            pass

        if link:
            # link folder has same timestamp as different link folder
            by_timestamp[link.timestamp] = by_timestamp.get(link.timestamp, 0) + 1
            if by_timestamp[link.timestamp] > 1:
                duplicate_folders[path] = link

            # link folder has same url as different link folder
            by_url[link.url] = by_url.get(link.url, 0) + 1
            if by_url[link.url] > 1:
                duplicate_folders[path] = link

    return duplicate_folders

def get_orphaned_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs that contain a valid index but aren't listed in the main index"""
    orphaned_folders = {}

    for entry in CONSTANTS.ARCHIVE_DIR.iterdir():
        if entry.is_dir():
            link = None
            try:
                link = parse_json_link_details(str(entry))
            except Exception:
                pass

            if link and not snapshots.filter(timestamp=entry.name).exists():
                # folder is a valid link data dir with index details, but it's not in the main index
                orphaned_folders[str(entry)] = link

    return orphaned_folders
def get_corrupted_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs that don't contain a valid index and aren't listed in the main index"""
    corrupted = {}
    for snapshot in snapshots.iterator(chunk_size=500):
        link = snapshot.as_link()
        if is_corrupt(link):
            corrupted[link.link_dir] = link
    return corrupted

def get_unrecognized_folders(snapshots, out_dir: Path=DATA_DIR) -> Dict[str, Optional[Link]]:
    """dirs that don't contain recognizable archive data and aren't listed in the main index"""
    unrecognized_folders: Dict[str, Optional[Link]] = {}

    for entry in (Path(out_dir) / CONSTANTS.ARCHIVE_DIR_NAME).iterdir():
        if entry.is_dir():
            index_exists = (entry / "index.json").exists()
            link = None
            try:
                link = parse_json_link_details(str(entry))
            except KeyError:
                # Try to fix index
                if index_exists:
                    try:
                        # Last attempt to repair the detail index
                        link_guessed = parse_json_link_details(str(entry), guess=True)
                        write_json_link_details(link_guessed, out_dir=str(entry))
                        link = parse_json_link_details(str(entry))
                    except Exception:
                        pass

            if index_exists and link is None:
                # index exists but it's corrupted or unparseable
                unrecognized_folders[str(entry)] = link

            elif not index_exists:
                # link details index doesn't exist and the folder isn't in the main index
                timestamp = entry.name
                if not snapshots.filter(timestamp=timestamp).exists():
                    unrecognized_folders[str(entry)] = link

    return unrecognized_folders
def is_valid(link: Link) -> bool:
    dir_exists = Path(link.link_dir).exists()
    index_exists = (Path(link.link_dir) / "index.json").exists()
    if not dir_exists:
        # unarchived links are not included in the valid list
        return False
    if dir_exists and not index_exists:
        return False
    if dir_exists and index_exists:
        try:
            parsed_link = parse_json_link_details(link.link_dir, guess=True)
            return link.url == parsed_link.url
        except Exception:
            pass
    return False

def is_corrupt(link: Link) -> bool:
    if not Path(link.link_dir).exists():
        # unarchived links are not considered corrupt
        return False

    if is_valid(link):
        return False

    return True

def is_archived(link: Link) -> bool:
    return is_valid(link) and link.is_archived

def is_unarchived(link: Link) -> bool:
    if not Path(link.link_dir).exists():
        return True
    return not link.is_archived
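# Summary of the folder-status predicates above (descriptive, derived from the code):
#   is_valid      -> data dir and index.json both exist and the parsed URL matches the link
#   is_corrupt    -> data dir exists but is_valid() is False
#   is_archived   -> is_valid() and link.is_archived
#   is_unarchived -> data dir is missing, or link.is_archived is False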
def fix_invalid_folder_locations(out_dir: Path=DATA_DIR) -> Tuple[List[str], List[str]]:
    fixed = []
    cant_fix = []
    for entry in os.scandir(out_dir / CONSTANTS.ARCHIVE_DIR_NAME):
        if entry.is_dir(follow_symlinks=True):
            if (Path(entry.path) / 'index.json').exists():
                try:
                    link = parse_json_link_details(entry.path)
                except KeyError:
                    link = None

                if not link:
                    continue

                if not entry.path.endswith(f'/{link.timestamp}'):
                    dest = out_dir / CONSTANTS.ARCHIVE_DIR_NAME / link.timestamp
                    if dest.exists():
                        cant_fix.append(entry.path)
                    else:
                        shutil.move(entry.path, dest)
                        fixed.append(dest)
                        timestamp = entry.path.rsplit('/', 1)[-1]
                        assert link.link_dir == entry.path
                        assert link.timestamp == timestamp
                        write_json_link_details(link, out_dir=entry.path)

    return fixed, cant_fix