statemachines.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
# Explicitly declare which package this module belongs to.
# Python consults __package__ when resolving relative imports.
# NOTE(review): presumably needed because this file can be loaded outside the
# normal archivebox.core package path -- confirm before removing.
__package__ = 'archivebox.core'
  2. import time
  3. from datetime import timedelta
  4. from typing import ClassVar
  5. from django.utils import timezone
  6. from statemachine import State, StateMachine
  7. from actors.actor import ActorType
  8. from core.models import Snapshot, ArchiveResult
  9. class SnapshotMachine(StateMachine, strict_states=True):
  10. """
  11. State machine for managing Snapshot lifecycle.
  12. https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
  13. """
  14. model: Snapshot
  15. # States
  16. queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
  17. started = State(value=Snapshot.StatusChoices.STARTED)
  18. sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)
  19. # Tick Event
  20. tick = (
  21. queued.to.itself(unless='can_start') |
  22. queued.to(started, cond='can_start') |
  23. started.to.itself(unless='is_finished') |
  24. started.to(sealed, cond='is_finished')
  25. )
  26. def __init__(self, snapshot, *args, **kwargs):
  27. self.snapshot = snapshot
  28. super().__init__(snapshot, *args, **kwargs)
  29. def can_start(self) -> bool:
  30. can_start = bool(self.snapshot.url)
  31. if not can_start:
  32. print(f'SnapshotMachine[{self.snapshot.ABID}].can_start() False: {self.snapshot.url} {self.snapshot.retry_at} {timezone.now()}')
  33. return can_start
  34. def is_finished(self) -> bool:
  35. # if no archiveresults exist yet, it's not finished
  36. if not self.snapshot.archiveresult_set.exists():
  37. return False
  38. # if archiveresults exist but are still pending, it's not finished
  39. if self.snapshot.pending_archiveresults().exists():
  40. return False
  41. # otherwise archiveresults exist and are all finished, so it's finished
  42. return True
  43. def on_transition(self, event, state):
  44. print(f'SnapshotMachine[{self.snapshot.ABID}].on_transition() {event} -> {state}')
  45. @queued.enter
  46. def enter_queued(self):
  47. print(f'SnapshotMachine[{self.snapshot.ABID}].on_queued(): snapshot.retry_at = now()')
  48. self.snapshot.update_for_workers(
  49. retry_at=timezone.now(),
  50. status=Snapshot.StatusChoices.QUEUED,
  51. )
  52. @started.enter
  53. def enter_started(self):
  54. print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)')
  55. # lock the snapshot while we create the pending archiveresults
  56. self.snapshot.update_for_workers(
  57. retry_at=timezone.now() + timedelta(seconds=30), # if failed, wait 30s before retrying
  58. )
  59. # create the pending archiveresults
  60. self.snapshot.create_pending_archiveresults()
  61. # unlock the snapshot after we're done creating the pending archiveresults + set status = started
  62. self.snapshot.update_for_workers(
  63. retry_at=timezone.now() + timedelta(seconds=5), # wait 5s before checking it again
  64. status=Snapshot.StatusChoices.STARTED,
  65. )
  66. @sealed.enter
  67. def enter_sealed(self):
  68. print(f'SnapshotMachine[{self.snapshot.ABID}].on_sealed(): snapshot.retry_at=None')
  69. self.snapshot.update_for_workers(
  70. retry_at=None,
  71. status=Snapshot.StatusChoices.SEALED,
  72. )
  73. class SnapshotWorker(ActorType[Snapshot]):
  74. """
  75. The primary actor for progressing Snapshot objects
  76. through their lifecycle using the SnapshotMachine.
  77. """
  78. Model = Snapshot
  79. StateMachineClass = SnapshotMachine
  80. ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started'
  81. MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
  82. MAX_TICK_TIME: ClassVar[int] = 10
  83. CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
  84. class ArchiveResultMachine(StateMachine, strict_states=True):
  85. """
  86. State machine for managing ArchiveResult lifecycle.
  87. https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
  88. """
  89. model: ArchiveResult
  90. # States
  91. queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
  92. started = State(value=ArchiveResult.StatusChoices.STARTED)
  93. backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
  94. succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
  95. failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
  96. # Tick Event
  97. tick = (
  98. queued.to.itself(unless='can_start') |
  99. queued.to(started, cond='can_start') |
  100. started.to.itself(unless='is_finished') |
  101. started.to(succeeded, cond='is_succeeded') |
  102. started.to(failed, cond='is_failed') |
  103. started.to(backoff, cond='is_backoff') |
  104. backoff.to.itself(unless='can_start') |
  105. backoff.to(started, cond='can_start') |
  106. backoff.to(succeeded, cond='is_succeeded') |
  107. backoff.to(failed, cond='is_failed')
  108. )
  109. def __init__(self, archiveresult, *args, **kwargs):
  110. self.archiveresult = archiveresult
  111. super().__init__(archiveresult, *args, **kwargs)
  112. def can_start(self) -> bool:
  113. can_start = bool(self.archiveresult.snapshot.url)
  114. if not can_start:
  115. print(f'ArchiveResultMachine[{self.archiveresult.ABID}].can_start() False: {self.archiveresult.snapshot.url} {self.archiveresult.retry_at} {timezone.now()}')
  116. return can_start
  117. def is_succeeded(self) -> bool:
  118. if self.archiveresult.output and 'err' not in self.archiveresult.output.lower():
  119. return True
  120. return False
  121. def is_failed(self) -> bool:
  122. if self.archiveresult.output and 'err' in self.archiveresult.output.lower():
  123. return True
  124. return False
  125. def is_backoff(self) -> bool:
  126. if self.archiveresult.output is None:
  127. return True
  128. return False
  129. def is_finished(self) -> bool:
  130. return self.is_failed() or self.is_succeeded()
  131. @queued.enter
  132. def enter_queued(self):
  133. print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_queued(): archiveresult.retry_at = now()')
  134. self.archiveresult.update_for_workers(
  135. retry_at=timezone.now(),
  136. status=ArchiveResult.StatusChoices.QUEUED,
  137. start_ts=None,
  138. ) # bump the snapshot's retry_at so they pickup any new changes
  139. @started.enter
  140. def enter_started(self):
  141. print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)')
  142. # lock the object for the next 30sec
  143. self.archiveresult.update_for_workers(
  144. retry_at=timezone.now() + timedelta(seconds=30),
  145. status=ArchiveResult.StatusChoices.QUEUED,
  146. start_ts=timezone.now(),
  147. ) # lock the obj for the next ~30s to limit racing with other workers
  148. # create the output directory and fork the new extractor job subprocess
  149. self.archiveresult.create_output_dir()
  150. # self.archiveresult.extract(background=True)
  151. # mark the object as started
  152. self.archiveresult.update_for_workers(
  153. retry_at=timezone.now() + timedelta(seconds=30), # retry it again in 30s if it fails
  154. status=ArchiveResult.StatusChoices.STARTED,
  155. )
  156. # simulate slow running extractor that completes after 2 seconds
  157. time.sleep(2)
  158. self.archiveresult.update_for_workers(output='completed')
  159. @backoff.enter
  160. def enter_backoff(self):
  161. print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None')
  162. self.archiveresult.update_for_workers(
  163. retry_at=timezone.now() + timedelta(seconds=60),
  164. status=ArchiveResult.StatusChoices.BACKOFF,
  165. end_ts=None,
  166. # retries=F('retries') + 1, # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1,
  167. )
  168. self.archiveresult.save(write_indexes=True)
  169. @succeeded.enter
  170. def enter_succeeded(self):
  171. print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.retry_at = None, archiveresult.end_ts = now()')
  172. self.archiveresult.update_for_workers(
  173. retry_at=None,
  174. status=ArchiveResult.StatusChoices.SUCCEEDED,
  175. end_ts=timezone.now(),
  176. # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
  177. )
  178. self.archiveresult.save(write_indexes=True)
  179. @failed.enter
  180. def enter_failed(self):
  181. print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_failed(): archivebox.retry_at = None, archiveresult.end_ts = now()')
  182. self.archiveresult.update_for_workers(
  183. retry_at=None,
  184. status=ArchiveResult.StatusChoices.FAILED,
  185. end_ts=timezone.now(),
  186. # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
  187. )
  188. def after_transition(self, event: str, source: State, target: State):
  189. # print(f"after '{event}' from '{source.id}' to '{target.id}'")
  190. self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes
  191. class ArchiveResultWorker(ActorType[ArchiveResult]):
  192. """
  193. The primary actor for progressing ArchiveResult objects
  194. through their lifecycle using the ArchiveResultMachine.
  195. """
  196. Model = ArchiveResult
  197. StateMachineClass = ArchiveResultMachine
  198. ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started'
  199. MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
  200. MAX_TICK_TIME: ClassVar[int] = 60
  201. CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10