v1_core.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. __package__ = 'archivebox.api'
  2. from uuid import UUID
  3. from typing import List, Optional
  4. from datetime import datetime
  5. from django.shortcuts import get_object_or_404
  6. from ninja import Router, Schema, FilterSchema, Field, Query
  7. from ninja.pagination import paginate
  8. from core.models import Snapshot, ArchiveResult, Tag
  9. router = Router(tags=['Core Models'])
  10. ### ArchiveResult #########################################################################
  11. class ArchiveResultSchema(Schema):
  12. id: UUID
  13. snapshot_id: UUID
  14. snapshot_url: str
  15. snapshot_tags: str
  16. extractor: str
  17. cmd: List[str]
  18. pwd: str
  19. cmd_version: str
  20. output: str
  21. status: str
  22. created: datetime
  23. @staticmethod
  24. def resolve_id(obj):
  25. return obj.uuid
  26. @staticmethod
  27. def resolve_created(obj):
  28. return obj.start_ts
  29. @staticmethod
  30. def resolve_snapshot_url(obj):
  31. return obj.snapshot.url
  32. @staticmethod
  33. def resolve_snapshot_tags(obj):
  34. return obj.snapshot.tags_str()
  35. class ArchiveResultFilterSchema(FilterSchema):
  36. id: Optional[UUID] = Field(None, q='uuid')
  37. search: Optional[str] = Field(None, q=['snapshot__url__icontains', 'snapshot__title__icontains', 'snapshot__tags__name__icontains', 'extractor', 'output__icontains'])
  38. snapshot_id: Optional[UUID] = Field(None, q='snapshot_id')
  39. snapshot_url: Optional[str] = Field(None, q='snapshot__url')
  40. snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name')
  41. status: Optional[str] = Field(None, q='status')
  42. output: Optional[str] = Field(None, q='output__icontains')
  43. extractor: Optional[str] = Field(None, q='extractor__icontains')
  44. cmd: Optional[str] = Field(None, q='cmd__0__icontains')
  45. pwd: Optional[str] = Field(None, q='pwd__icontains')
  46. cmd_version: Optional[str] = Field(None, q='cmd_version')
  47. created: Optional[datetime] = Field(None, q='updated')
  48. created__gte: Optional[datetime] = Field(None, q='updated__gte')
  49. created__lt: Optional[datetime] = Field(None, q='updated__lt')
  50. @router.get("/archiveresults", response=List[ArchiveResultSchema])
  51. @paginate
  52. def list_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
  53. qs = ArchiveResult.objects.all()
  54. results = filters.filter(qs)
  55. return results
  56. @router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
  57. def get_archiveresult(request, archiveresult_id: str):
  58. archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  59. return archiveresult
  60. # @router.post("/archiveresult", response=ArchiveResultSchema)
  61. # def create_archiveresult(request, payload: ArchiveResultSchema):
  62. # archiveresult = ArchiveResult.objects.create(**payload.dict())
  63. # return archiveresult
  64. #
  65. # @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
  66. # def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
  67. # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  68. #
  69. # for attr, value in payload.dict().items():
  70. # setattr(archiveresult, attr, value)
  71. # archiveresult.save()
  72. #
  73. # return archiveresult
  74. #
  75. # @router.delete("/archiveresult/{archiveresult_id}")
  76. # def delete_archiveresult(request, archiveresult_id: str):
  77. # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  78. # archiveresult.delete()
  79. # return {"success": True}
  80. ### Snapshot #########################################################################
  81. class SnapshotSchema(Schema):
  82. id: UUID
  83. url: str
  84. tags: str
  85. title: Optional[str]
  86. timestamp: str
  87. bookmarked: datetime
  88. added: datetime
  89. updated: datetime
  90. archive_path: str
  91. archiveresults: List[ArchiveResultSchema]
  92. # @staticmethod
  93. # def resolve_id(obj):
  94. # return str(obj.id)
  95. @staticmethod
  96. def resolve_tags(obj):
  97. return obj.tags_str()
  98. @staticmethod
  99. def resolve_archiveresults(obj, context):
  100. if context['request'].with_archiveresults:
  101. return obj.archiveresult_set.all().distinct()
  102. return ArchiveResult.objects.none()
  103. class SnapshotFilterSchema(FilterSchema):
  104. id: Optional[UUID] = Field(None, q='id')
  105. search: Optional[str] = Field(None, q=['url__icontains', 'title__icontains', 'tags__name__icontains'])
  106. url: Optional[str] = Field(None, q='url')
  107. tag: Optional[str] = Field(None, q='tags__name')
  108. title: Optional[str] = Field(None, q='title__icontains')
  109. timestamp: Optional[str] = Field(None, q='timestamp__startswith')
  110. added: Optional[datetime] = Field(None, q='added')
  111. added__gte: Optional[datetime] = Field(None, q='added__gte')
  112. added__lt: Optional[datetime] = Field(None, q='added__lt')
  113. @router.get("/snapshots", response=List[SnapshotSchema])
  114. @paginate
  115. def list_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=True):
  116. request.with_archiveresults = with_archiveresults
  117. qs = Snapshot.objects.all()
  118. results = filters.filter(qs)
  119. return results
  120. @router.get("/snapshot/{snapshot_id}", response=SnapshotSchema)
  121. def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
  122. request.with_archiveresults = with_archiveresults
  123. snapshot = get_object_or_404(Snapshot, id=snapshot_id)
  124. return snapshot
  125. # @router.post("/snapshot", response=SnapshotSchema)
  126. # def create_snapshot(request, payload: SnapshotSchema):
  127. # snapshot = Snapshot.objects.create(**payload.dict())
  128. # return snapshot
  129. #
  130. # @router.put("/snapshot/{snapshot_id}", response=SnapshotSchema)
  131. # def update_snapshot(request, snapshot_id: str, payload: SnapshotSchema):
  132. # snapshot = get_object_or_404(Snapshot, id=snapshot_id)
  133. #
  134. # for attr, value in payload.dict().items():
  135. # setattr(snapshot, attr, value)
  136. # snapshot.save()
  137. #
  138. # return snapshot
  139. #
  140. # @router.delete("/snapshot/{snapshot_id}")
  141. # def delete_snapshot(request, snapshot_id: str):
  142. # snapshot = get_object_or_404(Snapshot, id=snapshot_id)
  143. # snapshot.delete()
  144. # return {"success": True}
  145. ### Tag #########################################################################
  146. class TagSchema(Schema):
  147. name: str
  148. slug: str
  149. @router.get("/tags", response=List[TagSchema])
  150. def list_tags(request):
  151. return Tag.objects.all()