from typing import Iterable, Optional
from collections import OrderedDict

from .schema import Link
from .util import (
    scheme,
    fuzzy_url,
    merge_links,
)
from .config import URL_BLACKLIST_PTN


def validate_links(links: Iterable[Link]) -> Iterable[Link]:
    links = archivable_links(links)  # remove chrome://, about:, mailto:, etc.
    links = sorted_links(links)      # deterministically sort the links by timestamp, then url
    links = uniquefied_links(links)  # merge/dedupe duplicate timestamps & urls

    if not links:
        print('[X] No links found :(')
        raise SystemExit(1)

    return links
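
# Example usage (an illustrative sketch; Link fields other than url and
# timestamp are elided with ... since they aren't shown in this module):
#
#   raw = [
#       Link(url='https://example.com', timestamp='1544221234', ...),
#       Link(url='chrome://settings', timestamp='1544221235', ...),
#   ]
#   links = validate_links(raw)  # filtered, sorted newest-first, deduped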


def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
    """remove chrome://, about:, or other schemed links that can't be archived"""
    for link in links:
        scheme_is_valid = scheme(link.url) in ('http', 'https', 'ftp')
        not_blacklisted = (not URL_BLACKLIST_PTN.match(link.url)) if URL_BLACKLIST_PTN else True
        if scheme_is_valid and not_blacklisted:
            yield link
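
# For instance (hypothetical link_to() helper that builds a Link from a url):
# chrome://, about:, and mailto: urls fail the scheme check and are dropped,
# while plain http(s)/ftp urls survive unless URL_BLACKLIST_PTN matches them:
#
#   list(archivable_links([link_to('https://example.com'), link_to('about:blank')]))
#   # -> only the https://example.com link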


def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
    """
    merge links that share the same fuzzy url, then ensure every remaining
    link gets a unique timestamp (collisions get a .0, .1, ... nonce suffix)
    """

    unique_urls: OrderedDict[str, Link] = OrderedDict()

    for link in sorted_links:
        fuzzy = fuzzy_url(link.url)
        if fuzzy in unique_urls:
            # merge with any other links that share the same url
            link = merge_links(unique_urls[fuzzy], link)
        unique_urls[fuzzy] = link

    unique_timestamps: OrderedDict[str, Link] = OrderedDict()
    for link in unique_urls.values():
        new_link = link.overwrite(
            timestamp=lowest_uniq_timestamp(unique_timestamps, link.timestamp),
        )
        unique_timestamps[new_link.timestamp] = new_link

    return unique_timestamps.values()
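
# Sketch of the merge step (assuming fuzzy_url() normalizes details like
# scheme and a www. prefix, so near-identical urls compare equal):
#
#   a = Link(url='http://example.com', timestamp='1544221234', ...)
#   b = Link(url='https://www.example.com', timestamp='1544221234', ...)
#   list(uniquefied_links(sorted_links([a, b])))
#   # -> a single merge_links(a, b) result; if two *distinct* urls had shared
#   #    that timestamp, the second would be rewritten to '1544221234.0'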


def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
    # sort newest-first by the integer part of the timestamp, then by url,
    # so nonce-suffixed duplicates (1234.0, 1234.1) stay grouped together
    sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
    return sorted(links, key=sort_func, reverse=True)


def links_after_timestamp(links: Iterable[Link], resume: Optional[float]=None) -> Iterable[Link]:
    if not resume:
        yield from links
        return

    for link in links:
        try:
            if float(link.timestamp) <= resume:
                yield link
        except (ValueError, TypeError):
            # skip links with non-numeric timestamps instead of aborting the run
            print('Resume value and all timestamp values must be valid numbers.')
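
# Example (hypothetical resume value): since sorted_links() orders links
# newest-first, resuming at 1544221234.0 skips every link newer than the
# resume point and yields only those with timestamps <= 1544221234.0:
#
#   remaining = links_after_timestamp(links, resume=1544221234.0)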


def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
    """resolve duplicate timestamps by appending a nonce: 1234, 1234, 1234 -> 1234, 1234.0, 1234.1"""

    timestamp = timestamp.split('.')[0]
    nonce = 0

    # first try 152323423 before 152323423.0
    if timestamp not in used_timestamps:
        return timestamp

    new_timestamp = '{}.{}'.format(timestamp, nonce)
    while new_timestamp in used_timestamps:
        nonce += 1
        new_timestamp = '{}.{}'.format(timestamp, nonce)

    return new_timestamp
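
# Collision behavior, step by step (values are illustrative):
#
#   used: OrderedDict = OrderedDict()
#   lowest_uniq_timestamp(used, '1234')    # -> '1234'   (bare value is free)
#   used['1234'] = link_a
#   lowest_uniq_timestamp(used, '1234')    # -> '1234.0' (first collision)
#   used['1234.0'] = link_b
#   lowest_uniq_timestamp(used, '1234.9')  # -> '1234.1' (old suffix is discarded)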