|
|
@@ -270,27 +270,31 @@ class ActorType(Generic[ModelType]):
|
|
|
@classproperty
|
|
|
def final_q(cls) -> Q:
|
|
|
"""Get the filter for objects that are already completed / in a final state"""
|
|
|
- return Q(**{f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states]})
|
|
|
+ return Q(**{
|
|
|
+ f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states],
|
|
|
+ }) # status__in=('sealed', 'failed', 'succeeded')
|
|
|
|
|
|
@classproperty
|
|
|
def active_q(cls) -> Q:
|
|
|
- """Get the filter for objects that are actively processing right now"""
|
|
|
- return Q(**{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started')
|
|
|
+ """Get the filter for objects that are marked active (and are still running / not timed out)"""
|
|
|
+ return Q(retry_at__gte=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started')
|
|
|
|
|
|
@classproperty
|
|
|
def stalled_q(cls) -> Q:
|
|
|
- """Get the filter for objects that are marked active but have timed out"""
|
|
|
- return cls.active_q & Q(retry_at__lte=timezone.now()) # e.g. Q(status='started') AND Q(<retry_at is in the past>)
|
|
|
+ """Get the filter for objects that are marked active but are timed out"""
|
|
|
+ return Q(retry_at__lte=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') AND Q(<retry_at is in the past>)
|
|
|
|
|
|
@classproperty
|
|
|
def future_q(cls) -> Q:
|
|
|
"""Get the filter for objects that have a retry_at in the future"""
|
|
|
- return Q(retry_at__gt=timezone.now())
|
|
|
+ return Q(retry_at__gt=timezone.now(), **{cls.Model.state_field_name: 'QUEUED'})
|
|
|
|
|
|
@classproperty
|
|
|
def pending_q(cls) -> Q:
|
|
|
"""Get the filter for objects that are ready for processing."""
|
|
|
- return (~(cls.active_q) & ~(cls.final_q)) | Q(retry_at__lte=timezone.now())
|
|
|
+ return ~Q(**{
|
|
|
+ f'{cls.Model.state_field_name}__in': (*[cls._state_to_str(s) for s in cls.StateMachineClass.final_states], cls._state_to_str(cls.ACTIVE_STATE))
|
|
|
+ }) # status__not_in=('sealed', 'failed', 'succeeded', 'started')
|
|
|
|
|
|
@classmethod
|
|
|
def get_queue(cls, sort: bool=True) -> QuerySet[ModelType]:
|
|
|
@@ -298,7 +302,7 @@ class ActorType(Generic[ModelType]):
|
|
|
Get the sorted and filtered QuerySet of objects that are ready for processing.
|
|
|
e.g. qs.exclude(status__in=('sealed', 'started'), retry_at__gt=timezone.now()).order_by('retry_at')
|
|
|
"""
|
|
|
- unsorted_qs = cls.qs.filter(cls.pending_q)
|
|
|
+ unsorted_qs = cls.qs.filter(cls.pending_q) | cls.qs.filter(cls.stalled_q)
|
|
|
return unsorted_qs.order_by(*cls.CLAIM_ORDER) if sort else unsorted_qs
|
|
|
|
|
|
### Instance Methods: Only called from within Actor instance after it has been spawned (i.e. forked as a thread or process)
|
|
|
@@ -324,7 +328,7 @@ class ActorType(Generic[ModelType]):
|
|
|
if self.idle_count >= 3:
|
|
|
break # stop looping and exit if queue is empty and we have idled for 30sec
|
|
|
else:
|
|
|
- # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...')
|
|
|
+ print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...')
|
|
|
self.idle_count += 1
|
|
|
time.sleep(1)
|
|
|
continue
|
|
|
@@ -335,7 +339,7 @@ class ActorType(Generic[ModelType]):
|
|
|
self.tick(obj_to_process)
|
|
|
except Exception as err:
|
|
|
last_error = err
|
|
|
- # print(f'[red]🏃♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]')
|
|
|
+ print(f'[red]🏃♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]')
|
|
|
db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions
|
|
|
self.on_tick_exception(obj_to_process, err)
|
|
|
traceback.print_exc()
|
|
|
@@ -362,7 +366,7 @@ class ActorType(Generic[ModelType]):
|
|
|
Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars.
|
|
|
"""
|
|
|
return {
|
|
|
- cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE),
|
|
|
+ # cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE), # do this manually in the state machine enter hooks
|
|
|
'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME),
|
|
|
}
|
|
|
|
|
|
@@ -465,7 +469,7 @@ class ActorType(Generic[ModelType]):
|
|
|
def on_startup(self) -> None:
|
|
|
if self.mode == 'thread':
|
|
|
# self.pid = get_native_id() # thread id
|
|
|
- # print(f'[green]🏃♂️ {self}.on_startup() STARTUP (THREAD)[/green]')
|
|
|
+ print(f'[green]🏃♂️ {self}.on_startup() STARTUP (THREAD)[/green]')
|
|
|
raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything')
|
|
|
else:
|
|
|
self.pid = os.getpid() # process id
|
|
|
@@ -486,13 +490,13 @@ class ActorType(Generic[ModelType]):
|
|
|
# abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error)
|
|
|
|
|
|
def on_tick_start(self, obj_to_process: ModelType) -> None:
|
|
|
- # print(f'🏃♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
|
|
|
+ print(f'🏃♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
|
|
|
# abx.pm.hook.on_actor_tick_start(actor=self, obj_to_process=obj)
|
|
|
# self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ')
|
|
|
pass
|
|
|
|
|
|
def on_tick_end(self, obj_to_process: ModelType) -> None:
|
|
|
- # print(f'🏃♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
|
|
|
+ print(f'🏃♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}')
|
|
|
# abx.pm.hook.on_actor_tick_end(actor=self, obj_to_process=obj_to_process)
|
|
|
# self.timer.end()
|
|
|
pass
|