tasks.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
"""
Background task functions for queuing work to the orchestrator.

These functions queue Snapshots/Crawls for processing by setting their status
to QUEUED, which the orchestrator workers will pick up and process.

NOTE: These functions do NOT start the orchestrator - they assume it's already
running via `archivebox server` (supervisord) or will be run inline by the CLI.
"""

# Explicit package name for this module; Python uses __package__ when
# resolving relative imports.
__package__ = 'archivebox.workers'
  9. from django.utils import timezone
  10. def bg_add(add_kwargs: dict) -> int:
  11. """
  12. Add URLs and queue them for archiving.
  13. Returns the number of snapshots created.
  14. """
  15. from archivebox.cli.archivebox_add import add
  16. assert add_kwargs and add_kwargs.get("urls")
  17. # When called as background task, always run in background mode
  18. add_kwargs = add_kwargs.copy()
  19. add_kwargs['bg'] = True
  20. result = add(**add_kwargs)
  21. return len(result) if result else 0
  22. def bg_archive_snapshots(snapshots, kwargs: dict | None = None) -> int:
  23. """
  24. Queue multiple snapshots for archiving via the state machine system.
  25. This sets snapshots to 'queued' status so the orchestrator workers pick them up.
  26. The actual archiving happens through the worker's process_item() method.
  27. Returns the number of snapshots queued.
  28. """
  29. from archivebox.core.models import Snapshot
  30. kwargs = kwargs or {}
  31. # Queue snapshots by setting status to queued with immediate retry_at
  32. queued_count = 0
  33. for snapshot in snapshots:
  34. if hasattr(snapshot, 'id'):
  35. # Update snapshot to queued state so workers pick it up
  36. Snapshot.objects.filter(id=snapshot.id).update(
  37. status=Snapshot.StatusChoices.QUEUED,
  38. retry_at=timezone.now(),
  39. )
  40. queued_count += 1
  41. return queued_count
  42. def bg_archive_snapshot(snapshot, overwrite: bool = False, methods: list | None = None) -> int:
  43. """
  44. Queue a single snapshot for archiving via the state machine system.
  45. This sets the snapshot to 'queued' status so the orchestrator workers pick it up.
  46. The actual archiving happens through the worker's process_item() method.
  47. Returns 1 if queued, 0 otherwise.
  48. """
  49. from archivebox.core.models import Snapshot
  50. # Queue the snapshot by setting status to queued
  51. if hasattr(snapshot, 'id'):
  52. Snapshot.objects.filter(id=snapshot.id).update(
  53. status=Snapshot.StatusChoices.QUEUED,
  54. retry_at=timezone.now(),
  55. )
  56. return 1
  57. return 0