@@ -5,7 +5,7 @@ import sys
 import time
 import uuid
 import json
-import unittest
+
 from typing import ClassVar, Iterable, Type
 from pathlib import Path
 
@@ -16,7 +16,7 @@ from django.db.models import QuerySet
 from django.utils import timezone
 from django.utils.functional import classproperty  # type: ignore
 
-from crawls.models import Seed, Crawl
+from crawls.models import Crawl
 from core.models import Snapshot, ArchiveResult
 
 from workers.models import Event, Process, EventDict
@@ -276,16 +276,28 @@ class CrawlWorker(WorkerType):
 
     @staticmethod
     def on_CRAWL_CREATE(event: Event) -> Iterable[EventDict]:
-        crawl = Crawl.objects.create(id=event.id, **event)
-        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
-        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+        crawl, created = Crawl.objects.get_or_create(id=event.id, defaults=event)
+        if created:
+            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
 
     @staticmethod
     def on_CRAWL_UPDATE(event: Event) -> Iterable[EventDict]:
-        Crawl.objects.filter(id=event.id).update(**event)
-        yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
-        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+        crawl = Crawl.objects.get(id=event.pop('crawl_id'))
+        diff = {
+            key: val
+            for key, val in event.items()
+            if getattr(crawl, key) != val
+        }
+        if diff:
+            crawl.update(**diff)
+            yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
 
+    @staticmethod
+    def on_CRAWL_UPDATED(event: Event) -> Iterable[EventDict]:
+        crawl = Crawl.objects.get(id=event.crawl_id)
+        yield {'name': 'FS_WRITE_SYMLINKS', 'path': crawl.OUTPUT_DIR, 'symlinks': crawl.output_dir_symlinks}
+
+
     @staticmethod
     def on_CRAWL_SEAL(event: Event) -> Iterable[EventDict]:
         crawl = Crawl.objects.filter(id=event.id, status=Crawl.StatusChoices.STARTED).first()
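
Note on the hunk above: CRAWL_CREATE/CRAWL_UPDATE switch to an idempotent get_or_create plus a compute-diff-then-update step, so replayed events become no-ops and CRAWL_UPDATED only fires when a field actually changed. Stock Django model instances have no .update() method, so crawl.update(**diff) presumably relies on a helper defined on the model; a minimal sketch of an equivalent, assuming event behaves like a plain dict of field values (apply_diff is a hypothetical stand-in, not part of this codebase):

    from django.db import models

    def apply_diff(instance: models.Model, event: dict) -> dict:
        # Keep only the fields whose values actually differ from the loaded row.
        diff = {key: val for key, val in event.items() if getattr(instance, key) != val}
        for key, val in diff.items():
            setattr(instance, key, val)
        if diff:
            # Write just the changed columns in a single UPDATE statement.
            instance.save(update_fields=list(diff))
        return diff
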
@@ -294,16 +306,16 @@ class CrawlWorker(WorkerType):
         crawl.status = Crawl.StatusChoices.SEALED
         crawl.save()
         yield {'name': 'FS_WRITE', 'path': crawl.OUTPUT_DIR / 'index.json', 'content': json.dumps(crawl.as_json(), default=str, indent=4, sort_keys=True)}
-        yield {'name': 'CRAWL_UPDATED', 'id': crawl.id}
+        yield {'name': 'CRAWL_UPDATED', 'crawl_id': crawl.id}
 
     @staticmethod
     def on_CRAWL_START(event: Event) -> Iterable[EventDict]:
         # create root snapshot
         crawl = Crawl.objects.get(id=event.crawl_id)
         new_snapshot_id = uuid.uuid4()
-        yield {'name': 'SNAPSHOT_CREATE', 'id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
-        yield {'name': 'SNAPSHOT_START', 'id': new_snapshot_id}
-        yield {'name': 'CRAWL_UPDATE', 'id': crawl.id, 'status': 'started', 'retry_at': None}
+        yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl.id, 'url': crawl.seed.uri}
+        yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}
+        yield {'name': 'CRAWL_UPDATE', 'crawl_id': crawl.id, 'status': 'started', 'retry_at': None}
 
 
 class SnapshotWorker(WorkerType):
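
Note on the hunk above: on_CRAWL_START pre-allocates the snapshot id with uuid.uuid4() so SNAPSHOT_CREATE and SNAPSHOT_START can reference the same row before it exists, with no read-back between the two events. A minimal framework-free sketch of that fan-out (the handler and queue here are hypothetical stand-ins):

    import uuid
    from typing import Iterable

    def on_start(crawl_id: uuid.UUID, seed_uri: str) -> Iterable[dict]:
        new_snapshot_id = uuid.uuid4()  # client-generated id, shared by both events
        yield {'name': 'SNAPSHOT_CREATE', 'snapshot_id': new_snapshot_id, 'crawl_id': crawl_id, 'url': seed_uri}
        yield {'name': 'SNAPSHOT_START', 'snapshot_id': new_snapshot_id}

    queue = list(on_start(uuid.uuid4(), 'https://example.com'))  # drain the generator into a queue
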
@@ -361,26 +373,41 @@ class ArchiveResultWorker(WorkerType):
     listens_to = 'ARCHIVERESULT_'
     outputs = ['ARCHIVERESULT_', 'FS_']
 
-
     @staticmethod
     def on_ARCHIVERESULT_UPDATE(event: Event) -> Iterable[EventDict]:
-        ArchiveResult.objects.filter(id=event.id).update(**event.kwargs)
         archiveresult = ArchiveResult.objects.get(id=event.id)
-        yield {'name': 'FS_WRITE', 'path': archiveresult.OUTPUT_DIR / f'{archiveresult.ABID}.json', 'content': json.dumps(archiveresult.as_json(), default=str, indent=4, sort_keys=True)}
-        yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
+        diff = {
+            key: val
+            for key, val in event.items()
+            if getattr(archiveresult, key) != val
+        }
+        if diff:
+            archiveresult.update(**diff)
+            yield {'name': 'ARCHIVERESULT_UPDATED', 'id': archiveresult.id}
+
+    @staticmethod
+    def on_ARCHIVERESULT_UPDATED(event: Event) -> Iterable[EventDict]:
+        archiveresult = ArchiveResult.objects.get(id=event.id)
+        yield {'name': 'FS_WRITE_SYMLINKS', 'path': archiveresult.OUTPUT_DIR, 'symlinks': archiveresult.output_dir_symlinks}
 
     @staticmethod
     def on_ARCHIVERESULT_CREATE(event: Event) -> Iterable[EventDict]:
-        archiveresult = ArchiveResult.objects.create(id=event.id, **event)
-        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
-
+        archiveresult, created = ArchiveResult.objects.get_or_create(id=event.pop('archiveresult_id'), defaults=event)
+        if created:
+            yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id}
+        else:
+            diff = {
+                key: val
+                for key, val in event.items()
+                if getattr(archiveresult, key) != val
+            }
+            assert not diff, f'ArchiveResult {archiveresult.id} already exists and has different values, cannot create on top of it: {diff}'
+
     @staticmethod
     def on_ARCHIVERESULT_SEAL(event: Event) -> Iterable[EventDict]:
         archiveresult = ArchiveResult.objects.get(id=event.id, status=ArchiveResult.StatusChoices.STARTED)
-
-        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed', 'on_success': {
-            'name': 'FS_RSYNC', 'src': archiveresult.OUTPUT_DIR, 'dst': archiveresult.snapshot.OUTPUT_DIR, 'await_event_id': update_id,
-        }}
+        assert archiveresult.can_seal()
+        yield {'name': 'ARCHIVERESULT_UPDATE', 'id': archiveresult.id, 'status': 'sealed'}
 
     @staticmethod
     def on_ARCHIVERESULT_START(event: Event) -> Iterable[EventDict]:
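
Note on the hunk above: on_ARCHIVERESULT_CREATE makes event delivery replay-safe: a duplicate CREATE with an identical payload is silently absorbed, while a conflicting payload trips the assert. A minimal sketch of the same pattern with an in-memory store standing in for the ORM (all names here are hypothetical):

    store: dict = {}

    def on_create(event: dict) -> None:
        record_id = event.pop('id')
        existing = store.get(record_id)
        if existing is None:
            store[record_id] = event  # first delivery: create the record
            return
        # Re-delivery: tolerate exact duplicates, reject conflicting payloads.
        diff = {k: v for k, v in event.items() if existing.get(k) != v}
        assert not diff, f'{record_id} already exists with different values: {diff}'

    on_create({'id': 'a1', 'status': 'queued'})
    on_create({'id': 'a1', 'status': 'queued'})  # replayed event: no-op
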