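"""
Event-driven worker types for ArchiveBox.

Each WorkerType subclass watches the shared Event queue for events whose names
start with its `listens_to` prefix, claims them atomically, dispatches each one
to an `on_<EVENT_NAME>` handler method, and emits whatever follow-up events the
handler yields.
"""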
__package__ = 'archivebox.workers'

import os
import sys
import time
import uuid
import json
import subprocess
from typing import ClassVar, Iterable, Type
from pathlib import Path

from rich import print

from django.db import transaction
from django.db.models import QuerySet
from django.utils import timezone
from django.utils.functional import classproperty  # type: ignore

from crawls.models import Crawl
from core.models import Snapshot, ArchiveResult
from workers.models import Event, Process, EventDict


class WorkerType:
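    """
    Base class for single-purpose workers that consume events off a shared queue.

    Subclasses set `name`/`listens_to`/`outputs` and define one
    `on_<EVENT_NAME>` staticmethod per event they handle. A minimal sketch
    (the LOG_WRITE event and its `msg` field are hypothetical, for
    illustration only):

        class LogWorker(WorkerType):
            name = 'log'
            listens_to = 'LOG_'
            outputs = ['FS_']

            @staticmethod
            def on_LOG_WRITE(event: Event) -> Iterable[EventDict]:
                # hand the actual file append off to the filesystem worker
                yield {'name': 'FS_APPEND', 'path': '/tmp/archivebox/worker.log', 'content': event.msg}
    """
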
    # static class attributes
    name: ClassVar[str]                 # e.g. 'log', 'filesystem', 'crawl', 'snapshot', 'archiveresult', etc.
    listens_to: ClassVar[str]           # e.g. 'LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_', etc.
    outputs: ClassVar[list[str]]        # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'], etc.

    poll_interval: ClassVar[int] = 1    # seconds to sleep between polls when there are no new events

    @classproperty
    def event_queue(cls) -> QuerySet[Event]:
        return Event.objects.filter(name__startswith=cls.listens_to)

    @classmethod
    def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
        cmd = ['archivebox', 'worker', cls.name]
        if exit_on_idle:
            cmd.append('--exit-on-idle')
        if wait_for_first_event:
            cmd.append('--wait-for-first-event')
        return Process.create_and_fork(cmd=cmd, actor_type=cls.name)

    @classproperty
    def processes(cls) -> QuerySet[Process]:
        return Process.objects.filter(actor_type=cls.name)

    @classmethod
    def run(cls, wait_for_first_event=False, exit_on_idle=True):
        if wait_for_first_event:
            # block until the first matching event arrives
            event = cls.event_queue.get_next_unclaimed()
            while not event:
                time.sleep(cls.poll_interval)
                event = cls.event_queue.get_next_unclaimed()

        while True:
            # process the next event, or tick if idle
            output_events = list(cls.process_next_event()) or list(cls.process_idle_tick())
            yield from output_events
            if not output_events:
                if exit_on_idle:
                    break
                else:
                    time.sleep(cls.poll_interval)
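
    # Example (sketch): a CLI entrypoint might drive this generator like so,
    # consuming output events as they are produced:
    #
    #   for output_event in CrawlWorker.run(exit_on_idle=False):
    #       print(output_event, file=sys.stderr)
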
    @classmethod
    def process_next_event(cls) -> Iterable[EventDict]:
        event = cls.event_queue.get_next_unclaimed()
        if not event:
            return

        output_events = []
        cls.mark_event_claimed(event)
        print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
        try:
            for output_event in cls.receive(event):
                output_events.append(output_event)
                yield output_event
            cls.mark_event_succeeded(event, output_events=output_events)
        except BaseException as e:
            cls.mark_event_failed(event, output_events=output_events, error=e)

    @classmethod
    def process_idle_tick(cls) -> Iterable[EventDict]:
        # reset this worker's IDLE event so it can be claimed by the current process
        event, _created = Event.objects.update_or_create(
            name=f'{cls.listens_to}IDLE',
            emitted_by=Process.current(),
            defaults={
                'deliver_at': timezone.now(),
                'claimed_proc': None,
                'claimed_at': None,
                'finished_at': None,
                'error': None,
                'parent': None,
            },
        )

        # then process it like any other event
        yield from cls.process_next_event()

    @classmethod
    def receive(cls, event: Event) -> Iterable[EventDict]:
        # dispatch to the handler named after the full event name, e.g. on_CRAWL_START
        handler_method = getattr(cls, f'on_{event.name}', None)
        if handler_method:
            yield from handler_method(event)
        else:
            raise Exception(f'No handler method for event: {event.name}')

    @staticmethod
    def on_IDLE(event: Event) -> Iterable[EventDict]:
        # default no-op idle handler; subclasses define on_<PREFIX>IDLE handlers instead
        return []

    @staticmethod
    def mark_event_claimed(event: Event):
        proc = Process.current()

        with transaction.atomic():
            # atomically claim the event iff no other process has claimed it yet
            claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
            event.refresh_from_db()
            if not claimed:
                raise Exception(f'Event already claimed by another process: {event.claimed_proc}')

        print(f'{proc}.mark_event_claimed(): Claimed {event} ⛏️', file=sys.stderr)

        # process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
        # if not process_updated:
        #     raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')

    @staticmethod
    def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
        event.refresh_from_db()
        assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
        assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'

        with transaction.atomic():
            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
            event.refresh_from_db()
            if not updated:
                raise Exception(f'Event {event} could not be marked as succeeded, it was modified by another process: {event.claimed_proc}')

            # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
            # if not process_updated:
            #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')

        # dispatch any output events
        for output_event in output_events:
            Event.dispatch(event=output_event, parent=event)

        # trigger any callback events
        if event.on_success:
            Event.dispatch(event=event.on_success, parent=event)

    @staticmethod
    def mark_event_failed(event: Event, output_events: Iterable[EventDict] = (), error: BaseException | None = None):
        event.refresh_from_db()
        assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
        assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'

        with transaction.atomic():
            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
            event.refresh_from_db()
            if not updated:
                raise Exception(f'Event {event} could not be marked as failed, it was modified by another process: {event.claimed_proc}')

            # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
            # if not process_updated:
            #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')

        # add a dedicated error event to the output events
        if not event.name.endswith('_ERROR'):
            output_events = [
                *output_events,
                {'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
            ]

        # dispatch any output events
        for output_event in output_events:
            Event.dispatch(event=output_event, parent=event)

        # trigger any callback events
        if event.on_failure:
            Event.dispatch(event=event.on_failure, parent=event)
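
# Example (sketch): emitting an event for a worker to pick up. Event.dispatch is
# used with this signature throughout this module; the CRAWL_SEAL payload below
# matches what on_CRAWL_SEAL reads off the event:
#
#   Event.dispatch(event={'name': 'CRAWL_SEAL', 'id': crawl.id})
#   CrawlWorker.fork(exit_on_idle=True)  # spawns `archivebox worker crawl --exit-on-idle`
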
class OrchestratorWorker(WorkerType):
    name = 'orchestrator'
    listens_to = 'PROC_'
    outputs = ['PROC_']

    @staticmethod
    def on_PROC_IDLE(event: Event) -> Iterable[EventDict]:
        # launch the oldest Process that has not been launched yet, if any
        to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
        if not to_launch:
            return []

        yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}

    @staticmethod
    def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
        process = Process.create_and_fork(**event.kwargs)
        yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}

    @staticmethod
    def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
        process = Process.objects.get(id=event.process_id)
        process.kill()
        yield {'name': 'PROC_KILLED', 'process_id': process.id}

    @staticmethod
    def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
        process = Process.objects.get(id=event.process_id)
        process.kill()
        yield {'name': 'PROC_KILLED', 'process_id': process.id}
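
# Example (sketch): any Process row created with launched_at=None is picked up
# by the orchestrator on its next PROC_IDLE tick. The cmd/actor_type fields
# mirror WorkerType.fork() above; treat the exact kwargs as an assumption:
#
#   Process.objects.create(cmd=['archivebox', 'worker', 'snapshot'], actor_type='snapshot')
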
class FileSystemWorker(WorkerType):
    name = 'filesystem'
    listens_to = 'FS_'
    outputs = ['FS_']

    @staticmethod
    def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
        # check for tmp files that can be deleted
        for tmp_file in Path('/tmp').glob('archivebox/*'):
            yield {'name': 'FS_DELETE', 'path': str(tmp_file)}

    @staticmethod
    def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
        with open(event.path, 'w') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
        with open(event.path, 'a') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
        os.remove(event.path)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
        # use an argv list instead of os.system() to avoid shell injection via paths
        subprocess.run(['rsync', '-av', str(event.src), str(event.dst)], check=True)
        yield {'name': 'FS_CHANGED', 'path': event.dst}
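
# Example (sketch): asking the filesystem worker to write a file; the payload
# fields match what on_FS_WRITE reads off the event:
#
#   Event.dispatch(event={'name': 'FS_WRITE', 'path': '/tmp/archivebox/hello.txt', 'content': 'hi'})
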
class CrawlWorker(WorkerType):
    name = 'crawl'
    listens_to = 'CRAWL_'
    outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']

    @staticmethod
    def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any stale crawls that can be started or sealed
        stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
        if not stale_crawl:
            return []

        if stale_crawl.can_start():
            yield {'name': 'CRAWL_START', 'crawl_id': stale_crawl.id}
        elif stale_crawl.can_seal():
            yield {'name': 'CRAWL_SEAL', 'id': stale_crawl.id}

    @staticmethod
    def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
        crawl, created = Crawl.objects.get_or_create(id=event.id, defaults=event)
        if created:
            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.get(id=event.pop('crawl_id'))
        # only write the fields that actually changed
        diff = {
            key: val
            for key, val in event.items()
            if getattr(crawl, key) != val
        }
        if diff:
            crawl.update(**diff)
            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_UPDATED(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.get(id=event.crawl_id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': str(crawl.OUTPUT_DIR), 'symlinks': crawl.output_dir_symlinks}

    @staticmethod
    def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.filter(id=event.id, status=Crawl.StatusChoices.STARTED).first()
        if not crawl:
            return
        crawl.status = Crawl.StatusChoices.SEALED
        crawl.save()
        yield {'name': 'FS_WRITE', 'path': str(crawl.OUTPUT_DIR / 'index.json'), 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
        # create the root snapshot for the crawl's seed URL
        crawl = Crawl.objects.get(id=event.crawl_id)
        new_snapshot_id = uuid.uuid4()
        yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
        yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}
        yield {'name': 'CRAWL_UPDATE', 'crawl_id': crawl.id, 'status': 'started', 'retry_at': None}
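
# Event cascade for a typical crawl (derived from the handlers above):
#   CRAWL_CREATE -> CRAWL_UPDATED -> FS_WRITE_SYMLINKS
#   CRAWL_START  -> SNAPSHOT_CREATE + SNAPSHOT_START + CRAWL_UPDATE
#   CRAWL_SEAL   -> FS_WRITE(index.json) + CRAWL_UPDATED
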
class SnapshotWorker(WorkerType):
    name = 'snapshot'
    listens_to = 'SNAPSHOT_'
    outputs = ['SNAPSHOT_', 'ARCHIVERESULT_', 'FS_']

    @staticmethod
    def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any snapshots that can be started or sealed
        snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
        if not snapshot:
            return []

        if snapshot.can_start():
            yield {'name': 'SNAPSHOT_START', 'snapshot_id': snapshot.id}
        elif snapshot.can_seal():
            yield {'name': 'SNAPSHOT_SEAL', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.create(id=event.snapshot_id, **event.kwargs)
        yield {'name': 'FS_WRITE', 'path': str(snapshot.OUTPUT_DIR / 'index.json'), 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.STARTED)
        assert snapshot.can_seal()
        snapshot.status = Snapshot.StatusChoices.SEALED
        snapshot.save()
        yield {'name': 'FS_WRITE', 'path': str(snapshot.OUTPUT_DIR / 'index.json'), 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.QUEUED)
        assert snapshot.can_start()

        # create pending archiveresults for each extractor
        for extractor in snapshot.get_extractors():
            new_archiveresult_id = uuid.uuid4()
            yield {'name': 'ARCHIVERESULT_CREATE', 'archiveresult_id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
            yield {'name': 'ARCHIVERESULT_START', 'id': new_archiveresult_id}

        snapshot.status = Snapshot.StatusChoices.STARTED
        snapshot.save()
        yield {'name': 'FS_WRITE', 'path': str(snapshot.OUTPUT_DIR / 'index.json'), 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'id': snapshot.id}
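
# Event cascade for a snapshot (derived from the handlers above):
#   SNAPSHOT_CREATE -> FS_WRITE(index.json) + SNAPSHOT_UPDATED
#   SNAPSHOT_START  -> ARCHIVERESULT_CREATE + ARCHIVERESULT_START per extractor,
#                      then FS_WRITE(index.json) + SNAPSHOT_UPDATED
#   SNAPSHOT_SEAL   -> FS_WRITE(index.json) + SNAPSHOT_UPDATED
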
class ArchiveResultWorker(WorkerType):
    name = 'archiveresult'
    listens_to = 'ARCHIVERESULT_'
    outputs = ['ARCHIVERESULT_', 'SHELL_', 'FS_']

    @staticmethod
    def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.id)
        # only write the fields that actually changed
        diff = {
            key: val
            for key, val in event.items()
            if getattr(archiveresult, key) != val
        }
        if diff:
            archiveresult.update(**diff)
            yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}

    @staticmethod
    def on_ARCHIVERESULT_UPDATED(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': str(archiveresult.OUTPUT_DIR), 'symlinks': archiveresult.output_dir_symlinks}

    @staticmethod
    def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
        archiveresult, created = ArchiveResult.objects.get_or_create(id=event.pop('archiveresult_id'), defaults=event)
        if created:
            yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
        else:
            diff = {
                key: val
                for key, val in event.items()
                if getattr(archiveresult, key) != val
            }
            assert not diff, f'ArchiveResult {archiveresult.id} already exists with different values, cannot create on top of it: {diff}'

    @staticmethod
    def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.STARTED)
        assert archiveresult.can_seal()
        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed'}

    @staticmethod
    def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.QUEUED)
        # run the extractor in a shell process, sealing the result when it exits
        yield {
            'name': 'SHELL_EXEC',
            'cmd': archiveresult.EXTRACTOR.get_cmd(),
            'cwd': str(archiveresult.OUTPUT_DIR),
            'on_exit': {
                'name': 'ARCHIVERESULT_SEAL',
                'id': archiveresult.id,
            },
        }

        archiveresult.status = ArchiveResult.StatusChoices.STARTED
        archiveresult.save()
        yield {'name': 'FS_WRITE', 'path': str(archiveresult.OUTPUT_DIR / 'index.json'), 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}

    @staticmethod
    def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any stale archiveresults that can be started or sealed
        stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
        if not stale_archiveresult:
            return []

        if stale_archiveresult.can_start():
            yield {'name': 'ARCHIVERESULT_START', 'id': stale_archiveresult.id}
        elif stale_archiveresult.can_seal():
            yield {'name': 'ARCHIVERESULT_SEAL', 'id': stale_archiveresult.id}
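
# Note: on_ARCHIVERESULT_START offloads the actual extractor command to a
# SHELL_EXEC event with an on_exit callback; the worker that consumes SHELL_EXEC
# and dispatches the on_exit event when the command finishes is assumed to be
# defined elsewhere (it is not part of this module).
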
WORKER_TYPES = [
    OrchestratorWorker,
    FileSystemWorker,
    CrawlWorker,
    SnapshotWorker,
    ArchiveResultWorker,
]


def get_worker_type(name: str) -> Type[WorkerType]:
    # match by verbose name, class name, or event prefix (case-insensitive)
    for worker_type in WORKER_TYPES:
        matches_verbose_name = (worker_type.name == name)
        matches_class_name = (worker_type.__name__.lower() == name.lower())
        matches_listens_to = (worker_type.listens_to.strip('_').lower() == name.strip('_').lower())
        if matches_verbose_name or matches_class_name or matches_listens_to:
            return worker_type
    raise Exception(f'Worker type not found: {name}')
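
# Example (sketch): resolving and running a worker by any of its aliases:
#
#   worker_type = get_worker_type('crawl')        # by verbose name
#   worker_type = get_worker_type('CrawlWorker')  # by class name
#   worker_type = get_worker_type('CRAWL_')       # by listens_to prefix
#   for output_event in worker_type.run(exit_on_idle=True):
#       print(output_event, file=sys.stderr)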
|