# v1_workers.py (4.7 KB)
  1. __package__ = 'archivebox.api'
  2. from uuid import UUID
  3. from typing import List, Any
  4. from datetime import datetime
  5. from ninja import Router, Schema
  6. router = Router(tags=['Workers and Tasks'])
  7. class QueueItemSchema(Schema):
  8. """Schema for a single item in a worker's queue."""
  9. TYPE: str
  10. id: UUID
  11. status: str
  12. retry_at: datetime | None
  13. created_at: datetime
  14. modified_at: datetime
  15. description: str
  16. @staticmethod
  17. def resolve_TYPE(obj) -> str:
  18. return f'{obj._meta.app_label}.{obj._meta.model_name}'
  19. @staticmethod
  20. def resolve_description(obj) -> str:
  21. return str(obj)
  22. class WorkerSchema(Schema):
  23. """Schema for a Worker type."""
  24. name: str
  25. model: str
  26. max_tick_time: int
  27. max_concurrent_tasks: int
  28. poll_interval: float
  29. idle_timeout: int
  30. running_count: int
  31. running_workers: List[dict[str, Any]]
  32. queue_count: int
  33. queue: List[QueueItemSchema]
  34. @staticmethod
  35. def resolve_model(obj) -> str:
  36. Model = obj.get_model()
  37. return f'{Model._meta.app_label}.{Model._meta.model_name}'
  38. @staticmethod
  39. def resolve_max_tick_time(obj) -> int:
  40. return obj.MAX_TICK_TIME
  41. @staticmethod
  42. def resolve_max_concurrent_tasks(obj) -> int:
  43. return obj.MAX_CONCURRENT_TASKS
  44. @staticmethod
  45. def resolve_poll_interval(obj) -> float:
  46. return obj.POLL_INTERVAL
  47. @staticmethod
  48. def resolve_idle_timeout(obj) -> int:
  49. return obj.IDLE_TIMEOUT
  50. @staticmethod
  51. def resolve_running_count(obj) -> int:
  52. return len(obj.get_running_workers())
  53. @staticmethod
  54. def resolve_running_workers(obj) -> List[dict[str, Any]]:
  55. return obj.get_running_workers()
  56. @staticmethod
  57. def resolve_queue_count(obj) -> int:
  58. return obj.get_queue().count()
  59. @staticmethod
  60. def resolve_queue(obj) -> List[QueueItemSchema]:
  61. return list(obj.get_queue()[:50]) # Limit to 50 items
  62. class OrchestratorSchema(Schema):
  63. """Schema for the Orchestrator."""
  64. is_running: bool
  65. poll_interval: float
  66. idle_timeout: int
  67. max_workers_per_type: int
  68. max_total_workers: int
  69. total_worker_count: int
  70. workers: List[WorkerSchema]
  71. @router.get("/orchestrator", response=OrchestratorSchema, url_name="get_orchestrator")
  72. def get_orchestrator(request):
  73. """Get the orchestrator status and all worker queues."""
  74. from archivebox.workers.orchestrator import Orchestrator
  75. from archivebox.workers.worker import CrawlWorker, SnapshotWorker, ArchiveResultWorker
  76. orchestrator = Orchestrator()
  77. # Create temporary worker instances to query their queues
  78. workers = [
  79. CrawlWorker(worker_id=-1),
  80. SnapshotWorker(worker_id=-1),
  81. ArchiveResultWorker(worker_id=-1),
  82. ]
  83. return {
  84. 'is_running': orchestrator.is_running(),
  85. 'poll_interval': orchestrator.POLL_INTERVAL,
  86. 'idle_timeout': orchestrator.IDLE_TIMEOUT,
  87. 'max_workers_per_type': orchestrator.MAX_WORKERS_PER_TYPE,
  88. 'max_total_workers': orchestrator.MAX_TOTAL_WORKERS,
  89. 'total_worker_count': orchestrator.get_total_worker_count(),
  90. 'workers': workers,
  91. }
  92. @router.get("/workers", response=List[WorkerSchema], url_name="get_workers")
  93. def get_workers(request):
  94. """List all worker types and their current status."""
  95. from archivebox.workers.worker import CrawlWorker, SnapshotWorker, ArchiveResultWorker
  96. # Create temporary instances to query their queues
  97. return [
  98. CrawlWorker(worker_id=-1),
  99. SnapshotWorker(worker_id=-1),
  100. ArchiveResultWorker(worker_id=-1),
  101. ]
  102. @router.get("/worker/{worker_name}", response=WorkerSchema, url_name="get_worker")
  103. def get_worker(request, worker_name: str):
  104. """Get status and queue for a specific worker type."""
  105. from archivebox.workers.worker import WORKER_TYPES
  106. if worker_name not in WORKER_TYPES:
  107. from ninja.errors import HttpError
  108. raise HttpError(404, f"Unknown worker type: {worker_name}. Valid types: {list(WORKER_TYPES.keys())}")
  109. WorkerClass = WORKER_TYPES[worker_name]
  110. return WorkerClass(worker_id=-1)
  111. @router.get("/worker/{worker_name}/queue", response=List[QueueItemSchema], url_name="get_worker_queue")
  112. def get_worker_queue(request, worker_name: str, limit: int = 100):
  113. """Get the current queue for a specific worker type."""
  114. from archivebox.workers.worker import WORKER_TYPES
  115. if worker_name not in WORKER_TYPES:
  116. from ninja.errors import HttpError
  117. raise HttpError(404, f"Unknown worker type: {worker_name}. Valid types: {list(WORKER_TYPES.keys())}")
  118. WorkerClass = WORKER_TYPES[worker_name]
  119. worker = WorkerClass(worker_id=-1)
  120. return list(worker.get_queue()[:limit])
# NOTE: The progress endpoint is no longer defined here; it was moved to core.views.live_progress_view for simplicity.