v1_workers.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. __package__ = 'archivebox.api'
  2. from uuid import UUID
  3. from typing import List, Any
  4. from datetime import datetime
  5. from ninja import Router, Schema
  6. router = Router(tags=['Workers and Tasks'])
  7. class TaskSchema(Schema):
  8. TYPE: str
  9. id: UUID
  10. abid: str
  11. description: str
  12. status: str
  13. retry_at: datetime | None
  14. created_at: datetime
  15. modified_at: datetime
  16. created_by_id: int
  17. @staticmethod
  18. def resolve_description(obj) -> str:
  19. return str(obj)
  20. class ActorSchema(Schema):
  21. # TYPE: str = 'workers.actor.ActorType'
  22. # name: str
  23. #pid: int | None
  24. idle_count: int
  25. launch_kwargs: dict[str, Any]
  26. mode: str
  27. model: str
  28. statemachine: str
  29. ACTIVE_STATE: str
  30. EVENT_NAME: str
  31. CLAIM_ORDER: list[str]
  32. CLAIM_FROM_TOP_N: int
  33. CLAIM_ATOMIC: bool
  34. MAX_TICK_TIME: int
  35. MAX_CONCURRENT_ACTORS: int
  36. future: list[TaskSchema]
  37. pending: list[TaskSchema]
  38. stalled: list[TaskSchema]
  39. active: list[TaskSchema]
  40. past: list[TaskSchema]
  41. @staticmethod
  42. def resolve_model(obj) -> str:
  43. return obj.Model.__name__
  44. @staticmethod
  45. def resolve_statemachine(obj) -> str:
  46. return obj.StateMachineClass.__name__
  47. @staticmethod
  48. def resolve_name(obj) -> str:
  49. return str(obj)
  50. @staticmethod
  51. def resolve_ACTIVE_STATE(obj) -> str:
  52. return str(obj.ACTIVE_STATE)
  53. @staticmethod
  54. def resolve_FINAL_STATES(obj) -> list[str]:
  55. return [str(state) for state in obj.FINAL_STATES]
  56. @staticmethod
  57. def resolve_future(obj) -> list[TaskSchema]:
  58. return [obj for obj in obj.qs.filter(obj.future_q).order_by('-retry_at')]
  59. @staticmethod
  60. def resolve_pending(obj) -> list[TaskSchema]:
  61. return [obj for obj in obj.qs.filter(obj.pending_q).order_by('-retry_at')]
  62. @staticmethod
  63. def resolve_stalled(obj) -> list[TaskSchema]:
  64. return [obj for obj in obj.qs.filter(obj.stalled_q).order_by('-retry_at')]
  65. @staticmethod
  66. def resolve_active(obj) -> list[TaskSchema]:
  67. return [obj for obj in obj.qs.filter(obj.active_q).order_by('-retry_at')]
  68. @staticmethod
  69. def resolve_past(obj) -> list[TaskSchema]:
  70. return [obj for obj in obj.qs.filter(obj.final_q).order_by('-modified_at')]
  71. class OrchestratorSchema(Schema):
  72. # TYPE: str = 'workers.orchestrator.Orchestrator'
  73. #pid: int | None
  74. exit_on_idle: bool
  75. mode: str
  76. actors: list[ActorSchema]
  77. @staticmethod
  78. def resolve_actors(obj) -> list[ActorSchema]:
  79. return [actor() for actor in obj.actor_types.values()]
  80. @router.get("/orchestrators", response=List[OrchestratorSchema], url_name="get_orchestrators")
  81. def get_orchestrators(request):
  82. """List all the task orchestrators (aka Orchestrators) that are currently running"""
  83. from workers.orchestrator import Orchestrator
  84. orchestrator = Orchestrator()
  85. return [orchestrator]
  86. @router.get("/actors", response=List[ActorSchema], url_name="get_actors")
  87. def get_actors(request):
  88. """List all the task consumer workers (aka Actors) that are currently running"""
  89. from workers.orchestrator import Orchestrator
  90. orchestrator = Orchestrator()
  91. return orchestrator.actor_types.values()