v1_core.py 9.1 KB


  1. __package__ = 'archivebox.api'
  2. from uuid import UUID
  3. from typing import List, Optional
  4. from datetime import datetime
  5. from django.db.models import Q
  6. from django.shortcuts import get_object_or_404
  7. from ninja import Router, Schema, FilterSchema, Field, Query
  8. from ninja.pagination import paginate
  9. from core.models import Snapshot, ArchiveResult, Tag
  10. from abid_utils.abid import ABID
  11. router = Router(tags=['Core Models'])
  12. ### ArchiveResult #########################################################################
  13. class ArchiveResultSchema(Schema):
  14. id: UUID
  15. old_id: int
  16. abid: str
  17. modified: datetime
  18. created: datetime
  19. created_by_id: str
  20. snapshot_abid: str
  21. snapshot_url: str
  22. snapshot_tags: str
  23. extractor: str
  24. cmd_version: Optional[str]
  25. cmd: List[str]
  26. pwd: str
  27. status: str
  28. output: str
  29. @staticmethod
  30. def resolve_created_by_id(obj):
  31. return str(obj.created_by_id)
  32. @staticmethod
  33. def resolve_pk(obj):
  34. return str(obj.pk)
  35. @staticmethod
  36. def resolve_uuid(obj):
  37. return str(obj.uuid)
  38. @staticmethod
  39. def resolve_abid(obj):
  40. return str(obj.ABID)
  41. @staticmethod
  42. def resolve_created(obj):
  43. return obj.start_ts
  44. @staticmethod
  45. def resolve_snapshot_url(obj):
  46. return obj.snapshot.url
  47. @staticmethod
  48. def resolve_snapshot_abid(obj):
  49. return str(obj.snapshot.ABID)
  50. @staticmethod
  51. def resolve_snapshot_tags(obj):
  52. return obj.snapshot.tags_str()
  53. class ArchiveResultFilterSchema(FilterSchema):
  54. id: Optional[UUID] = Field(None, q='id')
  55. search: Optional[str] = Field(None, q=['snapshot__url__icontains', 'snapshot__title__icontains', 'snapshot__tags__name__icontains', 'extractor', 'output__icontains'])
  56. snapshot_id: Optional[UUID] = Field(None, q='snapshot_id__icontains')
  57. snapshot_url: Optional[str] = Field(None, q='snapshot__url__icontains')
  58. snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name__icontains')
  59. status: Optional[str] = Field(None, q='status')
  60. output: Optional[str] = Field(None, q='output__icontains')
  61. extractor: Optional[str] = Field(None, q='extractor__icontains')
  62. cmd: Optional[str] = Field(None, q='cmd__0__icontains')
  63. pwd: Optional[str] = Field(None, q='pwd__icontains')
  64. cmd_version: Optional[str] = Field(None, q='cmd_version')
  65. created: Optional[datetime] = Field(None, q='updated')
  66. created__gte: Optional[datetime] = Field(None, q='updated__gte')
  67. created__lt: Optional[datetime] = Field(None, q='updated__lt')
  68. @router.get("/archiveresults", response=List[ArchiveResultSchema], url_name="get_archiveresult")
  69. @paginate
  70. def get_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
  71. """List all ArchiveResult entries matching these filters."""
  72. qs = ArchiveResult.objects.all()
  73. results = filters.filter(qs)
  74. return results
  75. @router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema, url_name="get_archiveresult")
  76. def get_archiveresult(request, archiveresult_id: str):
  77. """Get a specific ArchiveResult by pk, abid, or old_id."""
  78. return ArchiveResult.objects.get(Q(id__icontains=archiveresult_id) | Q(abid__icontains=archiveresult_id) | Q(old_id__icontains=archiveresult_id))
  79. # @router.post("/archiveresult", response=ArchiveResultSchema)
  80. # def create_archiveresult(request, payload: ArchiveResultSchema):
  81. # archiveresult = ArchiveResult.objects.create(**payload.dict())
  82. # return archiveresult
  83. #
  84. # @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
  85. # def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
  86. # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  87. #
  88. # for attr, value in payload.dict().items():
  89. # setattr(archiveresult, attr, value)
  90. # archiveresult.save()
  91. #
  92. # return archiveresult
  93. #
  94. # @router.delete("/archiveresult/{archiveresult_id}")
  95. # def delete_archiveresult(request, archiveresult_id: str):
  96. # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
  97. # archiveresult.delete()
  98. # return {"success": True}
  99. ### Snapshot #########################################################################
  100. class SnapshotSchema(Schema):
  101. id: UUID
  102. old_id: UUID
  103. abid: str
  104. modified: datetime
  105. created: datetime
  106. created_by_id: str
  107. url: str
  108. tags: str
  109. title: Optional[str]
  110. timestamp: str
  111. archive_path: str
  112. bookmarked: datetime
  113. added: datetime
  114. updated: Optional[datetime]
  115. num_archiveresults: int
  116. archiveresults: List[ArchiveResultSchema]
  117. @staticmethod
  118. def resolve_created_by_id(obj):
  119. return str(obj.created_by_id)
  120. @staticmethod
  121. def resolve_pk(obj):
  122. return str(obj.pk)
  123. @staticmethod
  124. def resolve_uuid(obj):
  125. return str(obj.uuid)
  126. @staticmethod
  127. def resolve_abid(obj):
  128. return str(obj.ABID)
  129. @staticmethod
  130. def resolve_tags(obj):
  131. return obj.tags_str()
  132. @staticmethod
  133. def resolve_num_archiveresults(obj, context):
  134. return obj.archiveresult_set.all().distinct().count()
  135. @staticmethod
  136. def resolve_archiveresults(obj, context):
  137. if context['request'].with_archiveresults:
  138. return obj.archiveresult_set.all().distinct()
  139. return ArchiveResult.objects.none()
  140. class SnapshotFilterSchema(FilterSchema):
  141. id: Optional[str] = Field(None, q='id__icontains')
  142. old_id: Optional[str] = Field(None, q='old_id__icontains')
  143. abid: Optional[str] = Field(None, q='abid__icontains')
  144. created_by_id: str = Field(None, q='created_by_id__icontains')
  145. created__gte: datetime = Field(None, q='created__gte')
  146. created__lt: datetime = Field(None, q='created__lt')
  147. created: datetime = Field(None, q='created')
  148. modified: datetime = Field(None, q='modified')
  149. modified__gte: datetime = Field(None, q='modified__gte')
  150. modified__lt: datetime = Field(None, q='modified__lt')
  151. search: Optional[str] = Field(None, q=['url__icontains', 'title__icontains', 'tags__name__icontains', 'id__icontains', 'abid__icontains', 'old_id__icontains'])
  152. url: Optional[str] = Field(None, q='url')
  153. tag: Optional[str] = Field(None, q='tags__name')
  154. title: Optional[str] = Field(None, q='title__icontains')
  155. timestamp: Optional[str] = Field(None, q='timestamp__startswith')
  156. added__gte: Optional[datetime] = Field(None, q='added__gte')
  157. added__lt: Optional[datetime] = Field(None, q='added__lt')
  158. @router.get("/snapshots", response=List[SnapshotSchema], url_name="get_snapshots")
  159. @paginate
  160. def get_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=True):
  161. """List all Snapshot entries matching these filters."""
  162. request.with_archiveresults = with_archiveresults
  163. qs = Snapshot.objects.all()
  164. results = filters.filter(qs)
  165. return results
  166. @router.get("/snapshot/{snapshot_id}", response=SnapshotSchema, url_name="get_snapshot")
  167. def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
  168. """Get a specific Snapshot by abid, uuid, or pk."""
  169. request.with_archiveresults = with_archiveresults
  170. snapshot = None
  171. try:
  172. snapshot = Snapshot.objects.get(Q(abid__startswith=snapshot_id) | Q(id__startswith=snapshot_id) | Q(old_id__startswith=snapshot_id))
  173. except Snapshot.DoesNotExist:
  174. pass
  175. try:
  176. snapshot = snapshot or Snapshot.objects.get()
  177. except Snapshot.DoesNotExist:
  178. pass
  179. try:
  180. snapshot = snapshot or Snapshot.objects.get(Q(abid__icontains=snapshot_id) | Q(id__icontains=snapshot_id) | Q(old_id__icontains=snapshot_id))
  181. except Snapshot.DoesNotExist:
  182. pass
  183. return snapshot
  184. # @router.post("/snapshot", response=SnapshotSchema)
  185. # def create_snapshot(request, payload: SnapshotSchema):
  186. # snapshot = Snapshot.objects.create(**payload.dict())
  187. # return snapshot
  188. #
  189. # @router.put("/snapshot/{snapshot_uuid}", response=SnapshotSchema)
  190. # def update_snapshot(request, snapshot_uuid: str, payload: SnapshotSchema):
  191. # snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
  192. #
  193. # for attr, value in payload.dict().items():
  194. # setattr(snapshot, attr, value)
  195. # snapshot.save()
  196. #
  197. # return snapshot
  198. #
  199. # @router.delete("/snapshot/{snapshot_uuid}")
  200. # def delete_snapshot(request, snapshot_uuid: str):
  201. # snapshot = get_object_or_404(Snapshot, uuid=snapshot_uuid)
  202. # snapshot.delete()
  203. # return {"success": True}
  204. ### Tag #########################################################################
  205. class TagSchema(Schema):
  206. abid: Optional[UUID] = Field(None, q='abid')
  207. uuid: Optional[UUID] = Field(None, q='uuid')
  208. pk: Optional[UUID] = Field(None, q='pk')
  209. modified: datetime
  210. created: datetime
  211. created_by_id: str
  212. name: str
  213. slug: str
  214. @staticmethod
  215. def resolve_created_by_id(obj):
  216. return str(obj.created_by_id)
  217. @router.get("/tags", response=List[TagSchema], url_name="get_tags")
  218. def get_tags(request):
  219. return Tag.objects.all()
  220. @router.get("/tag/{tag_id}", response=TagSchema, url_name="get_tag")
  221. def get_tag(request, tag_id: str):
  222. return Tag.objects.get(id=tag_id)