Browse Source

fix orchestrator startup and add exit_on_idle option

Nick Sweeting 1 year ago
parent
commit
48bb634b75
2 changed files with 116 additions and 82 deletions
  1. +2 -2
      archivebox/actors/models.py
  2. +114 -80
      archivebox/actors/orchestrator.py

+ 2 - 2
archivebox/actors/models.py

@@ -18,7 +18,7 @@ class DefaultStatusChoices(models.TextChoices):
 
 
 
 
 default_status_field: models.CharField = models.CharField(choices=DefaultStatusChoices.choices, max_length=15, default=DefaultStatusChoices.QUEUED, null=False, blank=False, db_index=True)
 default_status_field: models.CharField = models.CharField(choices=DefaultStatusChoices.choices, max_length=15, default=DefaultStatusChoices.QUEUED, null=False, blank=False, db_index=True)
-default_retry_at_field: models.DateTimeField = models.DateTimeField(default=timezone.now, null=False, db_index=True)
+default_retry_at_field: models.DateTimeField = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True)
 
 
 ObjectState = State | str
 ObjectState = State | str
 ObjectStateList = Iterable[ObjectState]
 ObjectStateList = Iterable[ObjectState]
@@ -164,7 +164,7 @@ class BaseModelWithStateMachine(models.Model, MachineMixin):
         
         
     @classproperty
     @classproperty
     def ACTIVE_STATE(cls) -> str:
     def ACTIVE_STATE(cls) -> str:
-        return cls._state_to_str(cls.StateMachineClass.active_state)
+        return cls._state_to_str(cls.active_state)
         
         
     @classproperty
     @classproperty
     def INITIAL_STATE(cls) -> str:
     def INITIAL_STATE(cls) -> str:

+ 114 - 80
archivebox/actors/orchestrator.py

@@ -3,29 +3,40 @@ __package__ = 'archivebox.actors'
 import os
 import os
 import time
 import time
 import itertools
 import itertools
-from typing import Dict, Type, Literal, ClassVar
+from typing import Dict, Type, Literal, TYPE_CHECKING
 from django.utils.functional import classproperty
 from django.utils.functional import classproperty
+from django.utils import timezone
+
+import multiprocessing
+
 
 
-from multiprocessing import Process
 from threading import Thread, get_native_id
 from threading import Thread, get_native_id
 
 
 
 
 from rich import print
 from rich import print
 
 
-from django.db.models import QuerySet
+# from django.db.models import QuerySet
 
 
 from django.apps import apps
 from django.apps import apps
-from .actor import ActorType
+
+if TYPE_CHECKING:
+    from .actor import ActorType
+
+
+multiprocessing.set_start_method('fork', force=True)
+
 
 
 class Orchestrator:
 class Orchestrator:
     pid: int
     pid: int
     idle_count: int = 0
     idle_count: int = 0
-    actor_types: Dict[str, Type[ActorType]]
+    actor_types: Dict[str, Type['ActorType']] = {}
     mode: Literal['thread', 'process'] = 'process'
     mode: Literal['thread', 'process'] = 'process'
-
-    def __init__(self, actor_types: Dict[str, Type[ActorType]] | None = None, mode: Literal['thread', 'process'] | None=None):
+    exit_on_idle: bool = True
+    
+    def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True):
         self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types()
         self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types()
         self.mode = mode or self.mode
         self.mode = mode or self.mode
+        self.exit_on_idle = exit_on_idle
 
 
     def __repr__(self) -> str:
     def __repr__(self) -> str:
         label = 'tid' if self.mode == 'thread' else 'pid'
         label = 'tid' if self.mode == 'thread' else 'pid'
@@ -45,7 +56,7 @@ class Orchestrator:
         return self.thread.native_id
         return self.thread.native_id
     
     
     def fork_as_process(self):
     def fork_as_process(self):
-        self.process = Process(target=self.runloop)
+        self.process = multiprocessing.Process(target=self.runloop)
         self.process.start()
         self.process.start()
         assert self.process.pid is not None
         assert self.process.pid is not None
         return self.process.pid
         return self.process.pid
@@ -58,11 +69,19 @@ class Orchestrator:
         raise ValueError(f'Invalid orchestrator mode: {self.mode}')
         raise ValueError(f'Invalid orchestrator mode: {self.mode}')
     
     
     @classmethod
     @classmethod
-    def autodiscover_actor_types(cls) -> Dict[str, Type[ActorType]]:
+    def autodiscover_actor_types(cls) -> Dict[str, Type['ActorType']]:
+        from archivebox.config.django import setup_django
+        setup_django()
+        
         # returns a Dict of all discovered {actor_type_id: ActorType} across the codebase
         # returns a Dict of all discovered {actor_type_id: ActorType} across the codebase
         # override this method in a subclass to customize the actor types that are used
         # override this method in a subclass to customize the actor types that are used
         # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...}
         # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...}
+        from crawls.actors import CrawlActor
+        from core.actors import SnapshotActor, ArchiveResultActor   
         return {
         return {
+            'CrawlActor': CrawlActor,
+            'SnapshotActor': SnapshotActor,
+            'ArchiveResultActor': ArchiveResultActor,
             # look through all models and find all classes that inherit from ActorType
             # look through all models and find all classes that inherit from ActorType
             # actor_type.__name__: actor_type
             # actor_type.__name__: actor_type
             # for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values()
             # for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values()
@@ -111,8 +130,13 @@ class Orchestrator:
             orphaned_objects = self.get_orphaned_objects(all_queues)
             orphaned_objects = self.get_orphaned_objects(all_queues)
             if orphaned_objects:
             if orphaned_objects:
                 print('[red]👨‍✈️ WARNING: some objects may not be processed, no actor has claimed them after 60s:[/red]', orphaned_objects)
                 print('[red]👨‍✈️ WARNING: some objects may not be processed, no actor has claimed them after 60s:[/red]', orphaned_objects)
+        if self.idle_count > 5 and self.exit_on_idle:
+            raise KeyboardInterrupt('Orchestrator has no more tasks to process, exiting')
 
 
     def runloop(self):
     def runloop(self):
+        from archivebox.config.django import setup_django
+        setup_django()
+        
         self.on_startup()
         self.on_startup()
         try:
         try:
             while True:
             while True:
@@ -160,85 +184,95 @@ class Orchestrator:
 
 
 
 
 
 
-from archivebox.config.django import setup_django
+# from archivebox.config.django import setup_django
 
 
-setup_django()
+# setup_django()
 
 
-from core.models import ArchiveResult, Snapshot
+# from core.models import ArchiveResult, Snapshot
 
 
-from django.utils import timezone
+# from django.utils import timezone
+
+# from django import db
+# from django.db import connection
 
 
-from django import db
-from django.db import connection
 
 
+# from crawls.actors import CrawlActor
+# from core.actors import SnapshotActor, ArchiveResultActor
 
 
-from crawls.actors import CrawlActor
-from .actor_snapshot import SnapshotActor
+# class ArchivingOrchestrator(Orchestrator):
+#     actor_types = {
+#         'CrawlActor': CrawlActor,
+#         'SnapshotActor': SnapshotActor,
+#         'ArchiveResultActor': ArchiveResultActor,
+#         # 'FaviconActor': FaviconActor,
+#         # 'SinglefileActor': SinglefileActor,
+#     }
 
 
-from abx_plugin_singlefile.actors import SinglefileActor
+# from abx_plugin_singlefile.actors import SinglefileActor
 
 
 
 
-class FaviconActor(ActorType[ArchiveResult]):
-    CLAIM_ORDER: ClassVar[str] = 'created_at DESC'
-    CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"'
-    CLAIM_SET: ClassVar[str] = 'status = "started"'
+# class FaviconActor(ActorType[ArchiveResult]):
+#     CLAIM_ORDER: ClassVar[str] = 'created_at DESC'
+#     CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"'
+#     CLAIM_SET: ClassVar[str] = 'status = "started"'
     
     
-    @classproperty
-    def QUERYSET(cls) -> QuerySet:
-        return ArchiveResult.objects.filter(status='failed', extractor='favicon')
-
-    def tick(self, obj: ArchiveResult):
-        print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count())
-        updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1
-        if not updated:
-            raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object')
-        obj.refresh_from_db()
-        obj.save()
-
-
-class ExtractorsOrchestrator(Orchestrator):
-    actor_types = {
-        'CrawlActor': CrawlActor,
-        'SnapshotActor': SnapshotActor,
-        'FaviconActor': FaviconActor,
-        'SinglefileActor': SinglefileActor,
-    }
-
-
-if __name__ == '__main__':    
-    orchestrator = ExtractorsOrchestrator()
-    orchestrator.start()
+#     @classproperty
+#     def QUERYSET(cls) -> QuerySet:
+#         return ArchiveResult.objects.filter(status='failed', extractor='favicon')
+
+#     def tick(self, obj: ArchiveResult):
+#         print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count())
+#         updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1
+#         if not updated:
+#             raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object')
+#         obj.refresh_from_db()
+#         obj.save()
+
+
+# class ArchivingOrchestrator(Orchestrator):
+#     actor_types = {
+#         'CrawlActor': CrawlActor,
+#         'SnapshotActor': SnapshotActor,
+#         'ArchiveResultActor': ArchiveResultActor,
+#         # 'FaviconActor': FaviconActor,
+#         # 'SinglefileActor': SinglefileActor,
+#     }
+
+
+# if __name__ == '__main__':    
+#     orchestrator = ExtractorsOrchestrator()
+#     orchestrator.start()
     
     
-    snap = Snapshot.objects.last()
-    assert snap is not None
-    created = 0
-    while True:
-        time.sleep(0.05)
-        # try:
-        #     ArchiveResult.objects.bulk_create([
-        #         ArchiveResult(
-        #             id=uuid.uuid4(),
-        #             snapshot=snap,
-        #             status='failed',
-        #             extractor='favicon',
-        #             cmd=['echo', '"hello"'],
-        #             cmd_version='1.0',
-        #             pwd='.',
-        #             start_ts=timezone.now(),
-        #             end_ts=timezone.now(),
-        #             created_at=timezone.now(),
-        #             modified_at=timezone.now(),
-        #             created_by_id=1,
-        #         )
-        #         for _ in range(100)
-        #     ])
-        #     created += 100
-        #     if created % 1000 == 0:
-        #         print(f'[blue]Created {created} ArchiveResults...[/blue]')
-        #         time.sleep(25)
-        # except Exception as err:
-        #     print(err)
-        #     db.connections.close_all()
-        # except BaseException as err:
-        #     print(err)
-        #     break
+#     snap = Snapshot.objects.last()
+#     assert snap is not None
+#     created = 0
+#     while True:
+#         time.sleep(0.05)
+#         # try:
+#         #     ArchiveResult.objects.bulk_create([
+#         #         ArchiveResult(
+#         #             id=uuid.uuid4(),
+#         #             snapshot=snap,
+#         #             status='failed',
+#         #             extractor='favicon',
+#         #             cmd=['echo', '"hello"'],
+#         #             cmd_version='1.0',
+#         #             pwd='.',
+#         #             start_ts=timezone.now(),
+#         #             end_ts=timezone.now(),
+#         #             created_at=timezone.now(),
+#         #             modified_at=timezone.now(),
+#         #             created_by_id=1,
+#         #         )
+#         #         for _ in range(100)
+#         #     ])
+#         #     created += 100
+#         #     if created % 1000 == 0:
+#         #         print(f'[blue]Created {created} ArchiveResults...[/blue]')
+#         #         time.sleep(25)
+#         # except Exception as err:
+#         #     print(err)
+#         #     db.connections.close_all()
+#         # except BaseException as err:
+#         #     print(err)
+#         #     break