worker.py

__package__ = 'archivebox.workers'

import os
import sys
import time
import uuid
import json
import subprocess

from typing import ClassVar, Iterable, Type
from pathlib import Path

from rich import print

from django.db import transaction
from django.db.models import QuerySet
from django.utils import timezone
from django.utils.functional import classproperty  # type: ignore

from crawls.models import Crawl
from core.models import Snapshot, ArchiveResult
from workers.models import Event, Process, EventDict
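
# Each WorkerType subclass polls the Event table for events matching its
# listens_to prefix (e.g. 'CRAWL_'), claims the next unclaimed one atomically,
# routes it to an on_<EVENT_NAME>() handler, and dispatches any EventDicts the
# handler yields as child events for other workers to pick up.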
class WorkerType:
    # static class attributes
    name: ClassVar[str]                # e.g. 'log' or 'filesystem' or 'crawl' or 'snapshot' or 'archiveresult' etc.
    listens_to: ClassVar[str]          # e.g. 'LOG_' or 'FS_' or 'CRAWL_' or 'SNAPSHOT_' or 'ARCHIVERESULT_' etc.
    outputs: ClassVar[list[str]]       # e.g. ['LOG_', 'FS_', 'CRAWL_', 'SNAPSHOT_', 'ARCHIVERESULT_'] etc.
    poll_interval: ClassVar[int] = 1   # seconds to wait between polls for new events

    @classproperty
    def event_queue(cls) -> QuerySet[Event]:
        return Event.objects.filter(name__startswith=cls.listens_to)

    @classmethod
    def fork(cls, wait_for_first_event=False, exit_on_idle=True) -> Process:
        cmd = ['archivebox', 'worker', cls.name]
        if exit_on_idle:
            cmd.append('--exit-on-idle')
        if wait_for_first_event:
            cmd.append('--wait-for-first-event')
        return Process.create_and_fork(cmd=cmd, actor_type=cls.name)
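
    # e.g. CrawlWorker.fork() records a Process row and spawns a child running:
    #   $ archivebox worker crawl --exit-on-idle
    # (assumes an `archivebox worker <name>` CLI entrypoint is wired up elsewhere)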

    @classproperty
    def processes(cls) -> QuerySet[Process]:
        return Process.objects.filter(actor_type=cls.name)

    @classmethod
    def run(cls, wait_for_first_event=False, exit_on_idle=True):
        if wait_for_first_event:
            event = cls.event_queue.get_next_unclaimed()
            while not event:
                time.sleep(cls.poll_interval)
                event = cls.event_queue.get_next_unclaimed()

        while True:
            output_events = list(cls.process_next_event()) or list(cls.process_idle_tick())  # process next event, or tick if idle
            yield from output_events
            if not output_events:
                if exit_on_idle:
                    break
                else:
                    time.sleep(cls.poll_interval)
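
    # e.g. drain pending events in-process until the queue is idle:
    #   for output_event in CrawlWorker.run(exit_on_idle=True):
    #       print(output_event)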

    @classmethod
    def process_next_event(cls) -> Iterable[EventDict]:
        event = cls.event_queue.get_next_unclaimed()
        output_events = []
        if not event:
            return
        cls.mark_event_claimed(event)
        print(f'{cls.__name__}[{Process.current().pid}] {event}', file=sys.stderr)
        try:
            for output_event in cls.receive(event):
                output_events.append(output_event)
                yield output_event
            cls.mark_event_succeeded(event, output_events=output_events)
        except BaseException as e:
            cls.mark_event_failed(event, output_events=output_events, error=e)

    @classmethod
    def process_idle_tick(cls) -> Iterable[EventDict]:
        # reset the idle event so it can be claimed by the current process
        event, _created = Event.objects.update_or_create(
            name=f'{cls.listens_to}IDLE',
            emitted_by=Process.current(),
            defaults={
                'deliver_at': timezone.now(),
                'claimed_proc': None,
                'claimed_at': None,
                'finished_at': None,
                'error': None,
                'parent': None,
            },
        )
        # then process it like any other event
        yield from cls.process_next_event()

    @classmethod
    def receive(cls, event: Event) -> Iterable[EventDict]:
        handler_method = getattr(cls, f'on_{event.name}', None)
        if handler_method:
            yield from handler_method(event)
        else:
            raise Exception(f'No handler method for event: {event.name}')
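
    # receive() routes purely by event name: an event named 'CRAWL_START' is
    # handled by on_CRAWL_START(event), 'FS_WRITE' by on_FS_WRITE(event), etc.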

    @staticmethod
    def on_IDLE(event: Event) -> Iterable[EventDict]:
        return []

    @staticmethod
    def mark_event_claimed(event: Event):
        proc = Process.current()
        with transaction.atomic():
            claimed = Event.objects.filter(id=event.id, claimed_proc=None, claimed_at=None).update(claimed_proc=proc, claimed_at=timezone.now())
            event.refresh_from_db()
        if not claimed:
            raise Exception(f'Event already claimed by another process: {event.claimed_proc}')
        print(f'{proc}.mark_event_claimed(): Claimed {event} ⛏️')
        # process_updated = Process.objects.filter(id=proc.id, active_event=None).update(active_event=event)
        # if not process_updated:
        #     raise Exception(f'Unable to update process.active_event: {proc}.active_event = {event}')

    @staticmethod
    def mark_event_succeeded(event: Event, output_events: Iterable[EventDict]):
        event.refresh_from_db()
        assert event.claimed_proc, f'Cannot mark event as succeeded if it is not claimed by a process: {event}'
        assert (event.claimed_proc == Process.current()), f'Cannot mark event as succeeded if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
        with transaction.atomic():
            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now())
            event.refresh_from_db()
        if not updated:
            raise Exception(f'Event {event} failed to mark as succeeded, it was modified by another process: {event.claimed_proc}')
        # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
        # if not process_updated:
        #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')

        # dispatch any output events
        for output_event in output_events:
            Event.dispatch(event=output_event, parent=event)

        # trigger any callback events
        if event.on_success:
            Event.dispatch(event=event.on_success, parent=event)

    @staticmethod
    def mark_event_failed(event: Event, output_events: Iterable[EventDict] = (), error: BaseException | None = None):
        event.refresh_from_db()
        assert event.claimed_proc, f'Cannot mark event as failed if it is not claimed by a process: {event}'
        assert (event.claimed_proc == Process.current()), f'Cannot mark event as failed if it is claimed by a different process: {event}.claimed_proc = {event.claimed_proc}, current_process = {Process.current()}'
        with transaction.atomic():
            updated = Event.objects.filter(id=event.id, claimed_proc=event.claimed_proc, claimed_at=event.claimed_at, finished_at=None).update(finished_at=timezone.now(), error=str(error))
            event.refresh_from_db()
        if not updated:
            raise Exception(f'Event {event} failed to mark as failed, it was modified by another process: {event.claimed_proc}')
        # process_updated = Process.objects.filter(id=event.claimed_proc.id, active_event=event).update(active_event=None)
        # if not process_updated:
        #     raise Exception(f'Unable to unset process.active_event: {event.claimed_proc}.active_event = {event}')

        # add a dedicated error event to the output events
        if not event.name.endswith('_ERROR'):
            output_events = [
                *output_events,
                {'name': f'{event.name}_ERROR', 'msg': f'{type(error).__name__}: {error}'},
            ]

        # dispatch any output events
        for output_event in output_events:
            Event.dispatch(event=output_event, parent=event)

        # trigger any callback events
        if event.on_failure:
            Event.dispatch(event=event.on_failure, parent=event)
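

# Concurrency note: claiming is a conditional UPDATE (claimed_proc=None -> the
# current process) inside a transaction, so when multiple worker processes poll
# the same queue exactly one wins each event; the losers raise instead of
# double-processing.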
class OrchestratorWorker(WorkerType):
    name = 'orchestrator'
    listens_to = 'PROC_'
    outputs = ['PROC_']

    @staticmethod
    def on_PROC_IDLE(event: Event) -> Iterable[EventDict]:
        # launch the oldest Process that has not been launched yet, if any
        to_launch = Process.objects.filter(launched_at=None).order_by('created_at').first()
        if not to_launch:
            return
        yield {'name': 'PROC_LAUNCH', 'id': to_launch.id}

    @staticmethod
    def on_PROC_LAUNCH(event: Event) -> Iterable[EventDict]:
        process = Process.create_and_fork(**event.kwargs)
        yield {'name': 'PROC_LAUNCHED', 'process_id': process.id}

    @staticmethod
    def on_PROC_EXIT(event: Event) -> Iterable[EventDict]:
        process = Process.objects.get(id=event.process_id)
        process.kill()
        yield {'name': 'PROC_KILLED', 'process_id': process.id}

    @staticmethod
    def on_PROC_KILL(event: Event) -> Iterable[EventDict]:
        process = Process.objects.get(id=event.process_id)
        process.kill()
        yield {'name': 'PROC_KILLED', 'process_id': process.id}
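

# FileSystemWorker funnels filesystem side effects (writes, appends, deletes,
# rsync) behind FS_ events, so other workers only yield event payloads instead
# of touching the disk directly.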
class FileSystemWorker(WorkerType):
    name = 'filesystem'
    listens_to = 'FS_'
    outputs = ['FS_']

    @staticmethod
    def on_FS_IDLE(event: Event) -> Iterable[EventDict]:
        # check for tmp files that can be deleted
        for tmp_file in Path('/tmp').glob('archivebox/*'):
            yield {'name': 'FS_DELETE', 'path': str(tmp_file)}

    @staticmethod
    def on_FS_WRITE(event: Event) -> Iterable[EventDict]:
        with open(event.path, 'w') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_APPEND(event: Event) -> Iterable[EventDict]:
        with open(event.path, 'a') as f:
            f.write(event.content)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_DELETE(event: Event) -> Iterable[EventDict]:
        os.remove(event.path)
        yield {'name': 'FS_CHANGED', 'path': event.path}

    @staticmethod
    def on_FS_RSYNC(event: Event) -> Iterable[EventDict]:
        # use an argv list instead of os.system() so paths containing spaces or
        # shell metacharacters are never interpreted by a shell
        subprocess.run(['rsync', '-av', event.src, event.dst], check=True)
        yield {'name': 'FS_CHANGED', 'path': event.dst}
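
# Other workers delegate filesystem side effects by yielding FS_ events, e.g.
# (illustrative payload, keys match the handlers above):
#   {'name': 'FS_APPEND', 'path': '/data/logs/worker.log', 'content': 'line\n'}
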
class CrawlWorker(WorkerType):
    name = 'crawl'
    listens_to = 'CRAWL_'
    outputs = ['CRAWL_', 'FS_', 'SNAPSHOT_']

    @staticmethod
    def on_CRAWL_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any stale crawls that can be started or sealed
        stale_crawl = Crawl.objects.filter(retry_at__lt=timezone.now()).first()
        if not stale_crawl:
            return
        if stale_crawl.can_start():
            yield {'name': 'CRAWL_START', 'crawl_id': stale_crawl.id}
        elif stale_crawl.can_seal():
            yield {'name': 'CRAWL_SEAL', 'crawl_id': stale_crawl.id}

    @staticmethod
    def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
        crawl, created = Crawl.objects.get_or_create(id=event.pop('crawl_id'), defaults=event)
        if created:
            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.get(id=event.pop('crawl_id'))
        # only write fields whose values actually changed
        diff = {
            key: val
            for key, val in event.items()
            if getattr(crawl, key) != val
        }
        if diff:
            crawl.update(**diff)
            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_UPDATED(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.get(id=event.crawl_id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': crawl.OUTPUT_DIR, 'symlinks': crawl.output_dir_symlinks}

    @staticmethod
    def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
        crawl = Crawl.objects.filter(id=event.crawl_id, status=Crawl.StatusChoices.STARTED).first()
        if not crawl:
            return
        crawl.status = Crawl.StatusChoices.SEALED
        crawl.save()
        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}

    @staticmethod
    def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
        # create the crawl's root snapshot from its seed URI
        crawl = Crawl.objects.get(id=event.crawl_id)
        new_snapshot_id = uuid.uuid4()
        yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
        yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}
        yield {'name': 'CRAWL_UPDATE', 'crawl_id': crawl.id, 'status': 'started', 'retry_at': None}
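
# Typical crawl lifecycle, as encoded by the handlers above:
#   CRAWL_CREATE -> CRAWL_START -> SNAPSHOT_CREATE + SNAPSHOT_START (root snapshot)
#   -> ... -> CRAWL_SEAL -> FS_WRITE index.json -> CRAWL_UPDATED
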
class SnapshotWorker(WorkerType):
    name = 'snapshot'
    listens_to = 'SNAPSHOT_'
    outputs = ['SNAPSHOT_', 'FS_', 'ARCHIVERESULT_']

    @staticmethod
    def on_SNAPSHOT_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any snapshots that can be started or sealed
        snapshot = Snapshot.objects.exclude(status=Snapshot.StatusChoices.SEALED).first()
        if not snapshot:
            return
        if snapshot.can_start():
            yield {'name': 'SNAPSHOT_START', 'snapshot_id': snapshot.id}
        elif snapshot.can_seal():
            yield {'name': 'SNAPSHOT_SEAL', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_CREATE(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.create(id=event.snapshot_id, **event.kwargs)
        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_SEAL(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.STARTED)
        assert snapshot.can_seal()
        snapshot.status = Snapshot.StatusChoices.SEALED
        snapshot.save()
        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'snapshot_id': snapshot.id}

    @staticmethod
    def on_SNAPSHOT_START(event: Event) -> Iterable[EventDict]:
        snapshot = Snapshot.objects.get(id=event.snapshot_id, status=Snapshot.StatusChoices.QUEUED)
        assert snapshot.can_start()
        # create pending archiveresults for each extractor
        for extractor in snapshot.get_extractors():
            new_archiveresult_id = uuid.uuid4()
            yield {'name': 'ARCHIVERESULT_CREATE', 'archiveresult_id': new_archiveresult_id, 'snapshot_id': snapshot.id, 'extractor': extractor.name}
            yield {'name': 'ARCHIVERESULT_START', 'archiveresult_id': new_archiveresult_id}
        snapshot.status = Snapshot.StatusChoices.STARTED
        snapshot.save()
        yield {'name': 'FS_WRITE', 'path': snapshot.OUTPUT_DIR / 'index.json', 'content': json.dumps(snapshot.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'SNAPSHOT_UPDATED', 'snapshot_id': snapshot.id}
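
# SNAPSHOT_START fans out one ARCHIVERESULT_CREATE/ARCHIVERESULT_START pair per
# extractor, so the ArchiveResultWorker below can process extractors independently.
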
class ArchiveResultWorker(WorkerType):
    name = 'archiveresult'
    listens_to = 'ARCHIVERESULT_'
    outputs = ['ARCHIVERESULT_', 'FS_', 'SHELL_']

    @staticmethod
    def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.pop('archiveresult_id'))
        # only write fields whose values actually changed
        diff = {
            key: val
            for key, val in event.items()
            if getattr(archiveresult, key) != val
        }
        if diff:
            archiveresult.update(**diff)
            yield {'name': 'ARCHIVERESULT_UPDATED', 'archiveresult_id': archiveresult.id}

    @staticmethod
    def on_ARCHIVERESULT_UPDATED(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.archiveresult_id)
        yield {'name': 'FS_WRITE_SYMLINKS', 'path': archiveresult.OUTPUT_DIR, 'symlinks': archiveresult.output_dir_symlinks}

    @staticmethod
    def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
        archiveresult, created = ArchiveResult.objects.get_or_create(id=event.pop('archiveresult_id'), defaults=event)
        if created:
            yield {'name': 'ARCHIVERESULT_UPDATE', 'archiveresult_id': archiveresult.id}
        else:
            diff = {
                key: val
                for key, val in event.items()
                if getattr(archiveresult, key) != val
            }
            assert not diff, f'ArchiveResult {archiveresult.id} already exists with different values, cannot create on top of it: {diff}'

    @staticmethod
    def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.archiveresult_id, status=ArchiveResult.StatusChoices.STARTED)
        assert archiveresult.can_seal()
        yield {'name': 'ARCHIVERESULT_UPDATE', 'archiveresult_id': archiveresult.id, 'status': 'sealed'}

    @staticmethod
    def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
        archiveresult = ArchiveResult.objects.get(id=event.archiveresult_id, status=ArchiveResult.StatusChoices.QUEUED)
        yield {
            'name': 'SHELL_EXEC',
            'cmd': archiveresult.EXTRACTOR.get_cmd(),
            'cwd': archiveresult.OUTPUT_DIR,
            'on_exit': {
                'name': 'ARCHIVERESULT_SEAL',
                'archiveresult_id': archiveresult.id,
            },
        }
        archiveresult.status = ArchiveResult.StatusChoices.STARTED
        archiveresult.save()
        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / 'index.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
        yield {'name': 'ARCHIVERESULT_UPDATED', 'archiveresult_id': archiveresult.id}

    @staticmethod
    def on_ARCHIVERESULT_IDLE(event: Event) -> Iterable[EventDict]:
        # check for any archiveresults that can be started or sealed
        stale_archiveresult = ArchiveResult.objects.exclude(status__in=[ArchiveResult.StatusChoices.SUCCEEDED, ArchiveResult.StatusChoices.FAILED]).first()
        if not stale_archiveresult:
            return
        if stale_archiveresult.can_start():
            yield {'name': 'ARCHIVERESULT_START', 'archiveresult_id': stale_archiveresult.id}
        if stale_archiveresult.can_seal():
            yield {'name': 'ARCHIVERESULT_SEAL', 'archiveresult_id': stale_archiveresult.id}

WORKER_TYPES = [
    OrchestratorWorker,
    FileSystemWorker,
    CrawlWorker,
    SnapshotWorker,
    ArchiveResultWorker,
]

def get_worker_type(name: str) -> Type[WorkerType]:
    for worker_type in WORKER_TYPES:
        matches_verbose_name = (worker_type.name == name)
        matches_class_name = (worker_type.__name__.lower() == name.lower())
        matches_listens_to = (worker_type.listens_to.strip('_').lower() == name.strip('_').lower())
        if matches_verbose_name or matches_class_name or matches_listens_to:
            return worker_type
    raise Exception(f'Worker type not found: {name}')
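
# e.g. all of these resolve to the same class:
#   get_worker_type('crawl') is get_worker_type('CrawlWorker') is get_worker_type('CRAWL_')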