| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- __package__ = 'archivebox.api'
- from uuid import UUID
- from typing import List, Optional, Union
- from datetime import datetime
- from django.shortcuts import get_object_or_404
- from ninja import Router, Schema, FilterSchema, Field, Query
- from ninja.pagination import paginate
- from core.models import Snapshot, ArchiveResult, Tag
# Router on which every endpoint in this module is registered; the tag groups
# those endpoints under a single 'Core Models' heading.
router = Router(
    tags=['Core Models'],
)
- ### ArchiveResult #########################################################################
class ArchiveResultSchema(Schema):
    # Response shape returned for one ArchiveResult.
    # The resolve_* static methods below supply the fields whose value is not
    # taken from an attribute of the same name on the object.

    id: UUID
    snapshot_id: UUID
    snapshot_url: str
    snapshot_tags: str
    extractor: str
    cmd: List[str]
    pwd: str
    cmd_version: str
    output: str
    status: str
    created: datetime

    @staticmethod
    def resolve_id(obj):
        # The public `id` is the object's `uuid` attribute.
        public_id = obj.uuid
        return public_id

    @staticmethod
    def resolve_snapshot_url(obj):
        # URL of the snapshot this result belongs to.
        parent = obj.snapshot
        return parent.url

    @staticmethod
    def resolve_snapshot_tags(obj):
        # Tags of the parent snapshot, as rendered by its tags_str() method.
        parent = obj.snapshot
        return parent.tags_str()

    @staticmethod
    def resolve_created(obj):
        # `created` is reported from the object's `start_ts` attribute.
        started = obj.start_ts
        return started
class ArchiveResultFilterSchema(FilterSchema):
    # Query-string filters accepted by the ArchiveResult list endpoint.
    # Each field's `q` names the ORM lookup(s) applied when the parameter is
    # supplied; parameters left at None are declared with a None default.

    # `id` matches on the `uuid` column, the same value ArchiveResultSchema
    # reports as `id`.
    id: Optional[UUID] = Field(None, q='uuid')

    # Free-text search across the parent snapshot and the result itself.
    search: Optional[str] = Field(
        None,
        q=[
            'snapshot__url__icontains',
            'snapshot__title__icontains',
            'snapshot__tags__name__icontains',
            'extractor',
            'output__icontains',
        ],
    )

    # Filters on the parent snapshot.
    snapshot_id: Optional[UUID] = Field(None, q='snapshot_id')
    snapshot_url: Optional[str] = Field(None, q='snapshot__url')
    snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name')

    # Filters on the result's own columns.
    status: Optional[str] = Field(None, q='status')
    output: Optional[str] = Field(None, q='output__icontains')
    extractor: Optional[str] = Field(None, q='extractor__icontains')
    cmd: Optional[str] = Field(None, q='cmd__0__icontains')
    pwd: Optional[str] = Field(None, q='pwd__icontains')
    cmd_version: Optional[str] = Field(None, q='cmd_version')

    # ArchiveResultSchema reports `created` from `start_ts`, so the `created*`
    # filters query that same column; filtering on a different column than the
    # one shown to the client would return confusing results.
    created: Optional[datetime] = Field(None, q='start_ts')
    created__gte: Optional[datetime] = Field(None, q='start_ts__gte')
    created__lt: Optional[datetime] = Field(None, q='start_ts__lt')
# GET /archiveresults: paginated list of ArchiveResults, narrowed by whatever
# query-string filters ArchiveResultFilterSchema received.
@router.get("/archiveresults", response=List[ArchiveResultSchema])
@paginate
def list_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
    return filters.filter(ArchiveResult.objects.all())
@router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
def get_archiveresult(request, archiveresult_id: str):
    """Return a single ArchiveResult.

    `archiveresult_id` may be the UUID that the API reports as `id`, or the
    value of the model's own `id` field. Responds with 404 when nothing matches.
    """
    # ArchiveResultSchema exposes `obj.uuid` as `id`, and the list filter maps
    # `id` to the `uuid` column, so an id copied from a list response must be
    # resolved against `uuid`. Values that are not well-formed UUIDs keep the
    # previous `id=` lookup, so existing callers are unaffected.
    try:
        lookup = {'uuid': UUID(archiveresult_id)}
    except ValueError:
        lookup = {'id': archiveresult_id}
    return get_object_or_404(ArchiveResult, **lookup)
- # @router.post("/archiveresult", response=ArchiveResultSchema)
- # def create_archiveresult(request, payload: ArchiveResultSchema):
- # archiveresult = ArchiveResult.objects.create(**payload.dict())
- # return archiveresult
- #
- # @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
- # def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
- # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
- #
- # for attr, value in payload.dict().items():
- # setattr(archiveresult, attr, value)
- # archiveresult.save()
- #
- # return archiveresult
- #
- # @router.delete("/archiveresult/{archiveresult_id}")
- # def delete_archiveresult(request, archiveresult_id: str):
- # archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
- # archiveresult.delete()
- # return {"success": True}
- ### Snapshot #########################################################################
class SnapshotSchema(Schema):
    # Response shape returned for one Snapshot, optionally embedding its
    # ArchiveResults.

    id: UUID
    url: str
    tags: str
    title: Optional[str]
    timestamp: str
    bookmarked: datetime
    added: datetime
    updated: datetime
    archive_path: str
    archiveresults: List[ArchiveResultSchema]

    # @staticmethod
    # def resolve_id(obj):
    #     return str(obj.id)

    @staticmethod
    def resolve_archiveresults(obj, context):
        # The endpoints store the caller's `with_archiveresults` choice on the
        # request object; when it is falsy an empty queryset is returned.
        current_request = context['request']
        if not current_request.with_archiveresults:
            return ArchiveResult.objects.none()
        related = obj.archiveresult_set.all()
        return related.distinct()

    @staticmethod
    def resolve_tags(obj):
        # Tags as rendered by the snapshot's tags_str() method.
        rendered = obj.tags_str()
        return rendered
class SnapshotFilterSchema(FilterSchema):
    # Query-string filters accepted by the Snapshot list endpoint.
    # Each field's `q` names the ORM lookup(s) used for that parameter.

    id: Optional[UUID] = Field(None, q='id')

    # Free-text search over url, title and tag names.
    search: Optional[str] = Field(
        None,
        q=[
            'url__icontains',
            'title__icontains',
            'tags__name__icontains',
        ],
    )

    # Exact-match filters, plus a substring match on the title.
    url: Optional[str] = Field(None, q='url')
    tag: Optional[str] = Field(None, q='tags__name')
    title: Optional[str] = Field(None, q='title__icontains')

    # Prefix match on the timestamp string.
    timestamp: Optional[str] = Field(None, q='timestamp__startswith')

    # Exact / lower-bound (inclusive) / upper-bound (exclusive) on `added`.
    added: Optional[datetime] = Field(None, q='added')
    added__gte: Optional[datetime] = Field(None, q='added__gte')
    added__lt: Optional[datetime] = Field(None, q='added__lt')
# GET /snapshots: paginated list of Snapshots narrowed by SnapshotFilterSchema.
# `with_archiveresults` is stashed on the request so that
# SnapshotSchema.resolve_archiveresults can read it during serialization.
@router.get("/snapshots", response=List[SnapshotSchema])
@paginate
def list_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=True):
    request.with_archiveresults = with_archiveresults
    return filters.filter(Snapshot.objects.all())
# GET /snapshot/{snapshot_id}: one Snapshot looked up by `id`, or a 404.
# `with_archiveresults` is stashed on the request so that
# SnapshotSchema.resolve_archiveresults can read it during serialization.
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema)
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
    request.with_archiveresults = with_archiveresults
    return get_object_or_404(Snapshot, id=snapshot_id)
- # @router.post("/snapshot", response=SnapshotSchema)
- # def create_snapshot(request, payload: SnapshotSchema):
- # snapshot = Snapshot.objects.create(**payload.dict())
- # return snapshot
- #
- # @router.put("/snapshot/{snapshot_id}", response=SnapshotSchema)
- # def update_snapshot(request, snapshot_id: str, payload: SnapshotSchema):
- # snapshot = get_object_or_404(Snapshot, id=snapshot_id)
- #
- # for attr, value in payload.dict().items():
- # setattr(snapshot, attr, value)
- # snapshot.save()
- #
- # return snapshot
- #
- # @router.delete("/snapshot/{snapshot_id}")
- # def delete_snapshot(request, snapshot_id: str):
- # snapshot = get_object_or_404(Snapshot, id=snapshot_id)
- # snapshot.delete()
- # return {"success": True}
- ### Tag #########################################################################
class TagSchema(Schema):
    # Response shape returned for one Tag: its name and its slug.

    name: str
    slug: str
# GET /tags: every Tag, unfiltered and unpaginated.
@router.get("/tags", response=List[TagSchema])
def list_tags(request):
    every_tag = Tag.objects.all()
    return every_tag
|