orchestrator.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. __package__ = 'archivebox.workers'
  2. import os
  3. import time
  4. import sys
  5. import itertools
  6. from typing import Dict, Type, Literal, TYPE_CHECKING
  7. from django.utils.functional import classproperty
  8. from django.utils import timezone
  9. import multiprocessing
  10. from rich import print
  11. # from django.db.models import QuerySet
  12. from django.apps import apps
  13. if TYPE_CHECKING:
  14. from .actor import ActorType
  15. multiprocessing.set_start_method('fork', force=True)
  16. class Orchestrator:
  17. pid: int
  18. idle_count: int = 0
  19. actor_types: Dict[str, Type['ActorType']] = {}
  20. mode: Literal['thread', 'process'] = 'process'
  21. exit_on_idle: bool = True
  22. max_concurrent_actors: int = 20
  23. def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True, max_concurrent_actors: int=max_concurrent_actors):
  24. self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types()
  25. self.mode = mode or self.mode
  26. self.exit_on_idle = exit_on_idle
  27. self.max_concurrent_actors = max_concurrent_actors
  28. def __repr__(self) -> str:
  29. label = 'tid' if self.mode == 'thread' else 'pid'
  30. return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]'
  31. def __str__(self) -> str:
  32. return self.__repr__()
  33. @classproperty
  34. def name(cls) -> str:
  35. return cls.__name__ # type: ignore
  36. # def _fork_as_thread(self):
  37. # self.thread = Thread(target=self.runloop)
  38. # self.thread.start()
  39. # assert self.thread.native_id is not None
  40. # return self.thread.native_id
  41. def _fork_as_process(self):
  42. self.process = multiprocessing.Process(target=self.runloop)
  43. self.process.start()
  44. assert self.process.pid is not None
  45. return self.process.pid
  46. def start(self) -> int:
  47. if self.mode == 'thread':
  48. # return self._fork_as_thread()
  49. raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity')
  50. elif self.mode == 'process':
  51. return self._fork_as_process()
  52. raise ValueError(f'Invalid orchestrator mode: {self.mode}')
  53. @classmethod
  54. def autodiscover_actor_types(cls) -> Dict[str, Type['ActorType']]:
  55. from archivebox.config.django import setup_django
  56. setup_django()
  57. # returns a Dict of all discovered {actor_type_id: ActorType} across the codebase
  58. # override this method in a subclass to customize the actor types that are used
  59. # return {'Snapshot': SnapshotWorker, 'ArchiveResult_chrome': ChromeActorType, ...}
  60. from crawls.statemachines import CrawlWorker
  61. from core.statemachines import SnapshotWorker, ArchiveResultWorker
  62. return {
  63. 'CrawlWorker': CrawlWorker,
  64. 'SnapshotWorker': SnapshotWorker,
  65. 'ArchiveResultWorker': ArchiveResultWorker,
  66. # look through all models and find all classes that inherit from ActorType
  67. # actor_type.__name__: actor_type
  68. # for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values()
  69. }
  70. @classmethod
  71. def get_orphaned_objects(cls, all_queues) -> list:
  72. # returns a list of objects that are in the queues of all actor types but not in the queues of any other actor types
  73. all_queued_ids = itertools.chain(*[queue.values('id', flat=True) for queue in all_queues.values()])
  74. orphaned_objects = []
  75. for model in apps.get_models():
  76. if hasattr(model, 'retry_at'):
  77. orphaned_objects.extend(model.objects.filter(retry_at__lt=timezone.now()).exclude(id__in=all_queued_ids))
  78. return orphaned_objects
  79. @classmethod
  80. def has_future_objects(cls, all_queues) -> bool:
  81. # returns a list of objects that are in the queues of all actor types but not in the queues of any other actor types
  82. return any(
  83. queue.filter(retry_at__gte=timezone.now()).exists()
  84. for queue in all_queues.values()
  85. )
  86. def on_startup(self):
  87. if self.mode == 'thread':
  88. # self.pid = get_native_id()
  89. print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]')
  90. raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity')
  91. elif self.mode == 'process':
  92. self.pid = os.getpid()
  93. print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]')
  94. # abx.pm.hook.on_orchestrator_startup(self)
  95. def on_shutdown(self, err: BaseException | None = None):
  96. print(f'[grey53]👨‍✈️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]')
  97. # abx.pm.hook.on_orchestrator_shutdown(self)
  98. def on_tick_started(self, all_queues):
  99. # total_pending = sum(queue.count() for queue in all_queues.values())
  100. # if total_pending:
  101. # print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}')
  102. # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues)
  103. pass
  104. def on_tick_finished(self, all_queues, all_existing_actors, all_spawned_actors):
  105. # if all_spawned_actors:
  106. # total_queue_length = sum(queue.count() for queue in all_queues.values())
  107. # print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]')
  108. # abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues)
  109. pass
  110. def on_idle(self, all_queues):
  111. # print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}')
  112. print('.', end='', flush=True, file=sys.stderr)
  113. # abx.pm.hook.on_orchestrator_idle(self)
  114. # check for orphaned objects left behind
  115. if self.idle_count == 60:
  116. orphaned_objects = self.get_orphaned_objects(all_queues)
  117. if orphaned_objects:
  118. print('[red]👨‍✈️ WARNING: some objects may not be processed, no actor has claimed them after 30s:[/red]', orphaned_objects)
  119. if self.idle_count > 3 and self.exit_on_idle and not self.has_future_objects(all_queues):
  120. raise KeyboardInterrupt('✅ All tasks completed, exiting')
  121. def runloop(self):
  122. from archivebox.config.django import setup_django
  123. setup_django()
  124. self.on_startup()
  125. try:
  126. while True:
  127. all_queues = {
  128. actor_type: actor_type.get_queue()
  129. for actor_type in self.actor_types.values()
  130. }
  131. if not all_queues:
  132. raise Exception('Failed to find any actor_types to process')
  133. self.on_tick_started(all_queues)
  134. all_existing_actors = []
  135. all_spawned_actors = []
  136. for actor_type, queue in all_queues.items():
  137. if not queue.exists():
  138. continue
  139. next_obj = queue.first()
  140. print()
  141. print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}')
  142. self.idle_count = 0
  143. try:
  144. existing_actors = actor_type.get_running_actors()
  145. all_existing_actors.extend(existing_actors)
  146. actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors)
  147. can_spawn_num_remaining = self.max_concurrent_actors - len(all_existing_actors) # set max_concurrent_actors=1 to disable multitasking
  148. for launch_kwargs in actors_to_spawn[:can_spawn_num_remaining]:
  149. new_actor_pid = actor_type.start(mode='process', **launch_kwargs)
  150. all_spawned_actors.append(new_actor_pid)
  151. except Exception as err:
  152. print(f'🏃‍♂️ ERROR: {self} Failed to get {actor_type} queue & running actors', err)
  153. except BaseException:
  154. raise
  155. if not any(queue.exists() for queue in all_queues.values()):
  156. self.on_idle(all_queues)
  157. self.idle_count += 1
  158. time.sleep(0.5)
  159. else:
  160. self.idle_count = 0
  161. self.on_tick_finished(all_queues, all_existing_actors, all_spawned_actors)
  162. time.sleep(1)
  163. except BaseException as err:
  164. if isinstance(err, KeyboardInterrupt):
  165. print()
  166. else:
  167. print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err)
  168. self.on_shutdown(err=err)