abid.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. __package__ = 'archivebox.base_models'
  2. from typing import NamedTuple, Any, Union, Dict
  3. import ulid
  4. import uuid6
  5. import hashlib
  6. from urllib.parse import urlparse
  7. from uuid import UUID
  8. from typeid import TypeID # type: ignore[import-untyped]
  9. from datetime import datetime
  10. from archivebox.misc.util import enforce_types
  11. ABID_PREFIX_LEN = 4
  12. ABID_SUFFIX_LEN = 26
  13. ABID_LEN = 30
  14. ABID_TS_LEN = 10
  15. ABID_URI_LEN = 8
  16. ABID_SUBTYPE_LEN = 2
  17. ABID_RAND_LEN = 6
  18. DEFAULT_ABID_PREFIX = 'obj_'
  19. # allows people to keep their uris secret on a per-instance basis by changing the salt.
  20. # the default means everyone can share the same namespace for URI hashes,
  21. # meaning anyone who has a URI and wants to check if you have it can guess the ABID
  22. DEFAULT_ABID_URI_SALT = '687c2fff14e3a7780faa5a40c237b19b5b51b089'
  23. class ABID(NamedTuple):
  24. """
  25. e.g. ABID('obj_01HX9FPYTRE4A5CCD901ZYEBQE')
  26. """
  27. prefix: str # e.g. obj_
  28. ts: str # e.g. 01HX9FPYTR
  29. uri: str # e.g. E4A5CCD9
  30. subtype: str # e.g. 01
  31. rand: str # e.g. ZYEBQE
  32. # salt: str = DEFAULT_ABID_URI_SALT
  33. def __getattr__(self, attr: str) -> Any:
  34. return getattr(self.ulid, attr)
  35. def __eq__(self, other: Any) -> bool:
  36. try:
  37. return self.ulid == other.ulid
  38. except AttributeError:
  39. return NotImplemented
  40. def __str__(self) -> str:
  41. return self.prefix + self.suffix
  42. def __len__(self) -> int:
  43. return len(self.prefix + self.suffix)
  44. @classmethod
  45. def parse(cls, buffer: Union[str, UUID, ulid.ULID, TypeID, 'ABID'], prefix=DEFAULT_ABID_PREFIX) -> 'ABID':
  46. assert buffer, f'Attempted to create ABID from null value {buffer}'
  47. buffer = str(buffer)
  48. if '_' in buffer:
  49. prefix, suffix = buffer.split('_')
  50. else:
  51. prefix, suffix = prefix.strip('_'), buffer
  52. assert len(prefix) == ABID_PREFIX_LEN - 1 # length without trailing _
  53. assert len(suffix) == ABID_SUFFIX_LEN, f'Suffix {suffix} from {buffer} was not {ABID_SUFFIX_LEN} chars long'
  54. return cls(
  55. prefix=abid_part_from_prefix(prefix),
  56. ts=suffix[0:10].upper(),
  57. uri=suffix[10:18].upper(),
  58. subtype=suffix[18:20].upper(),
  59. rand=suffix[20:26].upper(),
  60. )
  61. @property
  62. def uri_salt(self) -> str:
  63. return DEFAULT_ABID_URI_SALT
  64. @property
  65. def suffix(self):
  66. return ''.join((self.ts, self.uri, self.subtype, self.rand))
  67. @property
  68. def ulid(self) -> ulid.ULID:
  69. return ulid.parse(self.suffix)
  70. @property
  71. def uuid(self) -> UUID:
  72. return self.ulid.uuid
  73. @property
  74. def uuid6(self) -> uuid6.UUID:
  75. return uuid6.UUID(hex=self.uuid.hex)
  76. @property
  77. def typeid(self) -> TypeID:
  78. return TypeID.from_uuid(prefix=self.prefix.strip('_'), suffix=self.uuid6)
  79. @property
  80. def datetime(self) -> datetime:
  81. return self.ulid.timestamp().datetime
  82. ####################################################
  83. @enforce_types
  84. def uri_hash(uri: Union[str, bytes], salt: str=DEFAULT_ABID_URI_SALT) -> str:
  85. """
  86. https://example.com -> 'E4A5CCD9AF4ED2A6E0954DF19FD274E9CDDB4853051F033FD518BFC90AA1AC25' (example.com)
  87. """
  88. if isinstance(uri, bytes):
  89. uri_str: str = uri.decode()
  90. else:
  91. uri_str = str(uri)
  92. # only hash the domain part of URLs
  93. if '://' in uri_str:
  94. try:
  95. domain = urlparse(uri_str).netloc
  96. if domain:
  97. uri_str = domain
  98. except AttributeError:
  99. pass
  100. # the uri hash is the sha256 of the domain + salt
  101. uri_bytes = uri_str.encode('utf-8') + salt.encode('utf-8')
  102. return hashlib.sha256(uri_bytes).hexdigest().upper()
  103. @enforce_types
  104. def abid_part_from_prefix(prefix: str) -> str:
  105. """
  106. 'snp_'
  107. """
  108. # if prefix is None:
  109. # return 'obj_'
  110. prefix = prefix.strip('_').lower()
  111. assert len(prefix) == 3
  112. return prefix + '_'
  113. @enforce_types
  114. def abid_part_from_uri(uri: Any, salt: str=DEFAULT_ABID_URI_SALT) -> str:
  115. """
  116. 'E4A5CCD9' # takes first 8 characters of sha256(url)
  117. """
  118. uri = str(uri).strip()
  119. assert uri not in ('None', '')
  120. return uri_hash(uri, salt=salt)[:ABID_URI_LEN]
  121. @enforce_types
  122. def abid_part_from_ts(ts: datetime) -> str:
  123. """
  124. '01HX9FPYTR' # produces 10 character Timestamp section of ulid based on added date
  125. """
  126. return str(ulid.from_timestamp(ts))[:ABID_TS_LEN]
  127. @enforce_types
  128. def ts_from_abid(abid: str) -> datetime:
  129. return ulid.parse(abid.split('_', 1)[-1]).timestamp().datetime
  130. @enforce_types
  131. def abid_part_from_subtype(subtype: str | int) -> str:
  132. """
  133. Snapshots have 01 type, other objects have other subtypes like wget/media/etc.
  134. Also allows us to change the ulid spec later by putting special sigil values here.
  135. """
  136. subtype = str(subtype)
  137. if len(subtype) == ABID_SUBTYPE_LEN:
  138. return subtype
  139. return hashlib.sha256(subtype.encode('utf-8')).hexdigest()[:ABID_SUBTYPE_LEN].upper()
  140. @enforce_types
  141. def abid_part_from_rand(rand: Union[str, UUID, None, int]) -> str:
  142. """
  143. 'ZYEBQE' # takes last 6 characters of randomness from existing legacy uuid db field
  144. """
  145. if rand is None:
  146. # if it's None we generate a new random 6 character hex string
  147. return str(ulid.new())[-ABID_RAND_LEN:]
  148. elif isinstance(rand, UUID):
  149. # if it's a uuid we take the last 6 characters of the ULID represation of it
  150. return str(ulid.from_uuid(rand))[-ABID_RAND_LEN:]
  151. elif isinstance(rand, int):
  152. # if it's a BigAutoInteger field we convert it from an int to a 0-padded string
  153. rand_str = str(rand)[-ABID_RAND_LEN:]
  154. padding_needed = ABID_RAND_LEN - len(rand_str)
  155. rand_str = ('0'*padding_needed) + rand_str
  156. return rand_str
  157. # otherwise treat it as a string, take the last 6 characters of it verbatim
  158. return str(rand)[-ABID_RAND_LEN:].upper()
  159. @enforce_types
  160. def abid_hashes_from_values(prefix: str, ts: datetime, uri: Any, subtype: str | int, rand: Union[str, UUID, None, int], salt: str=DEFAULT_ABID_URI_SALT) -> Dict[str, str]:
  161. return {
  162. 'prefix': abid_part_from_prefix(prefix),
  163. 'ts': abid_part_from_ts(ts),
  164. 'uri': abid_part_from_uri(uri, salt=salt),
  165. 'subtype': abid_part_from_subtype(subtype),
  166. 'rand': abid_part_from_rand(rand),
  167. # 'salt': don't add this, salt combined with uri above to form a single hash
  168. }
  169. @enforce_types
  170. def abid_from_values(prefix: str, ts: datetime, uri: str, subtype: str, rand: Union[str, UUID, None, int], salt: str=DEFAULT_ABID_URI_SALT) -> ABID:
  171. """
  172. Return a freshly derived ABID (assembled from attrs defined in ABIDModel.abid_*_src).
  173. """
  174. abid = ABID(**abid_hashes_from_values(prefix, ts, uri, subtype, rand, salt=salt))
  175. assert abid.ulid and abid.uuid and abid.typeid, f'Failed to calculate {prefix}_ABID for ts={ts} uri={uri} subtyp={subtype} rand={rand}'
  176. return abid