v1_core.py 6.7 KB


# Core-model endpoints (ArchiveResult, Snapshot, Tag) for the REST API.

# NOTE(review): __package__ is pinned explicitly; presumably so the module
# resolves as part of 'archivebox.api' however it is imported -- confirm.
__package__ = 'archivebox.api'
from uuid import UUID
from typing import List, Optional
from datetime import datetime
from django.shortcuts import get_object_or_404
from ninja import Router, Schema, FilterSchema, Field, Query
from ninja.pagination import paginate
from core.models import Snapshot, ArchiveResult, Tag
from abid_utils.abid import ABID
# Every endpoint below is registered on this router; the 'Core Models' tag is
# attached to all of them (django-ninja uses tags to group operations in docs).
router = Router(tags=['Core Models'])
  11. ### ArchiveResult #########################################################################
  12. class ArchiveResultSchema(Schema):
  13. pk: str
  14. uuid: UUID
  15. abid: str
  16. snapshot_abid: str
  17. snapshot_url: str
  18. snapshot_tags: str
  19. extractor: str
  20. cmd: List[str]
  21. pwd: str
  22. cmd_version: str
  23. output: str
  24. status: str
  25. created: datetime
  26. @staticmethod
  27. def resolve_pk(obj):
  28. return str(obj.pk)
  29. @staticmethod
  30. def resolve_uuid(obj):
  31. return str(obj.uuid)
  32. @staticmethod
  33. def resolve_abid(obj):
  34. return str(obj.ABID)
  35. @staticmethod
  36. def resolve_created(obj):
  37. return obj.start_ts
  38. @staticmethod
  39. def resolve_snapshot_url(obj):
  40. return obj.snapshot.url
  41. @staticmethod
  42. def resolve_snapshot_abid(obj):
  43. return str(obj.snapshot.ABID)
  44. @staticmethod
  45. def resolve_snapshot_tags(obj):
  46. return obj.snapshot.tags_str()
  47. class ArchiveResultFilterSchema(FilterSchema):
  48. uuid: Optional[UUID] = Field(None, q='uuid')
  49. # abid: Optional[str] = Field(None, q='abid')
  50. search: Optional[str] = Field(None, q=['snapshot__url__icontains', 'snapshot__title__icontains', 'snapshot__tags__name__icontains', 'extractor', 'output__icontains'])
  51. snapshot_uuid: Optional[UUID] = Field(None, q='snapshot_uuid')
  52. snapshot_url: Optional[str] = Field(None, q='snapshot__url')
  53. snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name')
  54. status: Optional[str] = Field(None, q='status')
  55. output: Optional[str] = Field(None, q='output__icontains')
  56. extractor: Optional[str] = Field(None, q='extractor__icontains')
  57. cmd: Optional[str] = Field(None, q='cmd__0__icontains')
  58. pwd: Optional[str] = Field(None, q='pwd__icontains')
  59. cmd_version: Optional[str] = Field(None, q='cmd_version')
  60. created: Optional[datetime] = Field(None, q='updated')
  61. created__gte: Optional[datetime] = Field(None, q='updated__gte')
  62. created__lt: Optional[datetime] = Field(None, q='updated__lt')
  63. @router.get("/archiveresults", response=List[ArchiveResultSchema])
  64. @paginate
  65. def list_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
  66. qs = ArchiveResult.objects.all()
  67. results = filters.filter(qs)
  68. return results
  69. @router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
  70. def get_archiveresult(request, archiveresult_id: str):
  71. archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  72. return archiveresult
  73. # @router.post("/archiveresult", response=ArchiveResultSchema)
  74. # def create_archiveresult(request, payload: ArchiveResultSchema):
  75. # archiveresult = ArchiveResult.objects.create(**payload.dict())
  76. # return archiveresult
  77. #
  78. # @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
  79. # def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
  80. # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  81. #
  82. # for attr, value in payload.dict().items():
  83. # setattr(archiveresult, attr, value)
  84. # archiveresult.save()
  85. #
  86. # return archiveresult
  87. #
  88. # @router.delete("/archiveresult/{archiveresult_id}")
  89. # def delete_archiveresult(request, archiveresult_id: str):
  90. # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  91. # archiveresult.delete()
  92. # return {"success": True}
  93. ### Snapshot #########################################################################
  94. class SnapshotSchema(Schema):
  95. pk: str
  96. uuid: UUID
  97. abid: str
  98. url: str
  99. tags: str
  100. title: Optional[str]
  101. timestamp: str
  102. bookmarked: datetime
  103. added: datetime
  104. updated: datetime
  105. archive_path: str
  106. archiveresults: List[ArchiveResultSchema]
  107. @staticmethod
  108. def resolve_pk(obj):
  109. return str(obj.pk)
  110. @staticmethod
  111. def resolve_uuid(obj):
  112. return str(obj.uuid)
  113. @staticmethod
  114. def resolve_abid(obj):
  115. return str(obj.ABID)
  116. @staticmethod
  117. def resolve_tags(obj):
  118. return obj.tags_str()
  119. @staticmethod
  120. def resolve_archiveresults(obj, context):
  121. if context['request'].with_archiveresults:
  122. return obj.archiveresult_set.all().distinct()
  123. return ArchiveResult.objects.none()
  124. class SnapshotFilterSchema(FilterSchema):
  125. id: Optional[UUID] = Field(None, q='id')
  126. search: Optional[str] = Field(None, q=['url__icontains', 'title__icontains', 'tags__name__icontains'])
  127. url: Optional[str] = Field(None, q='url')
  128. tag: Optional[str] = Field(None, q='tags__name')
  129. title: Optional[str] = Field(None, q='title__icontains')
  130. timestamp: Optional[str] = Field(None, q='timestamp__startswith')
  131. added: Optional[datetime] = Field(None, q='added')
  132. added__gte: Optional[datetime] = Field(None, q='added__gte')
  133. added__lt: Optional[datetime] = Field(None, q='added__lt')
  134. @router.get("/snapshots", response=List[SnapshotSchema])
  135. @paginate
  136. def list_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=True):
  137. request.with_archiveresults = with_archiveresults
  138. qs = Snapshot.objects.all()
  139. results = filters.filter(qs)
  140. return results
  141. @router.get("/snapshot/{snapshot_uuid}", response=SnapshotSchema)
  142. def get_snapshot(request, snapshot_uuid: str, with_archiveresults: bool=True):
  143. request.with_archiveresults = with_archiveresults
  144. snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
  145. return snapshot
  146. # @router.post("/snapshot", response=SnapshotSchema)
  147. # def create_snapshot(request, payload: SnapshotSchema):
  148. # snapshot = Snapshot.objects.create(**payload.dict())
  149. # return snapshot
  150. #
  151. # @router.put("/snapshot/{snapshot_uuid}", response=SnapshotSchema)
  152. # def update_snapshot(request, snapshot_uuid: str, payload: SnapshotSchema):
  153. # snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
  154. #
  155. # for attr, value in payload.dict().items():
  156. # setattr(snapshot, attr, value)
  157. # snapshot.save()
  158. #
  159. # return snapshot
  160. #
  161. # @router.delete("/snapshot/{snapshot_uuid}")
  162. # def delete_snapshot(request, snapshot_uuid: str):
  163. # snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
  164. # snapshot.delete()
  165. # return {"success": True}
  166. ### Tag #########################################################################
  167. class TagSchema(Schema):
  168. name: str
  169. slug: str
  170. @router.get("/tags", response=List[TagSchema])
  171. def list_tags(request):
  172. return Tag.objects.all()