statemachines.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. __package__ = 'archivebox.snapshots'
  2. from django.utils import timezone
  3. from statemachine import State, StateMachine
  4. from core.models import Snapshot, ArchiveResult
  5. # State Machine Definitions
  6. #################################################
  7. class SnapshotMachine(StateMachine, strict_states=True):
  8. """State machine for managing Snapshot lifecycle."""
  9. model: Snapshot
  10. # States
  11. queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
  12. started = State(value=Snapshot.StatusChoices.STARTED)
  13. sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)
  14. # Tick Event
  15. tick = (
  16. queued.to.itself(unless='can_start', internal=True) |
  17. queued.to(started, cond='can_start') |
  18. started.to.itself(unless='is_finished', internal=True) |
  19. started.to(sealed, cond='is_finished')
  20. )
  21. def __init__(self, snapshot, *args, **kwargs):
  22. self.snapshot = snapshot
  23. super().__init__(snapshot, *args, **kwargs)
  24. def can_start(self) -> bool:
  25. return self.snapshot.seed and self.snapshot.seed.uri
  26. def is_finished(self) -> bool:
  27. return not self.snapshot.has_pending_archiveresults()
  28. def on_started(self):
  29. self.snapshot.create_pending_archiveresults()
  30. self.snapshot.bump_retry_at(seconds=60)
  31. self.snapshot.save()
  32. def on_sealed(self):
  33. self.snapshot.retry_at = None
  34. self.snapshot.save()
  35. class ArchiveResultMachine(StateMachine, strict_states=True):
  36. """State machine for managing ArchiveResult lifecycle."""
  37. model: ArchiveResult
  38. # States
  39. queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
  40. started = State(value=ArchiveResult.StatusChoices.STARTED)
  41. backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
  42. succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
  43. failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)
  44. # Tick Event
  45. tick = (
  46. queued.to.itself(unless='can_start', internal=True) |
  47. queued.to(started, cond='can_start') |
  48. started.to.itself(unless='is_finished', internal=True) |
  49. started.to(succeeded, cond='is_succeeded') |
  50. started.to(failed, cond='is_failed') |
  51. started.to(backoff, cond='is_backoff') |
  52. backoff.to.itself(unless='can_start', internal=True) |
  53. backoff.to(started, cond='can_start') |
  54. backoff.to(succeeded, cond='is_succeeded') |
  55. backoff.to(failed, cond='is_failed')
  56. )
  57. def __init__(self, archiveresult, *args, **kwargs):
  58. self.archiveresult = archiveresult
  59. super().__init__(archiveresult, *args, **kwargs)
  60. def can_start(self) -> bool:
  61. return self.archiveresult.snapshot and self.archiveresult.snapshot.STATE == Snapshot.active_state
  62. def is_succeeded(self) -> bool:
  63. return self.archiveresult.output_exists()
  64. def is_failed(self) -> bool:
  65. return not self.archiveresult.output_exists()
  66. def is_backoff(self) -> bool:
  67. return self.archiveresult.STATE == ArchiveResult.StatusChoices.BACKOFF
  68. def is_finished(self) -> bool:
  69. return self.is_failed() or self.is_succeeded()
  70. def on_started(self):
  71. self.archiveresult.start_ts = timezone.now()
  72. self.archiveresult.create_output_dir()
  73. self.archiveresult.bump_retry_at(seconds=60)
  74. self.archiveresult.save()
  75. def on_backoff(self):
  76. self.archiveresult.bump_retry_at(seconds=60)
  77. self.archiveresult.save()
  78. def on_succeeded(self):
  79. self.archiveresult.end_ts = timezone.now()
  80. self.archiveresult.save()
  81. def on_failed(self):
  82. self.archiveresult.end_ts = timezone.now()
  83. self.archiveresult.save()
  84. def after_transition(self, event: str, source: State, target: State):
  85. print(f"after '{event}' from '{source.id}' to '{target.id}'")
  86. # self.archiveresult.save_merkle_index()
  87. # self.archiveresult.save_html_index()
  88. # self.archiveresult.save_json_index()
  89. return "after_transition"