# util.py

__package__ = 'archivebox'

import re
import requests
import json as pyjson

from typing import List, Optional, Any
from pathlib import Path
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime, timezone
from dateparser import parse as dateparser
from requests.exceptions import RequestException, ReadTimeout

from .vendor.base32_crockford import encode as base32_encode  # type: ignore
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding

from os.path import lexists
from os import remove as remove_file

try:
    import chardet
    detect_encoding = lambda rawdata: chardet.detect(rawdata)["encoding"]
except ImportError:
    detect_encoding = lambda rawdata: "utf-8"

### Parsing Helpers

# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
path = lambda url: urlparse(url).path
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
domain = lambda url: urlparse(url).netloc
query = lambda url: urlparse(url).query
fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url)  # uniq base url used to dedupe links

without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]

urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)

short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date_str = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()

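# Illustrative, doctest-style examples of the helpers above (comments only,
# nothing here runs on import; the URLs are arbitrary placeholders):
#
#   >>> domain('https://www.example.com/a/b/c.html?q=1#frag')
#   'www.example.com'
#   >>> basename('https://www.example.com/a/b/c.html?q=1#frag')
#   'c.html'
#   >>> extension('https://www.example.com/a/b/c.html?q=1#frag')
#   'html'
#   >>> base_url('https://www.example.com/a/b/c.html?q=1#frag')
#   'www.example.com/a/b/c.html?q=1#frag'
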
URL_REGEX = re.compile(
    r'(?=('
    r'https?://'                          # match the http:// or https:// scheme (but not e.g. ftp://)
    r'(?:[A-Za-z0-9-]+\.)*[A-Za-z0-9-]+'  # match the domain
    r'[^\\#\f\n\r\t\v?&]*'                # exclude '#' because we don't need the fragment,
                                          # and exclude '?' and '&' because a url is invalid if '&' appears before '?'
    r'(?:\?[^\\#\f\n\r\t\v]*)*'
    r'))',
)

COLOR_REGEX = re.compile(r'\[(?P<arg_1>\d+)(;(?P<arg_2>\d+)(;(?P<arg_3>\d+))?)?m')

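# URL_REGEX wraps the whole pattern in a lookahead around a capture group, so
# re.findall() returns the captured URL candidates even when they overlap.
# An illustrative, doctest-style example (the sample text is arbitrary):
#
#   >>> URL_REGEX.findall('see https://example.com/page?q=1\nplus plain text')
#   ['https://example.com/page?q=1']
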
def is_static_file(url: str):
    # TODO: the proper way is with MIME type detection + ext, not only extension
    from .config import STATICFILE_EXTENSIONS

    return extension(url).lower() in STATICFILE_EXTENSIONS

def enforce_types(func):
    """
    Enforce function arg and kwarg types at runtime using its python3 type hints
    """
    # TODO: check return type as well

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        sig = signature(func)

        def check_argument_type(arg_key, arg_val):
            try:
                annotation = sig.parameters[arg_key].annotation
            except KeyError:
                annotation = None

            if annotation is not None and annotation.__class__ is type:
                if not isinstance(arg_val, annotation):
                    raise TypeError(
                        '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
                            func.__name__,
                            arg_key,
                            annotation.__name__,
                            type(arg_val).__name__,
                            arg_key,
                            str(arg_val)[:64],
                        )
                    )

        # check args
        for arg_val, arg_key in zip(args, sig.parameters):
            check_argument_type(arg_key, arg_val)

        # check kwargs
        for arg_key, arg_val in kwargs.items():
            check_argument_type(arg_key, arg_val)

        return func(*args, **kwargs)

    return typechecked_function

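# Illustrative usage (doctest-style comments only; `greet` is a hypothetical
# function, not part of this module):
#
#   >>> @enforce_types
#   ... def greet(name: str) -> str:
#   ...     return 'hello ' + name
#   >>> greet(123)
#   Traceback (most recent call last):
#     ...
#   TypeError: greet(..., name: str) got unexpected int argument name=123
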
def docstring(text: Optional[str]):
    """attach the given docstring to the decorated function"""
    def decorator(func):
        if text:
            func.__doc__ = text
        return func

    return decorator

@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
    """(<abc>12345</def>, <abc>, </def>) -> 12345"""

    content = string.split(start, 1)[-1]
    if end is not None:
        content = content.rsplit(end, 1)[0]

    return content

@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
    """Parse unix timestamps, iso format, and human-readable strings"""

    if date is None:
        return None

    if isinstance(date, datetime):
        if date.tzinfo is None:
            return date.replace(tzinfo=timezone.utc)

        assert date.tzinfo.utcoffset(datetime.now()).seconds == 0, 'Refusing to load a non-UTC date!'
        return date

    if isinstance(date, (float, int)):
        date = str(date)

    if isinstance(date, str):
        return dateparser(date, settings={'TIMEZONE': 'UTC'}).replace(tzinfo=timezone.utc)

    raise ValueError('Tried to parse invalid date! {}'.format(date))

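# Illustrative example (doctest-style comment only; assumes a standard
# dateparser install):
#
#   >>> parse_date('2023-05-01 12:30')
#   datetime.datetime(2023, 5, 1, 12, 30, tzinfo=datetime.timezone.utc)
#
# ints and floats are stringified first, so unix timestamps like 1683000000
# are handed to dateparser as strings and parsed the same way.
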
@enforce_types
def download_url(url: str, timeout: int=None) -> str:
    """Download the contents of a remote url and return the text"""
    from .config import TIMEOUT, CHECK_SSL_VALIDITY, WGET_USER_AGENT

    timeout = timeout or TIMEOUT
    response = requests.get(
        url,
        headers={'User-Agent': WGET_USER_AGENT},
        verify=CHECK_SSL_VALIDITY,
        timeout=timeout,
    )

    content_type = response.headers.get('Content-Type', '')
    encoding = http_content_type_encoding(content_type) or html_body_declared_encoding(response.text)

    if encoding is not None:
        response.encoding = encoding

    return response.text

@enforce_types
def get_headers(url: str, timeout: int=None) -> str:
    """Download the contents of a remote url and return the headers"""
    from .config import TIMEOUT, CHECK_SSL_VALIDITY, WGET_USER_AGENT

    timeout = timeout or TIMEOUT

    try:
        response = requests.head(
            url,
            headers={'User-Agent': WGET_USER_AGENT},
            verify=CHECK_SSL_VALIDITY,
            timeout=timeout,
            allow_redirects=True,
        )
        if response.status_code >= 400:
            raise RequestException
    except ReadTimeout:
        raise
    except RequestException:
        # fall back to a streaming GET if HEAD is unsupported or returns an error
        response = requests.get(
            url,
            headers={'User-Agent': WGET_USER_AGENT},
            verify=CHECK_SSL_VALIDITY,
            timeout=timeout,
            stream=True,
        )

    return pyjson.dumps(
        {
            'Status-Code': response.status_code,
            **dict(response.headers),
        },
        indent=4,
    )

@enforce_types
def chrome_args(**options) -> List[str]:
    """helper to build up a chrome shell command with arguments"""
    from .config import CHROME_OPTIONS, CHROME_VERSION

    options = {**CHROME_OPTIONS, **options}

    if not options['CHROME_BINARY']:
        raise Exception('Could not find any CHROME_BINARY installed on your system')

    cmd_args = [options['CHROME_BINARY']]

    if options['CHROME_HEADLESS']:
        chrome_major_version = int(re.search(r'\s(\d+)\.\d', CHROME_VERSION)[1])
        if chrome_major_version >= 111:
            cmd_args += ("--headless=new",)
        else:
            cmd_args += ('--headless',)

    if not options['CHROME_SANDBOX']:
        # assume this means we are running inside a docker container
        # in docker, GPU support is limited, sandboxing is unnecessary,
        # and SHM is limited to 64MB by default (which is too low to be usable).
        cmd_args += (
            "--no-sandbox",
            "--no-zygote",
            "--disable-dev-shm-usage",
            "--disable-software-rasterizer",
            "--run-all-compositor-stages-before-draw",
            "--hide-scrollbars",
            "--window-size=1440,2000",
            "--autoplay-policy=no-user-gesture-required",
            "--no-first-run",
            "--use-fake-ui-for-media-stream",
            "--use-fake-device-for-media-stream",
            "--disable-sync",
        )

    if not options['CHECK_SSL_VALIDITY']:
        cmd_args += ('--disable-web-security', '--ignore-certificate-errors')

    if options['CHROME_USER_AGENT']:
        cmd_args += ('--user-agent={}'.format(options['CHROME_USER_AGENT']),)

    if options['RESOLUTION']:
        cmd_args += ('--window-size={}'.format(options['RESOLUTION']),)

    if options['CHROME_TIMEOUT']:
        cmd_args += ('--timeout={}'.format(options['CHROME_TIMEOUT'] * 1000),)

    if options['CHROME_USER_DATA_DIR']:
        cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))

    return cmd_args

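# Illustrative usage (the exact flags depend entirely on your CHROME_OPTIONS
# config; the binary path shown is a hypothetical example):
#
#   >>> chrome_args(CHROME_HEADLESS=True, CHROME_TIMEOUT=90)
#   ['/usr/bin/chromium-browser', '--headless=new', ..., '--timeout=90000']
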
def chrome_cleanup():
    """
    Cleans up any state or runtime files that chrome leaves behind when killed by
    a timeout or other error
    """
    from .config import IN_DOCKER

    if IN_DOCKER and lexists("/home/archivebox/.config/chromium/SingletonLock"):
        remove_file("/home/archivebox/.config/chromium/SingletonLock")

def ansi_to_html(text):
    """
    Based on: https://stackoverflow.com/questions/19212665/python-converting-ansi-color-codes-to-html
    """
    from .config import COLOR_DICT

    TEMPLATE = '<span style="color: rgb{}"><br>'
    text = text.replace('[m', '</span>')

    def single_sub(match):
        argsdict = match.groupdict()
        if argsdict['arg_3'] is None:
            if argsdict['arg_2'] is None:
                _, color = 0, argsdict['arg_1']
            else:
                _, color = argsdict['arg_1'], argsdict['arg_2']
        else:
            _, color = argsdict['arg_3'], argsdict['arg_2']

        return TEMPLATE.format(COLOR_DICT[color][0])

    return COLOR_REGEX.sub(single_sub, text)

class AttributeDict(dict):
    """Helper to allow accessing dict values via Example.key or Example['key']"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Recursively convert nested dicts to AttributeDicts (optional):
        # for key, val in self.items():
        #     if isinstance(val, dict) and type(val) is not AttributeDict:
        #         self[key] = AttributeDict(val)

    def __getattr__(self, attr: str) -> Any:
        return dict.__getitem__(self, attr)

    def __setattr__(self, attr: str, value: Any) -> None:
        return dict.__setitem__(self, attr, value)

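# Illustrative usage (doctest-style comments only):
#
#   >>> config = AttributeDict({'TIMEOUT': 60})
#   >>> config.TIMEOUT
#   60
#   >>> config['TIMEOUT']
#   60
#
# Note: missing keys raise KeyError rather than AttributeError, because
# attribute access is delegated straight to dict.__getitem__.
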
class ExtendedEncoder(pyjson.JSONEncoder):
    """
    Extended json serializer that supports serializing several model
    fields and objects
    """

    def default(self, obj):
        cls_name = obj.__class__.__name__

        if hasattr(obj, '_asdict'):
            return obj._asdict()

        elif isinstance(obj, bytes):
            return obj.decode()

        elif isinstance(obj, datetime):
            return obj.isoformat()

        elif isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)

        elif isinstance(obj, Path):
            return str(obj)

        elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        return pyjson.JSONEncoder.default(self, obj)
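
# Illustrative usage (doctest-style comments only; output shown with the json
# module's default separators):
#
#   >>> pyjson.dumps(
#   ...     {'ts': datetime(2023, 5, 1, tzinfo=timezone.utc), 'path': Path('/data')},
#   ...     cls=ExtendedEncoder,
#   ... )
#   '{"ts": "2023-05-01T00:00:00+00:00", "path": "/data"}'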