# statemachines.py
  1. __package__ = 'archivebox.core'
  2. import time
  3. import os
  4. from datetime import timedelta
  5. from typing import ClassVar
  6. from django.utils import timezone
  7. from rich import print
  8. from statemachine import State, StateMachine
  9. # from workers.actor import ActorType
  10. from core.models import Snapshot, ArchiveResult
  11. class SnapshotMachine(StateMachine, strict_states=True):
  12. """
  13. State machine for managing Snapshot lifecycle.
  14. https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
  15. """
  16. model: Snapshot
  17. # States
  18. queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
  19. started = State(value=Snapshot.StatusChoices.STARTED)
  20. sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)
  21. # Tick Event
  22. tick = (
  23. queued.to.itself(unless='can_start') |
  24. queued.to(started, cond='can_start') |
  25. started.to.itself(unless='is_finished') |
  26. started.to(sealed, cond='is_finished')
  27. )
  28. def __init__(self, snapshot, *args, **kwargs):
  29. self.snapshot = snapshot
  30. super().__init__(snapshot, *args, **kwargs)
  31. def __repr__(self) -> str:
  32. return f'[grey53]Snapshot\\[{self.snapshot.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.snapshot.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
  33. def __str__(self) -> str:
  34. return self.__repr__()
  35. def can_start(self) -> bool:
  36. can_start = bool(self.snapshot.url)
  37. if not can_start:
  38. print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue] cant start yet +{timezone.now() - self.snapshot.retry_at}s')
  39. return can_start
  40. def is_finished(self) -> bool:
  41. # if no archiveresults exist yet, it's not finished
  42. if not self.snapshot.archiveresult_set.exists():
  43. return False
  44. # if archiveresults exist but are still pending, it's not finished
  45. if self.snapshot.pending_archiveresults().exists():
  46. return False
  47. # otherwise archiveresults exist and are all finished, so it's finished
  48. return True
  49. # def on_transition(self, event, state):
  50. # print(f'{self}.on_transition() [blue]{str(state).upper()}[/blue] ➡️ ...')
  51. @queued.enter
  52. def enter_queued(self):
  53. print(f'{self}.on_queued() ↳ snapshot.retry_at = now()')
  54. self.snapshot.update_for_workers(
  55. retry_at=timezone.now(),
  56. status=Snapshot.StatusChoices.QUEUED,
  57. )
  58. @started.enter
  59. def enter_started(self):
  60. print(f'{self}.on_started() ↳ snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)')
  61. # lock the snapshot while we create the pending archiveresults
  62. self.snapshot.update_for_workers(
  63. retry_at=timezone.now() + timedelta(seconds=30), # if failed, wait 30s before retrying
  64. )
  65. # create the pending archiveresults
  66. self.snapshot.create_pending_archiveresults()
  67. # unlock the snapshot after we're done creating the pending archiveresults + set status = started
  68. self.snapshot.update_for_workers(
  69. retry_at=timezone.now() + timedelta(seconds=5), # wait 5s before checking it again
  70. status=Snapshot.StatusChoices.STARTED,
  71. )
  72. # run_subcommand([
  73. # 'archivebox', 'snapshot', self.snapshot.ABID,
  74. # '--start',
  75. # ])
  76. @sealed.enter
  77. def enter_sealed(self):
  78. print(f'{self}.on_sealed() ↳ snapshot.retry_at=None')
  79. self.snapshot.update_for_workers(
  80. retry_at=None,
  81. status=Snapshot.StatusChoices.SEALED,
  82. )
  83. # class SnapshotWorker(ActorType[Snapshot]):
  84. # """
  85. # The primary actor for progressing Snapshot objects
  86. # through their lifecycle using the SnapshotMachine.
  87. # """
  88. # Model = Snapshot
  89. # StateMachineClass = SnapshotMachine
  90. # ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started'
  91. # MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
  92. # MAX_TICK_TIME: ClassVar[int] = 10
  93. # CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
  94. class ArchiveResultMachine(StateMachine, strict_states=True):
  95. """
  96. State machine for managing ArchiveResult lifecycle.
  97. https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
  98. """
  99. model: ArchiveResult
  100. # States
  101. queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
  102. started = State(value=ArchiveResult.StatusChoices.STARTED)
  103. backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
  104. succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
  105. failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
  106. # Tick Event
  107. tick = (
  108. queued.to.itself(unless='can_start') |
  109. queued.to(started, cond='can_start') |
  110. started.to.itself(unless='is_finished') |
  111. started.to(succeeded, cond='is_succeeded') |
  112. started.to(failed, cond='is_failed') |
  113. started.to(backoff, cond='is_backoff') |
  114. backoff.to.itself(unless='can_start') |
  115. backoff.to(started, cond='can_start') |
  116. backoff.to(succeeded, cond='is_succeeded') |
  117. backoff.to(failed, cond='is_failed')
  118. )
  119. def __init__(self, archiveresult, *args, **kwargs):
  120. self.archiveresult = archiveresult
  121. super().__init__(archiveresult, *args, **kwargs)
  122. def __repr__(self) -> str:
  123. return f'[grey53]ArchiveResult\\[{self.archiveresult.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.archiveresult.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
  124. def __str__(self) -> str:
  125. return self.__repr__()
  126. def can_start(self) -> bool:
  127. can_start = bool(self.archiveresult.snapshot.url)
  128. if not can_start:
  129. print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue]: cant start yet +{timezone.now() - self.archiveresult.retry_at}s')
  130. return can_start
  131. def is_succeeded(self) -> bool:
  132. if self.archiveresult.output and 'err' not in self.archiveresult.output.lower():
  133. return True
  134. return False
  135. def is_failed(self) -> bool:
  136. if self.archiveresult.output and 'err' in self.archiveresult.output.lower():
  137. return True
  138. return False
  139. def is_backoff(self) -> bool:
  140. if self.archiveresult.output is None:
  141. return True
  142. return False
  143. def is_finished(self) -> bool:
  144. return self.is_failed() or self.is_succeeded()
  145. @queued.enter
  146. def enter_queued(self):
  147. print(f'{self}.on_queued() ↳ archiveresult.retry_at = now()')
  148. self.archiveresult.update_for_workers(
  149. retry_at=timezone.now(),
  150. status=ArchiveResult.StatusChoices.QUEUED,
  151. start_ts=None,
  152. ) # bump the snapshot's retry_at so they pickup any new changes
  153. @started.enter
  154. def enter_started(self):
  155. print(f'{self}.on_started() ↳ archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)')
  156. # lock the object for the next 30sec
  157. self.archiveresult.update_for_workers(
  158. retry_at=timezone.now() + timedelta(seconds=30),
  159. status=ArchiveResult.StatusChoices.QUEUED,
  160. start_ts=timezone.now(),
  161. ) # lock the obj for the next ~30s to limit racing with other workers
  162. # run_subcommand([
  163. # 'archivebox', 'extract', self.archiveresult.ABID,
  164. # ])
  165. # create the output directory and fork the new extractor job subprocess
  166. self.archiveresult.create_output_dir()
  167. # self.archiveresult.extract(background=True)
  168. # mark the object as started
  169. self.archiveresult.update_for_workers(
  170. retry_at=timezone.now() + timedelta(seconds=30), # retry it again in 30s if it fails
  171. status=ArchiveResult.StatusChoices.STARTED,
  172. )
  173. # simulate slow running extractor that completes after 2 seconds
  174. time.sleep(2)
  175. self.archiveresult.update_for_workers(output='completed')
  176. @backoff.enter
  177. def enter_backoff(self):
  178. print(f'{self}.on_backoff() ↳ archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None')
  179. self.archiveresult.update_for_workers(
  180. retry_at=timezone.now() + timedelta(seconds=60),
  181. status=ArchiveResult.StatusChoices.BACKOFF,
  182. end_ts=None,
  183. # retries=F('retries') + 1, # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1,
  184. )
  185. self.archiveresult.save(write_indexes=True)
  186. @succeeded.enter
  187. def enter_succeeded(self):
  188. print(f'{self}.on_succeeded() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()')
  189. self.archiveresult.update_for_workers(
  190. retry_at=None,
  191. status=ArchiveResult.StatusChoices.SUCCEEDED,
  192. end_ts=timezone.now(),
  193. # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
  194. )
  195. self.archiveresult.save(write_indexes=True)
  196. @failed.enter
  197. def enter_failed(self):
  198. print(f'{self}.on_failed() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()')
  199. self.archiveresult.update_for_workers(
  200. retry_at=None,
  201. status=ArchiveResult.StatusChoices.FAILED,
  202. end_ts=timezone.now(),
  203. # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
  204. )
  205. def after_transition(self, event: str, source: State, target: State):
  206. # print(f"after '{event}' from '{source.id}' to '{target.id}'")
  207. self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes
  208. # class ArchiveResultWorker(ActorType[ArchiveResult]):
  209. # """
  210. # The primary actor for progressing ArchiveResult objects
  211. # through their lifecycle using the ArchiveResultMachine.
  212. # """
  213. # Model = ArchiveResult
  214. # StateMachineClass = ArchiveResultMachine
  215. # ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started'
  216. # MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
  217. # MAX_TICK_TIME: ClassVar[int] = 60
  218. # CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10