| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- __package__ = 'archivebox.api'
- from uuid import UUID
- from typing import List, Any
- from datetime import datetime
- from ninja import Router, Schema
# Router holding the worker/orchestrator status endpoints defined below; every
# operation on it is tagged 'Workers and Tasks' (django-ninja uses tags to group
# operations in the generated API docs).
router = Router(tags=['Workers and Tasks'])
class QueueItemSchema(Schema):
    """Schema for a single item in a worker's queue.

    ``TYPE`` and ``description`` are derived by the ``resolve_*`` hooks below;
    the remaining fields are read directly from the queued object.
    """

    TYPE: str
    id: UUID
    status: str
    retry_at: datetime | None
    created_at: datetime
    modified_at: datetime
    description: str

    @staticmethod
    def resolve_TYPE(obj) -> str:
        # "<app_label>.<model_name>" taken from the object's Django-style _meta.
        meta = obj._meta
        return f'{meta.app_label}.{meta.model_name}'

    @staticmethod
    def resolve_description(obj) -> str:
        # Human-readable label: whatever the object's __str__ produces.
        label = str(obj)
        return label
class WorkerSchema(Schema):
    """Schema for a Worker type.

    Serializes a worker object (e.g. a CrawlWorker, SnapshotWorker or
    ArchiveResultWorker instance) into its settings, running processes and
    queue. Apart from ``name``, every field is filled in by a
    ``resolve_<field>`` hook below.
    """

    name: str
    model: str
    max_tick_time: int
    max_concurrent_tasks: int
    poll_interval: float
    idle_timeout: int
    running_count: int
    running_workers: List[dict[str, Any]]
    queue_count: int
    queue: List[QueueItemSchema]

    @staticmethod
    def resolve_model(obj) -> str:
        # Label of the model this worker processes, as "<app_label>.<model_name>".
        model_meta = obj.get_model()._meta
        return f'{model_meta.app_label}.{model_meta.model_name}'

    # The next four hooks expose upper-case settings attributes of the worker as-is.
    @staticmethod
    def resolve_max_tick_time(obj) -> int:
        return obj.MAX_TICK_TIME

    @staticmethod
    def resolve_max_concurrent_tasks(obj) -> int:
        return obj.MAX_CONCURRENT_TASKS

    @staticmethod
    def resolve_poll_interval(obj) -> float:
        return obj.POLL_INTERVAL

    @staticmethod
    def resolve_idle_timeout(obj) -> int:
        return obj.IDLE_TIMEOUT

    @staticmethod
    def resolve_running_count(obj) -> int:
        # NOTE(review): this and resolve_running_workers each call
        # get_running_workers() independently, so the count and the list could
        # disagree if the set of running workers changes between the two calls.
        running = obj.get_running_workers()
        return len(running)

    @staticmethod
    def resolve_running_workers(obj) -> List[dict[str, Any]]:
        return obj.get_running_workers()

    @staticmethod
    def resolve_queue_count(obj) -> int:
        # Full queue size; unlike `queue` below, this is not capped.
        pending = obj.get_queue()
        return pending.count()

    @staticmethod
    def resolve_queue(obj) -> List[QueueItemSchema]:
        # Only a preview is serialized: the first 50 items of the queue.
        preview = obj.get_queue()[:50]
        return list(preview)
class OrchestratorSchema(Schema):
    """Schema for the Orchestrator.

    Response schema of this router's ``/orchestrator`` route: the orchestrator's
    settings and current worker count, plus one status entry per worker type.
    Field order here is the order the fields appear in the response schema.
    """
    is_running: bool  # result of Orchestrator.is_running()
    poll_interval: float  # Orchestrator.POLL_INTERVAL (units not shown here; presumably seconds — confirm)
    idle_timeout: int  # Orchestrator.IDLE_TIMEOUT
    max_workers_per_type: int  # Orchestrator.MAX_WORKERS_PER_TYPE
    max_total_workers: int  # Orchestrator.MAX_TOTAL_WORKERS
    total_worker_count: int  # result of Orchestrator.get_total_worker_count()
    workers: List[WorkerSchema]  # one entry per worker type, each serialized via WorkerSchema
@router.get("/orchestrator", response=OrchestratorSchema, url_name="get_orchestrator")
def get_orchestrator(request):
    """Get the orchestrator status and all worker queues."""
    from archivebox.workers.orchestrator import Orchestrator
    from archivebox.workers.worker import CrawlWorker, SnapshotWorker, ArchiveResultWorker

    orchestrator = Orchestrator()

    # Placeholder instances (worker_id=-1) that are only constructed here, not run;
    # WorkerSchema reads their queue and running-worker information.
    worker_probes = [
        worker_cls(worker_id=-1)
        for worker_cls in (CrawlWorker, SnapshotWorker, ArchiveResultWorker)
    ]

    status = {
        'is_running': orchestrator.is_running(),
        'poll_interval': orchestrator.POLL_INTERVAL,
        'idle_timeout': orchestrator.IDLE_TIMEOUT,
        'max_workers_per_type': orchestrator.MAX_WORKERS_PER_TYPE,
        'max_total_workers': orchestrator.MAX_TOTAL_WORKERS,
        'total_worker_count': orchestrator.get_total_worker_count(),
    }
    status['workers'] = worker_probes
    return status
@router.get("/workers", response=List[WorkerSchema], url_name="get_workers")
def get_workers(request):
    """List all worker types and their current status."""
    from archivebox.workers.worker import CrawlWorker, SnapshotWorker, ArchiveResultWorker

    # One placeholder instance (worker_id=-1) per type, only so WorkerSchema can
    # query its queue; none of them is run here.
    worker_types = (CrawlWorker, SnapshotWorker, ArchiveResultWorker)
    return [worker_type(worker_id=-1) for worker_type in worker_types]
@router.get("/worker/{worker_name}", response=WorkerSchema, url_name="get_worker")
def get_worker(request, worker_name: str):
    """Get status and queue for a specific worker type."""
    from archivebox.workers.worker import WORKER_TYPES

    # Look the type up directly; an unknown name becomes a 404 listing the valid names.
    try:
        worker_cls = WORKER_TYPES[worker_name]
    except KeyError:
        from ninja.errors import HttpError
        raise HttpError(404, f"Unknown worker type: {worker_name}. Valid types: {list(WORKER_TYPES.keys())}") from None

    # Placeholder instance (worker_id=-1); WorkerSchema serializes it.
    return worker_cls(worker_id=-1)
@router.get("/worker/{worker_name}/queue", response=List[QueueItemSchema], url_name="get_worker_queue")
def get_worker_queue(request, worker_name: str, limit: int = 100):
    """Get the current queue for a specific worker type.

    ``limit`` caps how many queued items are returned (default 100) and must
    not be negative.
    """
    from archivebox.workers.worker import WORKER_TYPES
    from ninja.errors import HttpError

    if worker_name not in WORKER_TYPES:
        raise HttpError(404, f"Unknown worker type: {worker_name}. Valid types: {list(WORKER_TYPES.keys())}")

    # get_queue() appears to return a Django QuerySet (WorkerSchema calls .count()
    # on it). QuerySets raise on negative slice bounds, which previously surfaced
    # as a 500. Reject the value as a client error instead; 422 matches the status
    # django-ninja uses for other invalid request parameters.
    if limit < 0:
        raise HttpError(422, f"limit must be >= 0, got {limit}")

    WorkerClass = WORKER_TYPES[worker_name]
    # Placeholder instance (worker_id=-1), constructed only to read its queue.
    worker = WorkerClass(worker_id=-1)
    return list(worker.get_queue()[:limit])
- # Progress endpoint moved to core.views.live_progress_view for simplicity
|