util.py

import re
import json as pyjson

from typing import List, Optional, Any
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
from dateutil import parser as dateparser

import requests
from base32_crockford import encode as base32_encode  # type: ignore

try:
    import chardet
    detect_encoding = lambda rawdata: chardet.detect(rawdata)["encoding"]
except ImportError:
    detect_encoding = lambda rawdata: "utf-8"


### Parsing Helpers

# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
path = lambda url: urlparse(url).path
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
domain = lambda url: urlparse(url).netloc
query = lambda url: urlparse(url).query
fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url)  # uniq base url used to dedupe links

without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]

urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)

short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
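
# A quick illustration of the helpers above (added here for clarity, not part
# of the original module), assuming a typical archive URL:
#
#   scheme('https://www.example.com/a/b.html?q=1')          -> 'https'
#   without_scheme('https://www.example.com/a/b.html?q=1')  -> 'www.example.com/a/b.html?q=1'
#   basename('https://www.example.com/a/b.html?q=1')        -> 'b.html'
#   extension('https://www.example.com/a/b.html?q=1')       -> 'html'
#   without_www('https://www.example.com/a/b.html?q=1')     -> 'https://example.com/a/b.html?q=1'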

URL_REGEX = re.compile(
    r'http[s]?://'                    # start matching from allowed schemes
    r'(?:[a-zA-Z]|[0-9]'              # followed by allowed alphanum characters
    r'|[$-_@.&+]|[!*\(\),]'           # or allowed symbols
    r'|(?:%[0-9a-fA-F][0-9a-fA-F]))'  # or allowed unicode bytes
    r'[^\]\[\(\)<>\""\'\s]+',         # stop parsing at these symbols
    re.IGNORECASE,
)
COLOR_REGEX = re.compile(r'\[(?P<arg_1>\d+)(;(?P<arg_2>\d+)(;(?P<arg_3>\d+))?)?m')
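
# Sketch of how URL_REGEX is typically used downstream (illustrative example,
# not part of the original module):
#
#   URL_REGEX.findall('see https://example.com/page and http://foo.org')
#   -> ['https://example.com/page', 'http://foo.org']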


def is_static_file(url: str):
    # TODO: the proper way is with MIME type detection + ext, not only extension
    from .config import STATICFILE_EXTENSIONS

    return extension(url).lower() in STATICFILE_EXTENSIONS


def enforce_types(func):
    """
    Enforce function arg and kwarg types at runtime using its python3 type hints
    """
    # TODO: check return type as well

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        sig = signature(func)

        def check_argument_type(arg_key, arg_val):
            try:
                annotation = sig.parameters[arg_key].annotation
            except KeyError:
                annotation = None

            if annotation is not None and annotation.__class__ is type:
                if not isinstance(arg_val, annotation):
                    raise TypeError(
                        '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
                            func.__name__,
                            arg_key,
                            annotation.__name__,
                            type(arg_val).__name__,
                            arg_key,
                            str(arg_val)[:64],
                        )
                    )

        # check args
        for arg_val, arg_key in zip(args, sig.parameters):
            check_argument_type(arg_key, arg_val)

        # check kwargs
        for arg_key, arg_val in kwargs.items():
            check_argument_type(arg_key, arg_val)

        return func(*args, **kwargs)

    return typechecked_function
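
# Illustrative example (not in the original file): calling a decorated function
# with a wrongly-typed argument raises a TypeError at call time:
#
#   @enforce_types
#   def greet(name: str) -> str:
#       return 'hello ' + name
#
#   greet(123)  # TypeError: greet(..., name: str) got unexpected int argument name=123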


def docstring(text: Optional[str]):
    """attach the given docstring to the decorated function"""
    def decorator(func):
        if text:
            func.__doc__ = text
        return func
    return decorator
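
# Example usage (illustrative, not part of the original module):
#
#   @docstring('Fetches a page and returns its title.')
#   def fetch_title(url):
#       ...
#
#   help(fetch_title)  # shows the attached docstring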


@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
    """(<abc>12345</def>, <abc>, </def>) -> 12345"""

    content = string.split(start, 1)[-1]
    if end is not None:
        content = content.rsplit(end, 1)[0]

    return content


@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
    """Parse unix timestamps, iso format, and human-readable strings"""

    if date is None:
        return None

    if isinstance(date, datetime):
        return date

    if isinstance(date, (float, int)):
        date = str(date)

    if isinstance(date, str):
        return dateparser.parse(date)

    raise ValueError('Tried to parse invalid date! {}'.format(date))
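
# Illustrative inputs (not part of the original module); each of these returns
# a datetime via dateutil's flexible parser or passes through unchanged:
#
#   parse_date('2019-01-01T12:00:00')   # ISO format string
#   parse_date('Jan 1, 2019 12:00pm')   # human-readable string
#   parse_date(datetime.now())          # datetime objects pass through unchanged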


@enforce_types
def download_url(url: str, timeout: int=None) -> str:
    """Download the contents of a remote url and return the text"""
    from .config import TIMEOUT, CHECK_SSL_VALIDITY, WGET_USER_AGENT

    timeout = timeout or TIMEOUT
    response = requests.get(
        url,
        headers={'User-Agent': WGET_USER_AGENT},
        verify=CHECK_SSL_VALIDITY,
        timeout=timeout,
    )

    if response.headers.get('Content-Type') == 'application/rss+xml':
        # Based on https://github.com/scrapy/w3lib/blob/master/w3lib/encoding.py
        _TEMPLATE = r'''%s\s*=\s*["']?\s*%s\s*["']?'''
        _XML_ENCODING_RE = _TEMPLATE % ('encoding', r'(?P<xmlcharset>[\w-]+)')
        _BODY_ENCODING_PATTERN = r'<\s*(\?xml\s[^>]+%s)' % (_XML_ENCODING_RE)
        _BODY_ENCODING_STR_RE = re.compile(_BODY_ENCODING_PATTERN, re.I | re.VERBOSE)
        match = _BODY_ENCODING_STR_RE.search(response.text[:1024])
        if match:
            response.encoding = match.group('xmlcharset')

    return response.text


@enforce_types
def chrome_args(**options) -> List[str]:
    """helper to build up a chrome shell command with arguments"""
    from .config import CHROME_OPTIONS

    options = {**CHROME_OPTIONS, **options}
    cmd_args = [options['CHROME_BINARY']]

    if options['CHROME_HEADLESS']:
        cmd_args += ('--headless',)

    if not options['CHROME_SANDBOX']:
        # dont use GPU or sandbox when running inside docker container
        cmd_args += ('--no-sandbox', '--disable-gpu')

    if not options['CHECK_SSL_VALIDITY']:
        cmd_args += ('--disable-web-security', '--ignore-certificate-errors')

    if options['CHROME_USER_AGENT']:
        cmd_args += ('--user-agent={}'.format(options['CHROME_USER_AGENT']),)

    if options['RESOLUTION']:
        cmd_args += ('--window-size={}'.format(options['RESOLUTION']),)

    if options['TIMEOUT']:
        cmd_args += ('--timeout={}'.format((options['TIMEOUT']) * 1000),)

    if options['CHROME_USER_DATA_DIR']:
        cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))

    return cmd_args
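
# Rough sketch of the resulting command (illustrative only; the actual binary
# path and flags depend on the CHROME_* values defined in .config):
#
#   chrome_args(TIMEOUT=60)
#   -> ['chromium-browser', '--headless', '--window-size=1440,2000', '--timeout=60000', ...]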


def ansi_to_html(text):
    """
    Based on: https://stackoverflow.com/questions/19212665/python-converting-ansi-color-codes-to-html
    """
    from .config import COLOR_DICT

    TEMPLATE = '<span style="color: rgb{}"><br>'
    text = text.replace('[m', '</span>')

    def single_sub(match):
        argsdict = match.groupdict()
        if argsdict['arg_3'] is None:
            if argsdict['arg_2'] is None:
                bold, color = 0, argsdict['arg_1']
            else:
                bold, color = argsdict['arg_1'], argsdict['arg_2']
        else:
            bold, color = argsdict['arg_3'], argsdict['arg_2']

        return TEMPLATE.format(COLOR_DICT[color][0])

    return COLOR_REGEX.sub(single_sub, text)


class ExtendedEncoder(pyjson.JSONEncoder):
    """
    Extended json serializer that supports serializing several model
    fields and objects
    """

    def default(self, obj):
        cls_name = obj.__class__.__name__

        if hasattr(obj, '_asdict'):
            return obj._asdict()

        elif isinstance(obj, bytes):
            return obj.decode()

        elif isinstance(obj, datetime):
            return obj.isoformat()

        elif isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)

        elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        return pyjson.JSONEncoder.default(self, obj)
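
# Example usage (illustrative, not part of the original module):
#
#   pyjson.dumps(
#       {'fetched_at': datetime.now(), 'error': ValueError('bad url')},
#       cls=ExtendedEncoder,
#   )
#   -> '{"fetched_at": "2019-01-01T12:00:00", "error": "ValueError: bad url"}'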