| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- """
- This file provides the Django ABIDField and ABIDModel base model to inherit from.
- It implements the ArchiveBox ID (ABID) interfaces including abid_values, generate_abid, .abid, .uuid, .id.
- """
- from typing import Any, Dict, Union, List, Set, NamedTuple, cast
- from ulid import ULID
- from uuid import uuid4, UUID
- from typeid import TypeID # type: ignore[import-untyped]
- from datetime import datetime
- from functools import partial
- from charidfield import CharIDField # type: ignore[import-untyped]
- from django.conf import settings
- from django.db import models
- from django.utils import timezone
- from django.db.utils import OperationalError
- from django.contrib.auth import get_user_model
- from django_stubs_ext.db.models import TypedModelMeta
- from .abid import (
- ABID,
- ABID_LEN,
- ABID_RAND_LEN,
- ABID_SUFFIX_LEN,
- DEFAULT_ABID_PREFIX,
- DEFAULT_ABID_URI_SALT,
- abid_part_from_prefix,
- abid_from_values
- )
- ####################################################
- # Database Field for typeid/ulid style IDs with a prefix, e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ
# Factory for the ABID column: a CharIDField with the ABID defaults baked in.
# Callers still supply the per-model `prefix=` (ABIDModel.abid passes its
# `abid_prefix`).
ABIDField = partial(
    CharIDField,
    max_length=ABID_LEN,
    help_text="ABID-format identifier for this entity (e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ)",
    # the column is nullable with no default value...
    default=None,
    null=True,
    blank=True,
    # ...but indexed and unique
    db_index=True,
    unique=True,
)
def get_or_create_system_user_pk(username='system'):
    """
    Return the pk of the user that should own new DB rows by default.

    If exactly one superuser exists, that user's pk is returned. Otherwise a
    dedicated user named `username` (with is_staff=True and is_superuser=True)
    is fetched, or created with an empty email and password.
    """
    User = get_user_model()

    # Fetch at most two pks in ONE query: that is enough to tell "exactly one
    # superuser" apart from "none" or "several", and it avoids the separate
    # count() + [0] round-trips (which could raise IndexError if the row
    # vanished between the two queries).
    superuser_pks = list(
        User.objects.filter(is_superuser=True).values_list('pk', flat=True)[:2]
    )
    if len(superuser_pks) == 1:
        return superuser_pks[0]

    # otherwise, get or create a dedicated "system" user
    # NOTE(review): is_staff/is_superuser are part of the lookup, so an existing
    # non-superuser with this username would not match and a create would be
    # attempted -- presumably failing on a unique username constraint; confirm.
    user, _created = User.objects.get_or_create(
        username=username,
        is_staff=True,
        is_superuser=True,
        defaults={'email': '', 'password': ''},
    )
    return user.pk
class ABIDModel(models.Model):
    """
    Abstract Base Model for other models to depend on. Provides ArchiveBox ID (ABID) interface.

    Subclasses set `abid_prefix` and the `abid_*_src` attributes. Each
    `abid_*_src` is a string containing a Python expression (e.g. 'self.created')
    that `abid_values` evaluates against the instance to obtain that ABID part.
    """

    abid_prefix: str = DEFAULT_ABID_PREFIX  # e.g. 'tag_'
    abid_ts_src = 'None'                    # e.g. 'self.created'
    abid_uri_src = 'None'                   # e.g. 'self.uri'
    abid_subtype_src = 'None'               # e.g. 'self.extractor'
    abid_rand_src = 'None'                  # e.g. 'self.uuid' or 'self.id'

    # id = models.UUIDField(primary_key=True, default=uuid4, editable=True)
    # uuid = models.UUIDField(blank=True, null=True, editable=True, unique=True)
    abid = ABIDField(prefix=abid_prefix)

    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk)
    created = models.DateTimeField(auto_now_add=True)
    modified = models.DateTimeField(auto_now=True)

    class Meta(TypedModelMeta):
        abstract = True

    def save(self, *args: Any, **kwargs: Any) -> None:
        """
        Fill in `id` and `abid` from the computed ABID where needed, then save.

        When the row is being added, `self.ABID` is the source of truth and
        overwrites any prefilled `id` / `abid`. On later saves the fields are
        only filled in if they are empty. After saving, both fields are
        sanity-checked against `self.ABID`.
        """
        if self._state.adding or not self.id:
            self.id = self.ABID.uuid
        if self._state.adding or not self.abid:
            self.abid = str(self.ABID)

        super().save(*args, **kwargs)

        # post-save consistency checks (skipped when Python runs with -O)
        assert str(self.id) == str(self.ABID.uuid), f'self.id {self.id} does not match self.ABID {self.ABID.uuid}'
        assert str(self.abid) == str(self.ABID), f'self.abid {self.abid} does not match self.ABID {self.ABID}'

    @property
    def abid_values(self) -> Dict[str, Any]:
        """
        Evaluate the `abid_*_src` expressions and return the raw ABID parts.

        Returns a dict with the keys 'prefix', 'ts', 'uri', 'subtype', 'rand'.
        """
        # NOTE(security): eval() executes the abid_*_src strings as Python code
        # with `self` in scope. They must only ever be developer-defined class
        # attributes, never values derived from user input.
        # (Kept as a literal dict on purpose: eval() needs `self` in its local scope.)
        return {
            'prefix': self.abid_prefix,
            'ts': eval(self.abid_ts_src),
            'uri': eval(self.abid_uri_src),
            'subtype': eval(self.abid_subtype_src),
            'rand': eval(self.abid_rand_src),
        }

    def generate_abid(self) -> ABID:
        """
        Return a freshly derived ABID (assembled from attrs defined in ABIDModel.abid_*_src).

        Any falsy part other than the prefix is replaced by a placeholder and a
        warning is printed.

        Raises:
            Exception: if `abid_prefix` is empty or still the default prefix.
            AssertionError: if the resulting ABID lacks a ulid, uuid or typeid.
        """
        # look parts up by key rather than relying on the dict's value order
        values = self.abid_values
        prefix = values['prefix']
        ts = values['ts']
        uri = values['uri']
        subtype = values['subtype']
        rand = values['rand']

        if (not prefix) or prefix == DEFAULT_ABID_PREFIX:
            suggested_abid = self.__class__.__name__[:3].lower()
            raise Exception(f'{self.__class__.__name__}.abid_prefix must be defined to calculate ABIDs (suggested: {suggested_abid})')

        if not ts:
            # default to the unix epoch, 00:00:00 UTC (timezone-aware).
            # The stdlib UTC object is used because the module-level `timezone`
            # is django.utils.timezone, whose `utc` alias was removed in Django 5.0.
            from datetime import timezone as dt_timezone
            ts = datetime.fromtimestamp(0, dt_timezone.utc)
            print(f'[!] WARNING: Generating ABID with ts=0000000000 placeholder because {self.__class__.__name__}.abid_ts_src={self.abid_ts_src} is unset!', ts.isoformat())

        if not uri:
            uri = str(self)
            print(f'[!] WARNING: Generating ABID with uri=str(self) placeholder because {self.__class__.__name__}.abid_uri_src={self.abid_uri_src} is unset!', uri)

        if not subtype:
            subtype = self.__class__.__name__
            print(f'[!] WARNING: Generating ABID with subtype={subtype} placeholder because {self.__class__.__name__}.abid_subtype_src={self.abid_subtype_src} is unset!', subtype)

        if not rand:
            # fall back to the first truthy of uuid / id, and finally pk
            rand = getattr(self, 'uuid', None) or getattr(self, 'id', None) or getattr(self, 'pk')
            print(f'[!] WARNING: Generating ABID with rand=self.id placeholder because {self.__class__.__name__}.abid_rand_src={self.abid_rand_src} is unset!', rand)

        abid = abid_from_values(
            prefix=prefix,
            ts=ts,
            uri=uri,
            subtype=subtype,
            rand=rand,
            salt=DEFAULT_ABID_URI_SALT,
        )
        assert abid.ulid and abid.uuid and abid.typeid, f'Failed to calculate {prefix}_ABID for {self.__class__.__name__}'
        return abid

    @property
    def ABID(self) -> ABID:
        """
        Return the ABID for this object.

        Unsaved objects always get a freshly generated ABID. For saved objects
        the stored values are preferred: the first of pk, id, abid that parses
        as an ABID wins, and a fresh ABID is generated only if none does.

        e.g. ULIDParts(timestamp='01HX9FPYTR', url='E4A5CCD9', subtype='00', randomness='ZYEBQE')
        """
        # if object is not yet saved to DB, always generate fresh ABID from values
        if self._state.adding:
            return self.generate_abid()

        # otherwise DB is single source of truth, load ABID from existing db values
        for field_name in ('pk', 'id', 'abid'):
            try:
                parsed = ABID.parse(getattr(self, field_name))
            except Exception:
                # deliberate best-effort: a missing attribute or unparseable
                # value just means "try the next candidate"
                continue
            if parsed:
                return parsed

        return self.generate_abid()

    @property
    def ULID(self) -> ULID:
        """
        Get a ulid.ULID representation of the object's ABID.
        """
        return self.ABID.ulid

    @property
    def UUID(self) -> UUID:
        """
        Get a uuid.UUID (v4) representation of the object's ABID.
        """
        return self.ABID.uuid

    @property
    def TypeID(self) -> TypeID:
        """
        Get a typeid.TypeID (stripe-style) representation of the object's ABID.
        """
        return self.ABID.typeid
- ####################################################
- # Django helpers
def find_all_abid_prefixes() -> Dict[str, type[models.Model]]:
    """
    Build the mapping of every ABID prefix to the model that declares it.

    Models without a truthy `abid_prefix` attribute are left out. If several
    models share a prefix, the one returned last by get_models() wins.

    e.g. {'tag_': core.models.Tag, 'snp_': core.models.Snapshot, ...}
    """
    import django.apps

    return {
        declared_prefix: model_cls
        for model_cls in django.apps.apps.get_models()
        if (declared_prefix := getattr(model_cls, 'abid_prefix', None))
    }
def find_prefix_for_abid(abid: ABID) -> str:
    """
    Find the correct prefix for a given ABID that may be missing its prefix (slow).
    e.g. ABID('obj_01BJQMF54D093DXEAWZ6JYRPAQ') -> 'snp_'

    Raises:
        IndexError: if the prefix matches no model and the fuzzy search by
            random suffix finds no object either.
    """
    # if existing abid prefix is correct, lookup is easy
    model = find_model_from_abid(abid)
    if model:
        assert issubclass(model, ABIDModel)
        return model.abid_prefix

    # prefix might be obj_ or missing, fuzzy-search to find any object that matches
    matches = find_obj_from_abid_rand(abid)
    if not matches:
        # same exception type the bare `[0]` lookup produced, but with a useful message
        raise IndexError(f'Could not find any object matching ABID {abid} to infer its prefix from')
    return matches[0].abid_prefix
def find_model_from_abid_prefix(prefix: str) -> type[ABIDModel] | None:
    """
    Look up the Django Model registered for an ABID prefix.
    e.g. 'tag_' -> core.models.Tag

    The prefix is first normalized with abid_part_from_prefix(). Returns the
    first matching model, or None when no model uses that prefix.
    """
    wanted_prefix = abid_part_from_prefix(prefix)

    import django.apps

    return next(
        (
            candidate
            for candidate in django.apps.apps.get_models()
            if issubclass(candidate, ABIDModel)      # only ABID-enabled models
            and hasattr(candidate, 'objects')        # skip abstract models
            and candidate.abid_prefix == wanted_prefix
        ),
        None,
    )
def find_model_from_abid(abid: ABID) -> type[ABIDModel] | None:
    """
    Shortcut for find_model_from_abid_prefix(abid.prefix).

    Returns the model whose prefix matches abid.prefix, or None if there is
    none. The return annotation matches find_model_from_abid_prefix, whose
    result is passed through unchanged.
    """
    return find_model_from_abid_prefix(abid.prefix)
def find_obj_from_abid_rand(rand: Union[ABID, str], model=None) -> List[ABIDModel]:
    """
    Find an object corresponding to an ABID by exhaustively searching using its random suffix (slow).
    e.g. 'obj_....................JYRPAQ' -> Snapshot('snp_01BJQMF54D093DXEAWZ6JYRPAQ')

    Args:
        rand: an ABID, or a string suffix (left-padded with '0' up to
            ABID_SUFFIX_LEN before being parsed).
        model: optional model class to search first.

    Returns:
        A one-element list holding the object whose regenerated ABID equals the
        target, if one is found; otherwise every partial match collected, in
        search order (possibly empty).
    """
    import django.apps

    # convert str to ABID if necessary
    if isinstance(rand, ABID):
        abid: ABID = rand
    else:
        abid = ABID.parse(str(rand).rjust(ABID_SUFFIX_LEN, '0'))

    # De-duplicate while KEEPING order (dict.fromkeys preserves insertion order),
    # so the caller's hinted model and the model implied by the prefix are
    # searched before the rest. A plain set() would discard that priority and
    # make the order of partial matches arbitrary.
    models_to_try = list(dict.fromkeys(
        candidate
        for candidate in (
            model,
            find_model_from_abid(abid),
            *django.apps.apps.get_models(),
        )
        if candidate
    ))

    partial_matches: List[ABIDModel] = []
    for candidate in models_to_try:
        if not issubclass(candidate, ABIDModel):
            continue  # skip Models that arent ABID-enabled
        if not hasattr(candidate, 'objects'):
            continue  # skip abstract Models
        assert hasattr(candidate, 'objects')  # force-fix for type hint nit about missing manager https://github.com/typeddjango/django-stubs/issues/1684

        # fuzzy search by the randomness portion, using whichever id-like field exists
        try:
            qs = []
            if hasattr(candidate, 'abid'):
                qs = candidate.objects.filter(abid__endswith=abid.rand)
            elif hasattr(candidate, 'uuid'):
                qs = candidate.objects.filter(uuid__endswith=str(abid.uuid)[-ABID_RAND_LEN:])
            elif hasattr(candidate, 'id'):
                # NOTE: this only works on SQLite where every column is a string
                # other DB backends like postgres dont let you do __endswith if this is a BigAutoInteger field

                # try to search for uuid=...-2354352
                # try to search for id=...2354352
                # try to search for id=2354352...
                qs = candidate.objects.filter(
                    models.Q(id__endswith=str(abid.uuid)[-ABID_RAND_LEN:])
                    | models.Q(id__endswith=abid.rand)
                    | models.Q(id__startswith=str(int(abid.rand)) if abid.rand.isdigit() else abid.rand)
                )

            for obj in qs:
                if obj.generate_abid() == abid:
                    # found exact match, no need to keep iterating
                    return [obj]
                partial_matches.append(obj)
        except OperationalError as err:
            # best-effort: a failing query on one model should not abort the whole search
            print(f'[!] WARNING: Got error while trying to iterate through QuerySet for {candidate}:', err, '\n')

    return partial_matches
def find_obj_from_abid(abid: ABID, model=None, fuzzy=False) -> Any:
    """
    Fetch the object for a given ABID by querying its model's abid/uuid/id field (fast).
    e.g. 'snp_01BJQMF54D093DXEAWZ6JYRPAQ' -> Snapshot('snp_01BJQMF54D093DXEAWZ6JYRPAQ')

    If `model` is not given it is derived from the ABID's prefix. When the
    direct lookup fails, `fuzzy=True` and the model has no `abid` field, falls
    back to find_obj_from_abid_rand().

    Raises model.DoesNotExist when nothing is found; asserts that a model
    could be determined.
    """
    model = model or find_model_from_abid(abid)
    assert model, f'Could not find model that could match this ABID type: {abid}'

    # pick the most specific identifier field the model offers
    if hasattr(model, 'abid'):
        lookup = {'abid__endswith': abid.suffix}
    elif hasattr(model, 'uuid'):
        lookup = {'uuid': abid.uuid}
    else:
        lookup = {'id': abid.uuid}

    try:
        return model.objects.get(**lookup)
    except model.DoesNotExist:
        # if the model has an abid field then it shouldve matched, pointless to fuzzy search in that case
        if hasattr(model, 'abid') or (not fuzzy):
            raise

    # fall back to fuzzy searching by the randomness portion
    fuzzy_matches = find_obj_from_abid_rand(abid, model=model)
    if not fuzzy_matches:
        raise model.DoesNotExist

    if fuzzy_matches[0].abid_prefix != abid.prefix:
        print(f'[!] WARNING: fetched object {fuzzy_matches} even though prefix {abid.prefix} doesnt match!', abid, '\n')

    # NOTE(review): this path returns the LIST of matches, whereas the direct
    # lookup above returns a single object -- confirm whether callers rely on it.
    return fuzzy_matches
|