| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- __package__ = 'archivebox.core'
- import time
- from datetime import timedelta
- from typing import ClassVar
- from django.utils import timezone
- from statemachine import State, StateMachine
- from actors.actor import ActorType
- from core.models import Snapshot, ArchiveResult
class SnapshotMachine(StateMachine, strict_states=True):
    """
    State machine for managing Snapshot lifecycle.

    Lifecycle: queued -> started -> sealed. Ticks are driven externally
    (by the SnapshotWorker actor); strict_states=True makes the library
    reject any transition not declared in `tick` below.

    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
    """

    # model class this machine manages (consumed by the statemachine/actor plumbing)
    model: Snapshot

    # States: each State's value mirrors a Snapshot.status choice so the
    # machine state can be persisted directly in the model's status field
    queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
    started = State(value=Snapshot.StatusChoices.STARTED)
    sealed = State(value=Snapshot.StatusChoices.SEALED, final=True)

    # Tick Event: cond=/unless= strings name the predicate methods defined below
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(sealed, cond='is_finished')
    )

    def __init__(self, snapshot: Snapshot, *args, **kwargs):
        # bind this machine instance to a single Snapshot row
        self.snapshot = snapshot
        super().__init__(snapshot, *args, **kwargs)

    def can_start(self) -> bool:
        # a snapshot is startable as soon as it has a URL to archive
        can_start = bool(self.snapshot.url)
        if not can_start:
            print(f'SnapshotMachine[{self.snapshot.ABID}].can_start() False: {self.snapshot.url} {self.snapshot.retry_at} {timezone.now()}')
        return can_start

    def is_finished(self) -> bool:
        """Return True only when archiveresults exist and none are still pending."""
        # if no archiveresults exist yet, it's not finished
        if not self.snapshot.archiveresult_set.exists():
            return False

        # if archiveresults exist but are still pending, it's not finished
        if self.snapshot.pending_archiveresults().exists():
            return False

        # otherwise archiveresults exist and are all finished, so it's finished
        return True

    def on_transition(self, event, state):
        # debug hook: invoked by the statemachine library on every transition
        print(f'SnapshotMachine[{self.snapshot.ABID}].on_transition() {event} -> {state}')

    @queued.enter
    def enter_queued(self):
        # retry_at = now() makes the snapshot immediately eligible for worker pickup
        print(f'SnapshotMachine[{self.snapshot.ABID}].on_queued(): snapshot.retry_at = now()')
        self.snapshot.update_for_workers(
            retry_at=timezone.now(),
            status=Snapshot.StatusChoices.QUEUED,
        )

    @started.enter
    def enter_started(self):
        # NOTE(review): message says +60s but the code below locks for 30s and
        # then re-checks after 5s — confirm which timing is intended
        print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)')
        # lock the snapshot while we create the pending archiveresults
        self.snapshot.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=30), # if failed, wait 30s before retrying
        )
        # create the pending archiveresults
        self.snapshot.create_pending_archiveresults()

        # unlock the snapshot after we're done creating the pending archiveresults + set status = started
        self.snapshot.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=5), # wait 5s before checking it again
            status=Snapshot.StatusChoices.STARTED,
        )

    @sealed.enter
    def enter_sealed(self):
        # terminal state: retry_at=None means no worker will ever pick it up again
        print(f'SnapshotMachine[{self.snapshot.ABID}].on_sealed(): snapshot.retry_at=None')
        self.snapshot.update_for_workers(
            retry_at=None,
            status=Snapshot.StatusChoices.SEALED,
        )
class SnapshotWorker(ActorType[Snapshot]):
    """
    The primary actor for progressing Snapshot objects
    through their lifecycle using the SnapshotMachine.
    """
    Model = Snapshot
    StateMachineClass = SnapshotMachine

    # objects in this state are considered in-progress/claimed by a worker
    ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started                # 'started'

    # presumably: max workers of this type running at once — confirm in ActorType
    MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
    # presumably: seconds allowed per tick before it's considered hung — confirm in ActorType
    MAX_TICK_TIME: ClassVar[int] = 10
    # claim from a window larger than the worker count to reduce claim contention
    CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
class ArchiveResultMachine(StateMachine, strict_states=True):
    """
    State machine for managing ArchiveResult lifecycle.

    Lifecycle: queued -> started -> (succeeded | failed | backoff),
    with backoff able to return to started or resolve directly to a
    final state. Ticks are driven externally (by the ArchiveResultWorker
    actor); strict_states=True rejects undeclared transitions.

    https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
    """

    # model class this machine manages (consumed by the statemachine/actor plumbing)
    model: ArchiveResult

    # States: each State's value mirrors an ArchiveResult.status choice so the
    # machine state can be persisted directly in the model's status field
    queued = State(value=ArchiveResult.StatusChoices.QUEUED, initial=True)
    started = State(value=ArchiveResult.StatusChoices.STARTED)
    backoff = State(value=ArchiveResult.StatusChoices.BACKOFF)
    succeeded = State(value=ArchiveResult.StatusChoices.SUCCEEDED, final=True)
    failed = State(value=ArchiveResult.StatusChoices.FAILED, final=True)

    # Tick Event: cond=/unless= strings name the predicate methods defined below
    tick = (
        queued.to.itself(unless='can_start') |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished') |
        started.to(succeeded, cond='is_succeeded') |
        started.to(failed, cond='is_failed') |
        started.to(backoff, cond='is_backoff') |
        backoff.to.itself(unless='can_start') |
        backoff.to(started, cond='can_start') |
        backoff.to(succeeded, cond='is_succeeded') |
        backoff.to(failed, cond='is_failed')
    )
    def __init__(self, archiveresult: ArchiveResult, *args, **kwargs):
        # bind this machine instance to a single ArchiveResult row
        self.archiveresult = archiveresult
        super().__init__(archiveresult, *args, **kwargs)

    def can_start(self) -> bool:
        # an extraction can start as soon as the parent snapshot has a URL
        can_start = bool(self.archiveresult.snapshot.url)
        if not can_start:
            print(f'ArchiveResultMachine[{self.archiveresult.ABID}].can_start() False: {self.archiveresult.snapshot.url} {self.archiveresult.retry_at} {timezone.now()}')
        return can_start

    def is_succeeded(self) -> bool:
        # output is present and contains no 'err' substring -> success
        # NOTE(review): substring heuristic — confirm legitimate outputs can
        # never contain 'err' (e.g. a URL path), or this misclassifies them
        if self.archiveresult.output and 'err' not in self.archiveresult.output.lower():
            return True
        return False

    def is_failed(self) -> bool:
        # output is present and mentions 'err' -> failure (mirror of is_succeeded)
        if self.archiveresult.output and 'err' in self.archiveresult.output.lower():
            return True
        return False

    def is_backoff(self) -> bool:
        # no output recorded yet -> extractor hasn't finished, back off and retry
        if self.archiveresult.output is None:
            return True
        return False

    def is_finished(self) -> bool:
        # finished means resolved either way; empty-string output is neither
        return self.is_failed() or self.is_succeeded()

    @queued.enter
    def enter_queued(self):
        # retry_at = now() makes the archiveresult immediately eligible for pickup;
        # start_ts is cleared because it has not started (again) yet
        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_queued(): archiveresult.retry_at = now()')
        self.archiveresult.update_for_workers(
            retry_at=timezone.now(),
            status=ArchiveResult.StatusChoices.QUEUED,
            start_ts=None,
        )

    @started.enter
    def enter_started(self):
        # fixed log message: the code locks for +30s, not +60s as previously claimed
        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+30s)')
        # lock the obj for the next ~30s to limit racing with other workers
        # NOTE(review): status stays QUEUED during the locked window and only
        # flips to STARTED below once setup is done — confirm this is intended
        self.archiveresult.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=30),
            status=ArchiveResult.StatusChoices.QUEUED,
            start_ts=timezone.now(),
        )

        # create the output directory and fork the new extractor job subprocess
        self.archiveresult.create_output_dir()
        # self.archiveresult.extract(background=True)

        # mark the object as started
        self.archiveresult.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=30), # retry it again in 30s if it fails
            status=ArchiveResult.StatusChoices.STARTED,
        )

        # simulate slow running extractor that completes after 2 seconds
        time.sleep(2)
        self.archiveresult.update_for_workers(output='completed')
    @backoff.enter
    def enter_backoff(self):
        # push retry_at 60s into the future and clear end_ts so it gets re-attempted
        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None')
        self.archiveresult.update_for_workers(
            retry_at=timezone.now() + timedelta(seconds=60),
            status=ArchiveResult.StatusChoices.BACKOFF,
            end_ts=None,
            # retries=F('retries') + 1, # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1,
        )
        self.archiveresult.save(write_indexes=True)

    @succeeded.enter
    def enter_succeeded(self):
        # terminal state: retry_at=None means no worker will pick it up again
        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.retry_at = None, archiveresult.end_ts = now()')
        self.archiveresult.update_for_workers(
            retry_at=None,
            status=ArchiveResult.StatusChoices.SUCCEEDED,
            end_ts=timezone.now(),
            # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
        )
        self.archiveresult.save(write_indexes=True)
    @failed.enter
    def enter_failed(self):
        # fixed log message: previously said 'archivebox.retry_at' (typo) — the
        # object being updated is the archiveresult
        # NOTE(review): unlike enter_succeeded/enter_backoff, no
        # save(write_indexes=True) here — confirm whether that's intentional
        print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_failed(): archiveresult.retry_at = None, archiveresult.end_ts = now()')
        self.archiveresult.update_for_workers(
            retry_at=None,
            status=ArchiveResult.StatusChoices.FAILED,
            end_ts=timezone.now(),
            # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine}
        )

    def after_transition(self, event: str, source: State, target: State):
        # print(f"after '{event}' from '{source.id}' to '{target.id}'")
        self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes
class ArchiveResultWorker(ActorType[ArchiveResult]):
    """
    The primary actor for progressing ArchiveResult objects
    through their lifecycle using the ArchiveResultMachine.
    """
    Model = ArchiveResult
    StateMachineClass = ArchiveResultMachine

    # objects in this state are considered in-progress/claimed by a worker
    ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started            # 'started'

    # presumably: max workers of this type running at once — confirm in ActorType
    MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
    # presumably: seconds allowed per tick before it's considered hung — confirm in ActorType
    MAX_TICK_TIME: ClassVar[int] = 60
    # claim from a window larger than the worker count to reduce claim contention
    CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
|