actor.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # __package__ = 'archivebox.workers'
  2. # import time
  3. # from typing import ClassVar, Type, Iterable, TypedDict
  4. # from django.db.models import QuerySet
  5. # from django.db import transaction
  6. # from django.utils import timezone
  7. # from django.utils.functional import classproperty # type: ignore
  8. # from .models import Event, Process, EventDict
  9. # class ActorType:
  10. # # static class attributes
  11. # name: ClassVar[str]
  12. # event_prefix: ClassVar[str]
  13. # poll_interval: ClassVar[int] = 1
  14. # @classproperty
  15. # def event_queue(cls) -> QuerySet[Event]:
  16. # return Event.objects.filter(type__startswith=cls.event_prefix)
  17. # @classmethod
  18. # def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
  19. # cmd = ['archivebox', 'actor', cls.name]
  20. # if exit_on_idle:
  21. # cmd.append('--exit-on-idle')
  22. # if wait_for_first_event:
  23. # cmd.append('--wait-for-first-event')
  24. # return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
  25. # @classproperty
  26. # def processes(cls) -> QuerySet[Process]:
  27. # return Process.objects.filter(actor_type=cls.name)
  28. # @classmethod
  29. # def run(cls, wait_for_first_event=False, exit_on_idle=True):
  30. # if wait_for_first_event:
  31. # event = cls.event_queue.get_next_unclaimed()
  32. # while not event:
  33. # time.sleep(cls.poll_interval)
  34. # event = cls.event_queue.get_next_unclaimed()
  35. # while True:
  36. # output_events = list(cls.process_next_event()) or list(cls.process_idle_tick()) # process next event, or tick if idle
  37. # yield from output_events
  38. # if not output_events:
  39. # if exit_on_idle:
  40. # break
  41. # else:
  42. # time.sleep(cls.poll_interval)
  43. # @classmethod
  44. # def process_next_event(cls) -> Iterable[EventDict]:
  45. # event = cls.event_queue.get_next_unclaimed()
  46. # output_events = []
  47. # if not event:
  48. # return []
  49. # cls.mark_event_claimed(event, duration=60)
  50. # try:
  51. # for output_event in cls.receive(event):
  52. # output_events.append(output_event)
  53. # yield output_event
  54. # cls.mark_event_succeeded(event, output_events=output_events)
  55. # except BaseException as e:
  56. # cls.mark_event_failed(event, output_events=output_events, error=e)
  57. # @classmethod
  58. # def process_idle_tick(cls) -> Iterable[EventDict]:
  59. # # reset the idle event to be claimed by the current process
  60. # event, _created = Event.objects.update_or_create(
  61. # name=f'{cls.event_prefix}IDLE',
  62. # emitted_by=Process.current(),
  63. # defaults={
  64. # 'deliver_at': timezone.now(),
  65. # 'claimed_proc': None,
  66. # 'claimed_at': None,
  67. # 'finished_at': None,
  68. # 'error': None,
  69. # 'parent': None,
  70. # },
  71. # )
  72. # # then process it like any other event
  73. # yield from cls.process_next_event()
  74. # @classmethod
  75. # def receive(cls, event: Event) -> Iterable[EventDict]:
  76. # handler_method = getattr(cls, f'on_{event.name}', None)
  77. # if handler_method:
  78. # yield from handler_method(event)
  79. # else:
  80. # raise Exception(f'No handler method for event: {event.name}')
  81. # @staticmethod
  82. # def on_IDLE(event: Event) -> Iterable[EventDict]:
  83. # return []
  84. # @staticmethod
  85. # def mark_event_claimed(event: Event, duration: int=60):
  86. # proc = Process.current()
  87. # with transaction.atomic():
  88. # claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
  89. # if not claimed:
  90. # event.refresh_from_db()
  91. # raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
  92. # process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
  93. # if not process_updated:
  94. # raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')
  95. # @staticmethod
  96. # def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
  97. # assert event.claimed_proc and (event.claimed_proc == Process.current())
  98. # with transaction.atomic():
  99. # updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
  100. # if not updated:
  101. # event.refresh_from_db()
  102. # raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')
  103. # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
  104. # if not process_updated:
  105. # raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
  106. # # dispatch any output events
  107. # for output_event in output_events:
  108. # Event.dispatch(event=output_event, parent=event)
  109. # # trigger any callback events
  110. # if event.on_success:
  111. # Event.dispatch(event=event.on_success, parent=event)
  112. # @staticmethod
  113. # def mark_event_failed(event: Event, output_events: Iterable[EventDict]=(), error: BaseException | None = None):
  114. # assert event.claimed_proc and (event.claimed_proc == Process.current())
  115. # with transaction.atomic():
  116. # updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
  117. # if not updated:
  118. # event.refresh_from_db()
  119. # raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')
  120. # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
  121. # if not process_updated:
  122. # raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')
  123. # # add dedicated error event to the output events
  124. # output_events = [
  125. # *output_events,
  126. # {'name': f'{event.name}_ERROR', 'error': f'{type(error).__name__}: {error}'},
  127. # ]
  128. # # dispatch any output events
  129. # for output_event in output_events:
  130. # Event.dispatch(event=output_event, parent=event)
  131. # # trigger any callback events
  132. # if event.on_failure:
  133. # Event.dispatch(event=event.on_failure, parent=event)