__package__ = 'archivebox'

import re
from pathlib import Path
import json as pyjson

from typing import List, Optional, Any
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
from dateparser import parse as dateparser

import requests
from base32_crockford import encode as base32_encode    # type: ignore
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding

try:
    import chardet
    detect_encoding = lambda rawdata: chardet.detect(rawdata)["encoding"]
except ImportError:
    detect_encoding = lambda rawdata: "utf-8"


### Parsing Helpers

# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
path = lambda url: urlparse(url).path
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
domain = lambda url: urlparse(url).netloc
query = lambda url: urlparse(url).query
fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url)    # uniq base url used to dedupe links

without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]

urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)

short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
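
# Illustrative usage of the helpers above (not part of the original module;
# 'https://www.example.com/a/b.html?c=d#e' is an arbitrary sample URL):
#   scheme('https://www.example.com/a/b.html?c=d#e')     -> 'https'
#   domain('https://www.example.com/a/b.html?c=d#e')     -> 'www.example.com'
#   basename('https://www.example.com/a/b.html?c=d#e')   -> 'b.html'
#   extension('https://www.example.com/a/b.html?c=d#e')  -> 'html'
#   base_url('https://www.example.com/a/b.html?c=d#e')   -> 'www.example.com/a/b.html?c=d#e'
#   without_www('https://www.example.com/')              -> 'https://example.com/'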


URL_REGEX = re.compile(
    r'http[s]?://'                    # start matching from allowed schemes
    r'(?:[a-zA-Z]|[0-9]'              # followed by allowed alphanum characters
    r'|[$-_@.&+]|[!*\(\),]'           # or allowed symbols
    r'|(?:%[0-9a-fA-F][0-9a-fA-F]))'  # or allowed unicode bytes
    r'[^\]\[\(\)<>"\'\s]+',           # stop parsing at these symbols
    re.IGNORECASE,
)
COLOR_REGEX = re.compile(r'\[(?P<arg_1>\d+)(;(?P<arg_2>\d+)(;(?P<arg_3>\d+))?)?m')
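
# Illustrative sketch (not part of the original module): URL_REGEX pulls plain
# http(s) URLs out of free-form text, e.g.:
#   URL_REGEX.findall('see https://example.com/page and (http://foo.test/x)')
#   -> ['https://example.com/page', 'http://foo.test/x']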


def is_static_file(url: str):
    # TODO: the proper way is with MIME type detection + ext, not only extension
    from .config import STATICFILE_EXTENSIONS
    return extension(url).lower() in STATICFILE_EXTENSIONS


def enforce_types(func):
    """
    Enforce function arg and kwarg types at runtime using its python3 type hints
    """
    # TODO: check return type as well

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        sig = signature(func)

        def check_argument_type(arg_key, arg_val):
            try:
                annotation = sig.parameters[arg_key].annotation
            except KeyError:
                annotation = None

            if annotation is not None and annotation.__class__ is type:
                if not isinstance(arg_val, annotation):
                    raise TypeError(
                        '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
                            func.__name__,
                            arg_key,
                            annotation.__name__,
                            type(arg_val).__name__,
                            arg_key,
                            str(arg_val)[:64],
                        )
                    )

        # check args
        for arg_val, arg_key in zip(args, sig.parameters):
            check_argument_type(arg_key, arg_val)

        # check kwargs
        for arg_key, arg_val in kwargs.items():
            check_argument_type(arg_key, arg_val)

        return func(*args, **kwargs)

    return typechecked_function
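
# Illustrative usage (not part of the original module; 'greet' is a hypothetical function):
#   @enforce_types
#   def greet(name: str) -> str:
#       return 'hello ' + name
#
#   greet('world')   # ok
#   greet(123)       # TypeError: greet(..., name: str) got unexpected int argument name=123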


def docstring(text: Optional[str]):
    """attach the given docstring to the decorated function"""
    def decorator(func):
        if text:
            func.__doc__ = text
        return func
    return decorator


@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
    """(<abc>12345</def>, <abc>, </def>) -> 12345"""
    content = string.split(start, 1)[-1]
    if end is not None:
        content = content.rsplit(end, 1)[0]
    return content


@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
    """Parse unix timestamps, iso format, and human-readable strings"""
    if date is None:
        return None

    if isinstance(date, datetime):
        return date

    if isinstance(date, (float, int)):
        date = str(date)

    if isinstance(date, str):
        return dateparser(date)

    raise ValueError('Tried to parse invalid date! {}'.format(date))
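
# Illustrative examples (not part of the original module); exact results depend on
# the dateparser library and the local timezone:
#   parse_date('2014-05-16 17:20')  -> datetime(2014, 5, 16, 17, 20)
#   parse_date(1400260800)          -> datetime parsed from the stringified unix timestamp
#   parse_date(None)                -> None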


@enforce_types
def download_url(url: str, timeout: int=None) -> str:
    """Download the contents of a remote url and return the text"""
    from .config import TIMEOUT, CHECK_SSL_VALIDITY, WGET_USER_AGENT

    timeout = timeout or TIMEOUT
    response = requests.get(
        url,
        headers={'User-Agent': WGET_USER_AGENT},
        verify=CHECK_SSL_VALIDITY,
        timeout=timeout,
    )

    content_type = response.headers.get('Content-Type', '')
    encoding = http_content_type_encoding(content_type) or html_body_declared_encoding(response.text)
    if encoding is not None:
        response.encoding = encoding

    return response.text
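
# Illustrative usage (not part of the original module): fetch a page as text using the
# configured user agent, SSL verification setting, and timeout, e.g.:
#   html = download_url('https://example.com', timeout=30)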


@enforce_types
def chrome_args(**options) -> List[str]:
    """helper to build up a chrome shell command with arguments"""
    from .config import CHROME_OPTIONS

    options = {**CHROME_OPTIONS, **options}
    cmd_args = [options['CHROME_BINARY']]

    if options['CHROME_HEADLESS']:
        cmd_args += ('--headless',)

    if not options['CHROME_SANDBOX']:
        # assume this means we are running inside a docker container
        # in docker, GPU support is limited, sandboxing is unnecessary,
        # and SHM is limited to 64MB by default (which is too low to be usable).
        cmd_args += (
            '--no-sandbox',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-software-rasterizer',
        )

    if not options['CHECK_SSL_VALIDITY']:
        cmd_args += ('--disable-web-security', '--ignore-certificate-errors')

    if options['CHROME_USER_AGENT']:
        cmd_args += ('--user-agent={}'.format(options['CHROME_USER_AGENT']),)

    if options['RESOLUTION']:
        cmd_args += ('--window-size={}'.format(options['RESOLUTION']),)

    if options['TIMEOUT']:
        cmd_args += ('--timeout={}'.format(options['TIMEOUT'] * 1000),)

    if options['CHROME_USER_DATA_DIR']:
        cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))

    return cmd_args
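
# Illustrative sketch (not part of the original module): the returned argument list is
# intended to be fed to a subprocess call, roughly like:
#   import subprocess
#   subprocess.run([*chrome_args(TIMEOUT=60), '--screenshot', 'https://example.com'])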


def ansi_to_html(text):
    """
    Based on: https://stackoverflow.com/questions/19212665/python-converting-ansi-color-codes-to-html
    """
    from .config import COLOR_DICT

    TEMPLATE = '<span style="color: rgb{}"><br>'
    text = text.replace('[m', '</span>')

    def single_sub(match):
        argsdict = match.groupdict()
        if argsdict['arg_3'] is None:
            if argsdict['arg_2'] is None:
                _, color = 0, argsdict['arg_1']
            else:
                _, color = argsdict['arg_1'], argsdict['arg_2']
        else:
            _, color = argsdict['arg_3'], argsdict['arg_2']

        return TEMPLATE.format(COLOR_DICT[color][0])

    return COLOR_REGEX.sub(single_sub, text)
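
# Illustrative note (not part of the original module): given terminal output containing a
# code like '[1;32m', single_sub looks up '32' in COLOR_DICT and emits an opening <span>
# tag, while literal '[m' reset sequences are replaced with closing </span> tags above.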


class AttributeDict(dict):
    """Helper to allow accessing dict values via Example.key or Example['key']"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Recursively convert nested dicts to AttributeDicts (optional):
        # for key, val in self.items():
        #     if isinstance(val, dict) and type(val) is not AttributeDict:
        #         self[key] = AttributeDict(val)

    def __getattr__(self, attr: str) -> Any:
        return dict.__getitem__(self, attr)

    def __setattr__(self, attr: str, value: Any) -> None:
        return dict.__setitem__(self, attr, value)
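
# Illustrative usage (not part of the original module):
#   info = AttributeDict({'url': 'https://example.com', 'status': 'succeeded'})
#   info.url                 # same as info['url']
#   info.status = 'failed'   # same as info['status'] = 'failed'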


class ExtendedEncoder(pyjson.JSONEncoder):
    """
    Extended json serializer that supports serializing several model
    fields and objects
    """

    def default(self, obj):
        cls_name = obj.__class__.__name__

        if hasattr(obj, '_asdict'):
            return obj._asdict()

        elif isinstance(obj, bytes):
            return obj.decode()

        elif isinstance(obj, datetime):
            return obj.isoformat()

        elif isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)

        elif isinstance(obj, Path):
            return str(obj)

        elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        return pyjson.JSONEncoder.default(self, obj)
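
# Illustrative usage (not part of the original module):
#   pyjson.dumps({'fetched_at': datetime(2020, 1, 1), 'path': Path('index.json')},
#                cls=ExtendedEncoder)
#   -> '{"fetched_at": "2020-01-01T00:00:00", "path": "index.json"}'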