util.py

import re
import ssl

from typing import List, Optional, Any
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
from dateutil import parser as dateparser
from base32_crockford import encode as base32_encode  # type: ignore
import json as pyjson

from .config import (
    TIMEOUT,
    STATICFILE_EXTENSIONS,
    CHECK_SSL_VALIDITY,
    WGET_USER_AGENT,
    CHROME_OPTIONS,
)


### Parsing Helpers

# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
without_path = lambda url: urlparse(url)._replace(path='', fragment='', query='').geturl().strip('//')
path = lambda url: urlparse(url).path
basename = lambda url: urlparse(url).path.rsplit('/', 1)[-1]
domain = lambda url: urlparse(url).netloc
query = lambda url: urlparse(url).query
fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url)  # unique base url used to dedupe links

without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]

is_static_file = lambda url: extension(url).lower() in STATICFILE_EXTENSIONS  # TODO: the proper way is with MIME type detection, not guessing from the extension

urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)

short_ts = lambda ts: str(parse_date(ts).timestamp()).split('.')[0]
ts_to_date = lambda ts: ts and parse_date(ts).strftime('%Y-%m-%d %H:%M')
ts_to_iso = lambda ts: ts and parse_date(ts).isoformat()
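
# Hedged usage sketch: illustrative outputs only (the example.com URLs are
# hypothetical, not taken from this codebase):
#
#   scheme('https://www.example.com/path/page.html?id=1#top')     # -> 'https'
#   domain('https://www.example.com/path/page.html?id=1#top')     # -> 'www.example.com'
#   basename('https://www.example.com/path/page.html?id=1#top')   # -> 'page.html'
#   extension('https://www.example.com/path/page.html?id=1#top')  # -> 'html'
#   base_url('https://www.example.com/path/page.html')            # -> 'www.example.com/path/page.html'
#   without_www('https://www.example.com/path/')                  # -> 'https://example.com/path/'
#   without_trailing_slash('https://example.com/path/')           # -> 'https://example.com/path'
#   hashurl('https://example.com/path')                           # -> 20-char base32 digest of the base url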


URL_REGEX = re.compile(
    r'http[s]?://'                    # start matching from allowed schemes
    r'(?:[a-zA-Z]|[0-9]'              # followed by allowed alphanum characters
    r'|[$-_@.&+]|[!*\(\),]'           # or allowed symbols
    r'|(?:%[0-9a-fA-F][0-9a-fA-F]))'  # or percent-encoded bytes
    r'[^\]\[\(\)<>\""\'\s]+',         # stop parsing at these symbols
    re.IGNORECASE,
)
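
# Hedged usage sketch: URL_REGEX pulls URLs out of arbitrary text, stopping at
# brackets, quotes, and whitespace (the sample text below is made up):
#
#   URL_REGEX.findall('see <https://example.com/page?id=1> and (http://example.com/two)')
#   # -> ['https://example.com/page?id=1', 'http://example.com/two']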


def enforce_types(func):
    """
    Enforce function arg and kwarg types at runtime using its python3 type hints
    """
    # TODO: check return type as well

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        sig = signature(func)

        def check_argument_type(arg_key, arg_val):
            try:
                annotation = sig.parameters[arg_key].annotation
            except KeyError:
                annotation = None

            if annotation is not None and annotation.__class__ is type:
                if not isinstance(arg_val, annotation):
                    raise TypeError(
                        '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
                            func.__name__,
                            arg_key,
                            annotation.__name__,
                            type(arg_val).__name__,
                            arg_key,
                            str(arg_val)[:64],
                        )
                    )

        # check args
        for arg_val, arg_key in zip(args, sig.parameters):
            check_argument_type(arg_key, arg_val)

        # check kwargs
        for arg_key, arg_val in kwargs.items():
            check_argument_type(arg_key, arg_val)

        return func(*args, **kwargs)

    return typechecked_function
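
# Hedged usage sketch (the `greet` function below is hypothetical, for illustration):
#
#   @enforce_types
#   def greet(name: str) -> str:
#       return 'hello ' + name
#
#   greet('world')   # ok
#   greet(123)       # raises TypeError:
#                    #   greet(..., name: str) got unexpected int argument name=123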


def docstring(text: Optional[str]):
    """attach the given docstring to the decorated function"""
    def decorator(func):
        if text:
            func.__doc__ = text
        return func
    return decorator
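
# Hedged usage sketch (the wrapper below is hypothetical): handy for attaching a
# dynamically-chosen docstring, e.g. reusing another function's help text:
#
#   @docstring(parse_date.__doc__)
#   def parse_date_or_none(value):
#       return value and parse_date(value)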


@enforce_types
def str_between(string: str, start: str, end: Optional[str]=None) -> str:
    """(<abc>12345</def>, <abc>, </def>) -> 12345"""
    content = string.split(start, 1)[-1]
    if end is not None:
        content = content.rsplit(end, 1)[0]
    return content


@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
    """Parse unix timestamps, iso format, and human-readable strings"""
    if date is None:
        return None

    if isinstance(date, datetime):
        return date

    if isinstance(date, (float, int)):
        date = str(date)

    if isinstance(date, str):
        return dateparser.parse(date)

    raise ValueError('Tried to parse invalid date! {}'.format(date))
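
# Hedged usage sketch (sample values are illustrative):
#
#   parse_date(None)                    # -> None
#   parse_date(datetime(2019, 5, 3))    # -> returned unchanged
#   parse_date('2019-05-03 18:46:54')   # -> datetime(2019, 5, 3, 18, 46, 54)
#   parse_date('May 3 2019')            # -> datetime(2019, 5, 3, 0, 0)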


@enforce_types
def download_url(url: str, timeout: int=TIMEOUT) -> str:
    """Download the contents of a remote url and return the text"""
    req = Request(url, headers={'User-Agent': WGET_USER_AGENT})

    if CHECK_SSL_VALIDITY:
        resp = urlopen(req, timeout=timeout)
    else:
        insecure = ssl._create_unverified_context()
        resp = urlopen(req, timeout=timeout, context=insecure)

    encoding = resp.headers.get_content_charset() or 'utf-8'  # type: ignore
    return resp.read().decode(encoding)
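
# Hedged usage sketch (hypothetical URL; this performs a real network request when run):
#
#   html = download_url('https://example.com', timeout=30)
#   title = str_between(html, '<title>', '</title>')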


@enforce_types
def chrome_args(**options) -> List[str]:
    """helper to build up a chrome shell command with arguments"""
    options = {**CHROME_OPTIONS, **options}
    cmd_args = [options['CHROME_BINARY']]

    if options['CHROME_HEADLESS']:
        cmd_args += ('--headless',)

    if not options['CHROME_SANDBOX']:
        # don't use GPU or sandbox when running inside a docker container
        cmd_args += ('--no-sandbox', '--disable-gpu')

    if not options['CHECK_SSL_VALIDITY']:
        cmd_args += ('--disable-web-security', '--ignore-certificate-errors')

    if options['CHROME_USER_AGENT']:
        cmd_args += ('--user-agent={}'.format(options['CHROME_USER_AGENT']),)

    if options['RESOLUTION']:
        cmd_args += ('--window-size={}'.format(options['RESOLUTION']),)

    if options['TIMEOUT']:
        cmd_args += ('--timeout={}'.format(options['TIMEOUT'] * 1000),)

    if options['CHROME_USER_DATA_DIR']:
        cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))

    return cmd_args
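
# Hedged usage sketch (overrides are hypothetical; the exact flags returned depend
# on the CHROME_OPTIONS defaults from config):
#
#   chrome_args(TIMEOUT=60, RESOLUTION='1440,2000')
#   # -> [CHROME_BINARY, '--headless', ..., '--window-size=1440,2000', '--timeout=60000']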


class ExtendedEncoder(pyjson.JSONEncoder):
    """
    Extended json serializer that supports serializing several model
    fields and objects
    """

    def default(self, obj):
        cls_name = obj.__class__.__name__

        if hasattr(obj, '_asdict'):
            return obj._asdict()

        elif isinstance(obj, bytes):
            return obj.decode()

        elif isinstance(obj, datetime):
            return obj.isoformat()

        elif isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)

        elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        return pyjson.JSONEncoder.default(self, obj)
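
# Hedged usage sketch (sample objects are illustrative):
#
#   pyjson.dumps(
#       {'ts': datetime(2019, 5, 3), 'raw': b'bytes', 'err': ValueError('bad value')},
#       cls=ExtendedEncoder,
#   )
#   # -> '{"ts": "2019-05-03T00:00:00", "raw": "bytes", "err": "ValueError: bad value"}'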