- __package__ = 'archivebox.misc'
- import re
- import requests
- import json as pyjson
- import http.cookiejar
- from typing import List, Optional, Any, Callable
- from pathlib import Path
- from inspect import signature
- from functools import wraps
- from hashlib import sha256
- from urllib.parse import urlparse, quote, unquote
- from html import escape, unescape
- from datetime import datetime, timezone
- from dateparser import parse as dateparser
- from requests.exceptions import RequestException, ReadTimeout
- from base32_crockford import encode as base32_encode # type: ignore
- from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding
- try:
- import chardet # type:ignore
- detect_encoding = lambda rawdata: chardet.detect(rawdata)["encoding"]
- except ImportError:
- detect_encoding = lambda rawdata: "utf-8"
- from archivebox.config.constants import CONSTANTS
- from .logging import COLOR_DICT
- ### Parsing Helpers
- # All of these are (str) -> str
- # shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
- scheme = lambda url: urlparse(url).scheme.lower()
- without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
- without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
- without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
- without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
- path = lambda url: urlparse(url).path
- basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
- domain = lambda url: urlparse(url).netloc
- query = lambda url: urlparse(url).query
- fragment = lambda url: urlparse(url).fragment
- extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
- base_url = lambda url: without_scheme(url) # unique base url used to dedupe links
- without_www = lambda url: url.replace('://www.', '://', 1)
- without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
- hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]
- urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
- urldecode = lambda s: s and unquote(s)
- htmlencode = lambda s: s and escape(s, quote=True)
- htmldecode = lambda s: s and unescape(s)
- short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
- ts_to_date_str = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
- ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
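- # Illustrative examples of the helpers above (doctest-style, for reference only; the URL is hypothetical):
- # >>> scheme('https://www.example.com/a/b.html?k=v#frag')
- # 'https'
- # >>> domain('https://www.example.com/a/b.html?k=v#frag')
- # 'www.example.com'
- # >>> basename('https://www.example.com/a/b.html?k=v#frag')
- # 'b.html'
- # >>> extension('https://www.example.com/a/b.html?k=v#frag')
- # 'html'
- # >>> without_scheme('https://www.example.com/a/b.html?k=v#frag')
- # 'www.example.com/a/b.html?k=v#frag'
- # >>> without_www('https://www.example.com/a/b.html')
- # 'https://example.com/a/b.html'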
- COLOR_REGEX = re.compile(r'\[(?P<arg_1>\d+)(;(?P<arg_2>\d+)(;(?P<arg_3>\d+))?)?m')
- # https://mathiasbynens.be/demo/url-regex
- URL_REGEX = re.compile(
- r'(?=('
- r'http[s]?://' # start matching from allowed schemes
- r'(?:[a-zA-Z]|[0-9]' # followed by allowed alphanum characters
- r'|[-_$@.&+!*\(\),]' # or allowed symbols (keep hyphen first to match literal hyphen)
- r'|[^\u0000-\u007F])+' # or allowed unicode bytes
- r'[^\]\[<>"\'\s]+' # stop parsing at these symbols
- r'))',
- re.IGNORECASE | re.UNICODE,
- )
- def parens_are_matched(string: str, open_char='(', close_char=')'):
- """check that all parentheses in a string are balanced and nested properly"""
- count = 0
- for c in string:
- if c == open_char:
- count += 1
- elif c == close_char:
- count -= 1
- if count < 0:
- return False
- return count == 0
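- # Illustrative examples (hypothetical inputs, shown doctest-style):
- # >>> parens_are_matched('/some_article_(Disambiguation).html')
- # True
- # >>> parens_are_matched('/a(b)c).x(y)z')
- # False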
- def fix_url_from_markdown(url_str: str) -> str:
- """
- cleanup a regex-parsed url that may contain dangling trailing parens from markdown link syntax
- helpful to fix URLs parsed from markdown e.g.
- input: https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def).somemoretext
- result: https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def
- IMPORTANT ASSUMPTION: valid urls won't have unbalanced or incorrectly nested parentheses
- e.g. this will fail if the user actually wants to ingest a url like 'https://example.com/some_wei)(rd_url'
- in that case it will return 'https://example.com/some_wei' (truncated up to the first unbalanced paren)
- This assumption is true 99.9999% of the time, and for the rare edge cases the user can use the url_list parser instead.
- """
- trimmed_url = url_str
- # cut off one trailing character at a time
- # until parens are balanced e.g. /a(b)c).x(y)z -> /a(b)c
- while not parens_are_matched(trimmed_url):
- trimmed_url = trimmed_url[:-1]
-
- # make sure trimmed url is still valid
- if re.findall(URL_REGEX, trimmed_url):
- return trimmed_url
-
- return url_str
- def find_all_urls(urls_str: str):
- for url in re.findall(URL_REGEX, urls_str):
- yield fix_url_from_markdown(url)
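- # Illustrative usage sketch (hypothetical markdown input): trailing markdown parens are trimmed off
- # >>> list(find_all_urls('see [docs](https://example.com/page_(1)) and http://example.com/other'))
- # ['https://example.com/page_(1)', 'http://example.com/other']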
- def is_static_file(url: str):
- # TODO: the proper way is with MIME type detection + ext, not only extension
- return extension(url).lower() in CONSTANTS.STATICFILE_EXTENSIONS
- def enforce_types(func):
- """
- Enforce function arg and kwarg types at runtime using its python3 type hints
- Simpler version of pydantic @validate_call decorator
- """
- # TODO: check return type as well
- @wraps(func)
- def typechecked_function(*args, **kwargs):
- sig = signature(func)
- def check_argument_type(arg_key, arg_val):
- try:
- annotation = sig.parameters[arg_key].annotation
- except KeyError:
- annotation = None
- if annotation is not None and annotation.__class__ is type:
- if not isinstance(arg_val, annotation):
- raise TypeError(
- '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
- func.__name__,
- arg_key,
- annotation.__name__,
- type(arg_val).__name__,
- arg_key,
- str(arg_val)[:64],
- )
- )
- # check args
- for arg_val, arg_key in zip(args, sig.parameters):
- check_argument_type(arg_key, arg_val)
- # check kwargs
- for arg_key, arg_val in kwargs.items():
- check_argument_type(arg_key, arg_val)
- return func(*args, **kwargs)
- return typechecked_function
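- # Illustrative usage sketch (hypothetical function, shown doctest-style): bad argument types are rejected at call time
- # >>> @enforce_types
- # ... def add_one(n: int) -> int:
- # ...     return n + 1
- # >>> add_one('abc')
- # raises TypeError: add_one(..., n: int) got unexpected str argument n=abc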
- def docstring(text: Optional[str]):
- """attach the given docstring to the decorated function"""
- def decorator(func):
- if text:
- func.__doc__ = text
- return func
- return decorator
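- # Illustrative usage sketch (hypothetical function): attach a dynamically-built docstring
- # >>> @docstring('Download the page title')
- # ... def save_title(link): ...
- # >>> save_title.__doc__
- # 'Download the page title'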
- @enforce_types
- def str_between(string: str, start: str, end: Optional[str]=None) -> str:
- """(<abc>12345</def>, <abc>, </def>) -> 12345"""
- content = string.split(start, 1)[-1]
- if end is not None:
- content = content.rsplit(end, 1)[0]
- return content
- @enforce_types
- def parse_date(date: Any) -> datetime:
- """Parse unix timestamps, iso format, and human-readable strings"""
-
- if date is None:
- return None # type: ignore
- if isinstance(date, datetime):
- if date.tzinfo is None:
- return date.replace(tzinfo=timezone.utc)
- assert date.tzinfo.utcoffset(datetime.now()).seconds == 0, 'Refusing to load a non-UTC date!'
- return date
-
- if isinstance(date, (float, int)):
- date = str(date)
- if isinstance(date, str):
- return dateparser(date, settings={'TIMEZONE': 'UTC'}).astimezone(timezone.utc)
- raise ValueError('Tried to parse invalid date! {}'.format(date))
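- # Illustrative usage sketch (hypothetical values); every branch returns a UTC-aware datetime:
- # >>> parse_date('2024-01-31T12:00:00+00:00')
- # datetime.datetime(2024, 1, 31, 12, 0, tzinfo=datetime.timezone.utc)
- # >>> parse_date(1706702400)           # unix timestamps (int/float) are coerced to str first
- # >>> parse_date('January 31st, 2024') # human-readable strings are handled by dateparser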
- @enforce_types
- def download_url(url: str, timeout: int | None=None) -> str:
- """Download the contents of a remote url and return the text"""
- from archivebox.config.common import ARCHIVING_CONFIG
- timeout = timeout or ARCHIVING_CONFIG.TIMEOUT
- session = requests.Session()
- if ARCHIVING_CONFIG.COOKIES_FILE and Path(ARCHIVING_CONFIG.COOKIES_FILE).is_file():
- cookie_jar = http.cookiejar.MozillaCookieJar(ARCHIVING_CONFIG.COOKIES_FILE)
- cookie_jar.load(ignore_discard=True, ignore_expires=True)
- for cookie in cookie_jar:
- session.cookies.set(cookie.name, cookie.value, domain=cookie.domain, path=cookie.path)
- response = session.get(
- url,
- headers={'User-Agent': ARCHIVING_CONFIG.USER_AGENT},
- verify=ARCHIVING_CONFIG.CHECK_SSL_VALIDITY,
- timeout=timeout,
- )
- content_type = response.headers.get('Content-Type', '')
- encoding = http_content_type_encoding(content_type) or html_body_declared_encoding(response.text)
- if encoding is not None:
- response.encoding = encoding
- try:
- return response.text
- except UnicodeDecodeError:
- # if the response is non-text (e.g. an image or other binary file), just return the filename instead
- return url.rsplit('/', 1)[-1]
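- # Illustrative usage sketch (hypothetical URL):
- # >>> html = download_url('https://example.com/page.html', timeout=30)
- # returns the decoded response body, falling back to the filename ('page.html') if the body can't be decoded as text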
- @enforce_types
- def get_headers(url: str, timeout: int | None=None) -> str:
- """Download the contents of a remote url and return the headers"""
- # TODO: get rid of this and use an abx pluggy hook instead
-
- from archivebox.config.common import ARCHIVING_CONFIG
-
- timeout = timeout or ARCHIVING_CONFIG.TIMEOUT
- try:
- response = requests.head(
- url,
- headers={'User-Agent': ARCHIVING_CONFIG.USER_AGENT},
- verify=ARCHIVING_CONFIG.CHECK_SSL_VALIDITY,
- timeout=timeout,
- allow_redirects=True,
- )
- if response.status_code >= 400:
- raise RequestException
- except ReadTimeout:
- raise
- except RequestException:
- response = requests.get(
- url,
- headers={'User-Agent': ARCHIVING_CONFIG.USER_AGENT},
- verify=ARCHIVING_CONFIG.CHECK_SSL_VALIDITY,
- timeout=timeout,
- stream=True
- )
-
- return pyjson.dumps(
- {
- 'URL': url,
- 'Status-Code': response.status_code,
- 'Elapsed': response.elapsed.total_seconds()*1000,
- 'Encoding': str(response.encoding),
- 'Apparent-Encoding': response.apparent_encoding,
- **dict(response.headers),
- },
- indent=4,
- )
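- # Illustrative usage sketch (hypothetical URL and values); the result is a JSON string of header info:
- # >>> print(get_headers('https://example.com', timeout=10))
- # {
- #     "URL": "https://example.com",
- #     "Status-Code": 200,
- #     "Elapsed": 123.456,
- #     ...
- # }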
- @enforce_types
- def ansi_to_html(text: str) -> str:
- """
- Based on: https://stackoverflow.com/questions/19212665/python-converting-ansi-color-codes-to-html
- A simple way to render colored CLI stdout/stderr as HTML; Textual/rich would probably handle this better.
- """
- TEMPLATE = '<span style="color: rgb{}"><br>'
- text = text.replace('[m', '</span>')
- def single_sub(match):
- argsdict = match.groupdict()
- if argsdict['arg_3'] is None:
- if argsdict['arg_2'] is None:
- _, color = 0, argsdict['arg_1']
- else:
- _, color = argsdict['arg_1'], argsdict['arg_2']
- else:
- _, color = argsdict['arg_3'], argsdict['arg_2']
- return TEMPLATE.format(COLOR_DICT[color][0])
- return COLOR_REGEX.sub(single_sub, text)
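- # Illustrative usage sketch: render captured ANSI-colored CLI output as HTML spans
- # (the color-code -> rgb mapping comes from COLOR_DICT in .logging)
- # html_log = ansi_to_html(captured_stdout_text)   # hypothetical variable name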
- @enforce_types
- def dedupe(options: List[str]) -> List[str]:
- """
- Deduplicate the given CLI args by their key (the text before '='). Options that appear later override earlier ones.
- """
- deduped = {}
- for option in options:
- key = option.split('=')[0]
- deduped[key] = option
- return list(deduped.values())
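- # Illustrative example (hypothetical CLI args): later values win, first-seen order is kept
- # >>> dedupe(['--width=1024', '--quiet', '--width=1920'])
- # ['--width=1920', '--quiet']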
- class ExtendedEncoder(pyjson.JSONEncoder):
- """
- Extended json serializer that supports serializing several model
- fields and objects
- """
- def default(self, obj):
- cls_name = obj.__class__.__name__
- if hasattr(obj, '_asdict'):
- return obj._asdict()
- elif isinstance(obj, bytes):
- return obj.decode()
- elif isinstance(obj, datetime):
- return obj.isoformat()
- elif isinstance(obj, Exception):
- return '{}: {}'.format(obj.__class__.__name__, obj)
-
- elif isinstance(obj, Path):
- return str(obj)
-
- elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
- return tuple(obj)
-
- elif isinstance(obj, Callable):
- return str(obj)
- return pyjson.JSONEncoder.default(self, obj)
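- # Illustrative usage sketch (hypothetical values): serialize objects the stdlib JSON encoder would reject
- # >>> pyjson.dumps({'dir': Path('/data/archive'), 'err': ValueError('bad url')}, cls=ExtendedEncoder)
- # '{"dir": "/data/archive", "err": "ValueError: bad url"}'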
- ### URL PARSING TESTS / ASSERTIONS
- # Check that plain text regex URL parsing works as expected
- # this is a last line of defense to make sure the URL_REGEX isn't
- # misbehaving due to some OS-level or environment-level quirks (e.g. regex engine / cpython / locale differences)
- # the consequences of bad URL parsing could be disastrous and lead to many
- # incorrect/badly parsed links being added to the archive, so this is worth the cost of checking
- assert fix_url_from_markdown('http://example.com/a(b)c).x(y)z') == 'http://example.com/a(b)c'
- assert fix_url_from_markdown('https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def).link(with)_trailingtext') == 'https://wikipedia.org/en/some_article_(Disambiguation).html?abc=def'
- URL_REGEX_TESTS = [
- ('https://example.com', ['https://example.com']),
- ('http://abc-file234example.com/abc?def=abc&23423=sdfsdf#abc=234&234=a234', ['http://abc-file234example.com/abc?def=abc&23423=sdfsdf#abc=234&234=a234']),
- ('https://twitter.com/share?url=https://akaao.success-corp.co.jp&text=ア@サ!ト&hashtags=ア%オ,元+ア.ア-オ_イ*シ$ロ abc', ['https://twitter.com/share?url=https://akaao.success-corp.co.jp&text=ア@サ!ト&hashtags=ア%オ,元+ア.ア-オ_イ*シ$ロ', 'https://akaao.success-corp.co.jp&text=ア@サ!ト&hashtags=ア%オ,元+ア.ア-オ_イ*シ$ロ']),
- ('<a href="https://twitter.com/share#url=https://akaao.success-corp.co.jp&text=ア@サ!ト?hashtags=ア%オ,元+ア&abc=.ア-オ_イ*シ$ロ"> abc', ['https://twitter.com/share#url=https://akaao.success-corp.co.jp&text=ア@サ!ト?hashtags=ア%オ,元+ア&abc=.ア-オ_イ*シ$ロ', 'https://akaao.success-corp.co.jp&text=ア@サ!ト?hashtags=ア%オ,元+ア&abc=.ア-オ_イ*シ$ロ']),
- ('///a', []),
- ('http://', []),
- ('http://../', ['http://../']),
- ('http://-error-.invalid/', ['http://-error-.invalid/']),
- ('https://a(b)c+1#2?3&4/', ['https://a(b)c+1#2?3&4/']),
- ('http://उदाहरण.परीक्षा', ['http://उदाहरण.परीक्षा']),
- ('http://例子.测试', ['http://例子.测试']),
- ('http://➡.ws/䨹 htps://abc.1243?234', ['http://➡.ws/䨹']),
- ('http://⌘.ws">https://exa+mple.com//:abc ', ['http://⌘.ws', 'https://exa+mple.com//:abc']),
- ('http://مثال.إختبار/abc?def=ت&ب=abc#abc=234', ['http://مثال.إختبار/abc?def=ت&ب=abc#abc=234']),
- ('http://-.~_!$&()*+,;=:%40:80%2f::::::@example.c\'om', ['http://-.~_!$&()*+,;=:%40:80%2f::::::@example.c']),
-
- ('http://us:[email protected]:42/http://ex.co:19/a?_d=4#-a=2.3', ['http://us:[email protected]:42/http://ex.co:19/a?_d=4#-a=2.3', 'http://ex.co:19/a?_d=4#-a=2.3']),
- ('http://code.google.com/events/#&product=browser', ['http://code.google.com/events/#&product=browser']),
- ('http://foo.bar?q=Spaces should be encoded', ['http://foo.bar?q=Spaces']),
- ('http://foo.com/blah_(wikipedia)#c(i)t[e]-1', ['http://foo.com/blah_(wikipedia)#c(i)t']),
- ('http://foo.com/(something)?after=parens', ['http://foo.com/(something)?after=parens']),
- ('http://foo.com/unicode_(✪)_in_parens) abc', ['http://foo.com/unicode_(✪)_in_parens']),
- ('http://foo.bar/?q=Test%20URL-encoded%20stuff', ['http://foo.bar/?q=Test%20URL-encoded%20stuff']),
- ('[xyz](http://a.b/?q=(Test)%20U)RL-encoded%20stuff', ['http://a.b/?q=(Test)%20U']),
- ('[xyz](http://a.b/?q=(Test)%20U)-ab https://abc+123', ['http://a.b/?q=(Test)%20U', 'https://abc+123']),
- ('[xyz](http://a.b/?q=(Test)%20U) https://a(b)c+12)3', ['http://a.b/?q=(Test)%20U', 'https://a(b)c+12']),
- ('[xyz](http://a.b/?q=(Test)a\nabchttps://a(b)c+12)3', ['http://a.b/?q=(Test)a', 'https://a(b)c+12']),
- ('http://foo.bar/?q=Test%20URL-encoded%20stuff', ['http://foo.bar/?q=Test%20URL-encoded%20stuff']),
- ]
- for urls_str, expected_url_matches in URL_REGEX_TESTS:
- url_matches = list(find_all_urls(urls_str))
- assert url_matches == expected_url_matches, f'FAILED URL_REGEX CHECK: expected {expected_url_matches}, got {url_matches}'
- # More test cases
- _test_url_strs = {
- 'example.com': 0,
- '/example.com': 0,
- '//example.com': 0,
- ':/example.com': 0,
- '://example.com': 0,
- 'htt://example8.com': 0,
- '/htt://example.com': 0,
- 'https://example': 1,
- 'https://localhost/2345': 1,
- 'https://localhost:1234/123': 1,
- '://': 0,
- 'https://': 0,
- 'http://': 0,
- 'ftp://': 0,
- 'ftp://example.com': 0,
- 'https://example.com': 1,
- 'https://example.com/': 1,
- 'https://a.example.com': 1,
- 'https://a.example.com/': 1,
- 'https://a.example.com/what/is/happening.html': 1,
- 'https://a.example.com/what/ís/happening.html': 1,
- 'https://a.example.com/what/is/happening.html?what=1&2%20b#höw-about-this=1a': 1,
- 'https://a.example.com/what/is/happéning/?what=1&2%20b#how-aboüt-this=1a': 1,
- 'HTtpS://a.example.com/what/is/happening/?what=1&2%20b#how-about-this=1af&2f%20b': 1,
- 'https://example.com/?what=1#how-about-this=1&2%20baf': 1,
- 'https://example.com?what=1#how-about-this=1&2%20baf': 1,
- '<test>http://example7.com</test>': 1,
- 'https://<test>': 0,
- 'https://[test]': 0,
- 'http://"test"': 0,
- 'http://\'test\'': 0,
- '[https://example8.com/what/is/this.php?what=1]': 1,
- '[and http://example9.com?what=1&other=3#and-thing=2]': 1,
- '<what>https://example10.com#and-thing=2 "</about>': 1,
- 'abc<this["https://example11.com/what/is#and-thing=2?whoami=23&where=1"]that>def': 1,
- 'sdflkf[what](https://example12.com/who/what.php?whoami=1#whatami=2)?am=hi': 1,
- '<or>http://examplehttp://15.badc</that>': 2,
- 'https://a.example.com/one.html?url=http://example.com/inside/of/another?=http://': 2,
- '[https://a.example.com/one.html?url=http://example.com/inside/of/another?=](http://a.example.com)': 3,
- }
- for url_str, num_urls in _test_url_strs.items():
- assert len(list(find_all_urls(url_str))) == num_urls, (
- f'{url_str} does not contain {num_urls} urls')