models.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. __package__ = 'archivebox.workers'
  2. from typing import ClassVar, Type, Iterable
  3. from datetime import datetime, timedelta
  4. from statemachine.mixins import MachineMixin
  5. from django.db import models
  6. from django.core import checks
  7. from django.utils import timezone
  8. from django.utils.functional import classproperty
  9. from statemachine import registry, StateMachine, State
  10. class DefaultStatusChoices(models.TextChoices):
  11. QUEUED = 'queued', 'Queued'
  12. STARTED = 'started', 'Started'
  13. SEALED = 'sealed', 'Sealed'
  14. default_status_field: models.CharField = models.CharField(choices=DefaultStatusChoices.choices, max_length=15, default=DefaultStatusChoices.QUEUED, null=False, blank=False, db_index=True)
  15. default_retry_at_field: models.DateTimeField = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True)
  16. ObjectState = State | str
  17. ObjectStateList = Iterable[ObjectState]
  18. class BaseModelWithStateMachine(models.Model, MachineMixin):
  19. id: models.UUIDField
  20. StatusChoices: ClassVar[Type[models.TextChoices]]
  21. # status: models.CharField
  22. # retry_at: models.DateTimeField
  23. state_machine_name: ClassVar[str]
  24. state_field_name: ClassVar[str]
  25. state_machine_attr: ClassVar[str] = 'sm'
  26. bind_events_as_methods: ClassVar[bool] = True
  27. active_state: ClassVar[ObjectState]
  28. retry_at_field_name: ClassVar[str]
  29. class Meta:
  30. app_label = 'workers'
  31. abstract = True
  32. @classmethod
  33. def check(cls, sender=None, **kwargs):
  34. import sys
  35. # Skip state machine checks during makemigrations to avoid premature registry access
  36. if 'makemigrations' in sys.argv:
  37. return super().check(**kwargs)
  38. errors = super().check(**kwargs)
  39. found_id_field = False
  40. found_status_field = False
  41. found_retry_at_field = False
  42. for field in cls._meta.get_fields():
  43. if getattr(field, '_is_state_field', False):
  44. if cls.state_field_name == field.name:
  45. found_status_field = True
  46. if getattr(field, 'choices', None) != cls.StatusChoices.choices:
  47. errors.append(checks.Error(
  48. f'{cls.__name__}.{field.name} must have choices set to {cls.__name__}.StatusChoices.choices',
  49. hint=f'{cls.__name__}.{field.name}.choices = {getattr(field, "choices", None)!r}',
  50. obj=cls,
  51. id='workers.E011',
  52. ))
  53. if getattr(field, '_is_retry_at_field', False):
  54. if cls.retry_at_field_name == field.name:
  55. found_retry_at_field = True
  56. if field.name == 'id' and getattr(field, 'primary_key', False):
  57. found_id_field = True
  58. if not found_status_field:
  59. errors.append(checks.Error(
  60. f'{cls.__name__}.state_field_name must be defined and point to a StatusField()',
  61. hint=f'{cls.__name__}.state_field_name = {cls.state_field_name!r} but {cls.__name__}.{cls.state_field_name!r} was not found or does not refer to StatusField',
  62. obj=cls,
  63. id='workers.E012',
  64. ))
  65. if not found_retry_at_field:
  66. errors.append(checks.Error(
  67. f'{cls.__name__}.retry_at_field_name must be defined and point to a RetryAtField()',
  68. hint=f'{cls.__name__}.retry_at_field_name = {cls.retry_at_field_name!r} but {cls.__name__}.{cls.retry_at_field_name!r} was not found or does not refer to RetryAtField',
  69. obj=cls,
  70. id='workers.E013',
  71. ))
  72. if not found_id_field:
  73. errors.append(checks.Error(
  74. f'{cls.__name__} must have an id field that is a primary key',
  75. hint=f'{cls.__name__}.id = {cls.id!r}',
  76. obj=cls,
  77. id='workers.E014',
  78. ))
  79. if not isinstance(cls.state_machine_name, str):
  80. errors.append(checks.Error(
  81. f'{cls.__name__}.state_machine_name must be a dotted-import path to a StateMachine class',
  82. hint=f'{cls.__name__}.state_machine_name = {cls.state_machine_name!r}',
  83. obj=cls,
  84. id='workers.E015',
  85. ))
  86. try:
  87. cls.StateMachineClass
  88. except Exception as err:
  89. errors.append(checks.Error(
  90. f'{cls.__name__}.state_machine_name must point to a valid StateMachine class, but got {type(err).__name__} {err} when trying to access {cls.__name__}.StateMachineClass',
  91. hint=f'{cls.__name__}.state_machine_name = {cls.state_machine_name!r}',
  92. obj=cls,
  93. id='workers.E016',
  94. ))
  95. if cls.INITIAL_STATE not in cls.StatusChoices.values:
  96. errors.append(checks.Error(
  97. f'{cls.__name__}.StateMachineClass.initial_state must be present within {cls.__name__}.StatusChoices',
  98. hint=f'{cls.__name__}.StateMachineClass.initial_state = {cls.StateMachineClass.initial_state!r}',
  99. obj=cls,
  100. id='workers.E017',
  101. ))
  102. if cls.ACTIVE_STATE not in cls.StatusChoices.values:
  103. errors.append(checks.Error(
  104. f'{cls.__name__}.active_state must be set to a valid State present within {cls.__name__}.StatusChoices',
  105. hint=f'{cls.__name__}.active_state = {cls.active_state!r}',
  106. obj=cls,
  107. id='workers.E018',
  108. ))
  109. for state in cls.FINAL_STATES:
  110. if state not in cls.StatusChoices.values:
  111. errors.append(checks.Error(
  112. f'{cls.__name__}.StateMachineClass.final_states must all be present within {cls.__name__}.StatusChoices',
  113. hint=f'{cls.__name__}.StateMachineClass.final_states = {cls.StateMachineClass.final_states!r}',
  114. obj=cls,
  115. id='workers.E019',
  116. ))
  117. break
  118. return errors
  119. @staticmethod
  120. def _state_to_str(state: ObjectState) -> str:
  121. """Convert a statemachine.State, models.TextChoices.choices value, or Enum value to a str"""
  122. return str(state.value) if isinstance(state, State) else str(state)
  123. @property
  124. def RETRY_AT(self) -> datetime:
  125. return getattr(self, self.retry_at_field_name)
  126. @RETRY_AT.setter
  127. def RETRY_AT(self, value: datetime):
  128. setattr(self, self.retry_at_field_name, value)
  129. @property
  130. def STATE(self) -> str:
  131. return getattr(self, self.state_field_name)
  132. @STATE.setter
  133. def STATE(self, value: str):
  134. setattr(self, self.state_field_name, value)
  135. def bump_retry_at(self, seconds: int = 10):
  136. self.RETRY_AT = timezone.now() + timedelta(seconds=seconds)
  137. def update_and_requeue(self, **kwargs) -> bool:
  138. """
  139. Atomically update fields and schedule retry_at for next worker tick.
  140. Returns True if the update was successful, False if the object was modified by another worker.
  141. """
  142. # Get the current retry_at to use as optimistic lock
  143. current_retry_at = self.RETRY_AT
  144. # Apply the updates
  145. for key, value in kwargs.items():
  146. setattr(self, key, value)
  147. # Try to save with optimistic locking
  148. updated = type(self).objects.filter(
  149. pk=self.pk,
  150. retry_at=current_retry_at,
  151. ).update(**{k: getattr(self, k) for k in kwargs})
  152. if updated == 1:
  153. self.refresh_from_db()
  154. return True
  155. return False
  156. @classmethod
  157. def get_queue(cls):
  158. """
  159. Get the sorted and filtered QuerySet of objects that are ready for processing.
  160. Objects are ready if:
  161. - status is not in FINAL_STATES
  162. - retry_at is in the past (or now)
  163. """
  164. return cls.objects.filter(
  165. retry_at__lte=timezone.now()
  166. ).exclude(
  167. status__in=cls.FINAL_STATES
  168. ).order_by('retry_at')
  169. @classmethod
  170. def claim_for_worker(cls, obj: 'BaseModelWithStateMachine', lock_seconds: int = 60) -> bool:
  171. """
  172. Atomically claim an object for processing using optimistic locking.
  173. Returns True if successfully claimed, False if another worker got it first.
  174. """
  175. updated = cls.objects.filter(
  176. pk=obj.pk,
  177. retry_at=obj.retry_at,
  178. ).update(
  179. retry_at=timezone.now() + timedelta(seconds=lock_seconds)
  180. )
  181. return updated == 1
  182. @classproperty
  183. def ACTIVE_STATE(cls) -> str:
  184. return cls._state_to_str(cls.active_state)
  185. @classproperty
  186. def INITIAL_STATE(cls) -> str:
  187. return cls._state_to_str(cls.StateMachineClass.initial_state)
  188. @classproperty
  189. def FINAL_STATES(cls) -> list[str]:
  190. return [cls._state_to_str(state) for state in cls.StateMachineClass.final_states]
  191. @classproperty
  192. def FINAL_OR_ACTIVE_STATES(cls) -> list[str]:
  193. return [*cls.FINAL_STATES, cls.ACTIVE_STATE]
  194. @classmethod
  195. def extend_choices(cls, base_choices: Type[models.TextChoices]):
  196. """
  197. Decorator to extend the base choices with extra choices, e.g.:
  198. class MyModel(ModelWithStateMachine):
  199. @ModelWithStateMachine.extend_choices(ModelWithStateMachine.StatusChoices)
  200. class StatusChoices(models.TextChoices):
  201. SUCCEEDED = 'succeeded'
  202. FAILED = 'failed'
  203. SKIPPED = 'skipped'
  204. """
  205. assert issubclass(base_choices, models.TextChoices), f'@extend_choices(base_choices) must be a TextChoices class, not {base_choices.__name__}'
  206. def wrapper(extra_choices: Type[models.TextChoices]) -> Type[models.TextChoices]:
  207. joined = {}
  208. for item in base_choices.choices:
  209. joined[item[0]] = item[1]
  210. for item in extra_choices.choices:
  211. joined[item[0]] = item[1]
  212. return models.TextChoices('StatusChoices', joined)
  213. return wrapper
  214. @classmethod
  215. def StatusField(cls, **kwargs) -> models.CharField:
  216. """
  217. Used on subclasses to extend/modify the status field with updated kwargs. e.g.:
  218. class MyModel(ModelWithStateMachine):
  219. class StatusChoices(ModelWithStateMachine.StatusChoices):
  220. QUEUED = 'queued', 'Queued'
  221. STARTED = 'started', 'Started'
  222. SEALED = 'sealed', 'Sealed'
  223. BACKOFF = 'backoff', 'Backoff'
  224. FAILED = 'failed', 'Failed'
  225. SKIPPED = 'skipped', 'Skipped'
  226. status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
  227. """
  228. default_kwargs = default_status_field.deconstruct()[3]
  229. updated_kwargs = {**default_kwargs, **kwargs}
  230. field = models.CharField(**updated_kwargs)
  231. field._is_state_field = True # type: ignore
  232. return field
  233. @classmethod
  234. def RetryAtField(cls, **kwargs) -> models.DateTimeField:
  235. """
  236. Used on subclasses to extend/modify the retry_at field with updated kwargs. e.g.:
  237. class MyModel(ModelWithStateMachine):
  238. retry_at = ModelWithStateMachine.RetryAtField(editable=False)
  239. """
  240. default_kwargs = default_retry_at_field.deconstruct()[3]
  241. updated_kwargs = {**default_kwargs, **kwargs}
  242. field = models.DateTimeField(**updated_kwargs)
  243. field._is_retry_at_field = True # type: ignore
  244. return field
  245. @classproperty
  246. def StateMachineClass(cls) -> Type[StateMachine]:
  247. """Get the StateMachine class for the given django Model that inherits from MachineMixin"""
  248. model_state_machine_name = getattr(cls, 'state_machine_name', None)
  249. if model_state_machine_name:
  250. StateMachineCls = registry.get_machine_cls(model_state_machine_name)
  251. assert issubclass(StateMachineCls, StateMachine)
  252. return StateMachineCls
  253. raise NotImplementedError(f'ActorType[{cls.__name__}] must define .state_machine_name: str that points to a valid StateMachine')
  254. class ModelWithStateMachine(BaseModelWithStateMachine):
  255. StatusChoices: ClassVar[Type[DefaultStatusChoices]] = DefaultStatusChoices
  256. status: models.CharField = BaseModelWithStateMachine.StatusField()
  257. retry_at: models.DateTimeField = BaseModelWithStateMachine.RetryAtField()
  258. state_machine_name: ClassVar[str] # e.g. 'core.models.ArchiveResultMachine'
  259. state_field_name: ClassVar[str] = 'status'
  260. state_machine_attr: ClassVar[str] = 'sm'
  261. bind_events_as_methods: ClassVar[bool] = True
  262. active_state: ClassVar[str] = StatusChoices.STARTED
  263. retry_at_field_name: ClassVar[str] = 'retry_at'
  264. class Meta:
  265. app_label = 'workers'
  266. abstract = True
  267. class BaseStateMachine(StateMachine):
  268. """
  269. Base class for all ArchiveBox state machines.
  270. Eliminates boilerplate __init__, __repr__, __str__ methods that were
  271. duplicated across all 4 state machines (Snapshot, ArchiveResult, Crawl, Binary).
  272. Subclasses must set model_attr_name to specify the attribute name
  273. (e.g., 'snapshot', 'archiveresult', 'crawl', 'binary').
  274. Example usage:
  275. class SnapshotMachine(BaseStateMachine, strict_states=True):
  276. model_attr_name = 'snapshot'
  277. # States and transitions...
  278. queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
  279. # ...
  280. The model instance is accessible via self.{model_attr_name}
  281. (e.g., self.snapshot, self.archiveresult, etc.)
  282. """
  283. model_attr_name: str = 'obj' # Override in subclasses
  284. def __init__(self, obj, *args, **kwargs):
  285. setattr(self, self.model_attr_name, obj)
  286. super().__init__(obj, *args, **kwargs)
  287. def __repr__(self) -> str:
  288. obj = getattr(self, self.model_attr_name)
  289. return f'{self.__class__.__name__}[{obj.id}]'
  290. def __str__(self) -> str:
  291. return self.__repr__()