# links.py — link validation, deduplication, and sorting helpers
from typing import Iterable, Optional
from collections import OrderedDict

from .schema import Link
from .util import (
    scheme,
    fuzzy_url,
    merge_links,
)
from .config import URL_BLACKLIST_PTN
  10. def validate_links(links: Iterable[Link]) -> Iterable[Link]:
  11. links = archivable_links(links) # remove chrome://, about:, mailto: etc.
  12. links = sorted_links(links) # deterministically sort the links based on timstamp, url
  13. links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
  14. if not links:
  15. print('[X] No links found :(')
  16. raise SystemExit(1)
  17. return links
  18. def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
  19. """remove chrome://, about:// or other schemed links that cant be archived"""
  20. for link in links:
  21. scheme_is_valid = scheme(link.url) in ('http', 'https', 'ftp')
  22. not_blacklisted = (not URL_BLACKLIST_PTN.match(link.url)) if URL_BLACKLIST_PTN else True
  23. if scheme_is_valid and not_blacklisted:
  24. yield link
  25. def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
  26. """
  27. ensures that all non-duplicate links have monotonically increasing timestamps
  28. """
  29. unique_urls: OrderedDict[str, Link] = OrderedDict()
  30. for link in sorted_links:
  31. fuzzy = fuzzy_url(link.url)
  32. if fuzzy in unique_urls:
  33. # merge with any other links that share the same url
  34. link = merge_links(unique_urls[fuzzy], link)
  35. unique_urls[fuzzy] = link
  36. unique_timestamps: OrderedDict[str, Link] = OrderedDict()
  37. for link in unique_urls.values():
  38. new_link = link.overwrite(
  39. timestamp=lowest_uniq_timestamp(unique_timestamps, link.timestamp),
  40. )
  41. unique_timestamps[new_link.timestamp] = new_link
  42. return unique_timestamps.values()
  43. def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
  44. sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
  45. return sorted(links, key=sort_func, reverse=True)
  46. def links_after_timestamp(links: Iterable[Link], resume: float=None) -> Iterable[Link]:
  47. if not resume:
  48. yield from links
  49. return
  50. for link in links:
  51. try:
  52. if float(link.timestamp) <= resume:
  53. yield link
  54. except (ValueError, TypeError):
  55. print('Resume value and all timestamp values must be valid numbers.')
  56. def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
  57. """resolve duplicate timestamps by appending a decimal 1234, 1234 -> 1234.1, 1234.2"""
  58. timestamp = timestamp.split('.')[0]
  59. nonce = 0
  60. # first try 152323423 before 152323423.0
  61. if timestamp not in used_timestamps:
  62. return timestamp
  63. new_timestamp = '{}.{}'.format(timestamp, nonce)
  64. while new_timestamp in used_timestamps:
  65. nonce += 1
  66. new_timestamp = '{}.{}'.format(timestamp, nonce)
  67. return new_timestamp