| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- __package__ = 'archivebox.api'
- import math
- from uuid import UUID
- from typing import List, Optional, Union, Any
- from datetime import datetime
- from django.db.models import Q
- from django.core.exceptions import ValidationError
- from django.contrib.auth import get_user_model
- from django.shortcuts import redirect
- from ninja import Router, Schema, FilterSchema, Field, Query
- from ninja.pagination import paginate, PaginationBase
- from ninja.errors import HttpError
- from archivebox.core.models import Snapshot, ArchiveResult, Tag
- from archivebox.api.v1_crawls import CrawlSchema
- router = Router(tags=['Core Models'])
class CustomPagination(PaginationBase):
    """Limit/offset/page paginator that wraps the result slice in a metadata envelope.

    Clients may pass either ``offset`` directly or a 0-indexed ``page``; a
    non-zero ``offset`` takes precedence over ``page``.
    """

    class Input(Schema):
        # Query parameters accepted by every paginated endpoint.
        limit: int = 200
        offset: int = 0
        page: int = 0

    class Output(Schema):
        # Envelope returned around the page of results.
        total_items: int
        total_pages: int
        page: int
        limit: int
        offset: int
        num_items: int
        items: List[Any]

    def paginate_queryset(self, queryset, pagination: Input, **params):
        """Slice ``queryset`` according to ``pagination`` and describe the slice.

        Args:
            queryset: the queryset returned by the decorated view.
            pagination: parsed ``limit`` / ``offset`` / ``page`` query parameters.
            **params: extra keyword arguments passed by the caller (unused here).

        Returns:
            A dict with the keys declared on ``Output``.
        """
        # Clamp the page size to 1..500. A limit of 0 previously raised
        # ZeroDivisionError in the total_pages computation (and -1 did the same
        # in the old ``limit + 1`` page computation).
        limit = max(1, min(pagination.limit, 500))

        # An explicit non-zero offset wins over ``page``. Negative values are
        # clamped to 0 because negative queryset slicing is rejected by Django.
        offset = max(0, pagination.offset or (pagination.page * limit))

        total = queryset.count()
        total_pages = math.ceil(total / limit)

        # 0-indexed page containing the first returned item. The previous
        # ``ceil(offset / (limit + 1))`` drifted below the requested page as
        # soon as ``page`` exceeded ``limit``.
        current_page = offset // limit

        items = queryset[offset : offset + limit]
        return {
            'total_items': total,
            'total_pages': total_pages,
            'page': current_page,
            'limit': limit,
            'offset': offset,
            'num_items': len(items),
            'items': items,
        }
- ### ArchiveResult #########################################################################
class MinimalArchiveResultSchema(Schema):
    # Serialized form of an ArchiveResult without any fields of its parent
    # Snapshot (those are added by the ArchiveResultSchema subclass).
    TYPE: str = 'core.models.ArchiveResult'

    # identity / bookkeeping
    id: UUID
    created_at: Optional[datetime]
    modified_at: Optional[datetime]
    created_by_id: str
    created_by_username: str

    # scheduling state
    status: str
    retry_at: Optional[datetime]

    # what was run
    plugin: str
    hook_name: str
    process_id: Optional[UUID]
    cmd_version: Optional[str]
    cmd: Optional[List[str]]
    pwd: Optional[str]

    # what it produced
    output_str: str
    output_json: Optional[dict]
    output_files: Optional[dict]
    output_size: int
    output_mimetypes: str

    # timing
    start_ts: Optional[datetime]
    end_ts: Optional[datetime]

    @staticmethod
    def resolve_created_by_id(obj):
        """Return the primary key of ``obj.created_by`` rendered as a string."""
        creator = obj.created_by
        return str(creator.pk)

    @staticmethod
    def resolve_created_by_username(obj) -> str:
        """Return the username of ``obj.created_by``."""
        creator = obj.created_by
        return creator.username
class ArchiveResultSchema(MinimalArchiveResultSchema):
    # Full ArchiveResult serialization: the minimal fields plus a few fields
    # read from the parent snapshot.
    TYPE: str = 'core.models.ArchiveResult'

    snapshot_id: UUID
    snapshot_timestamp: str
    snapshot_url: str
    snapshot_tags: List[str]

    @staticmethod
    def resolve_snapshot_timestamp(obj):
        """Return the ``timestamp`` of the parent snapshot."""
        parent = obj.snapshot
        return parent.timestamp

    @staticmethod
    def resolve_snapshot_url(obj):
        """Return the ``url`` of the parent snapshot."""
        parent = obj.snapshot
        return parent.url

    @staticmethod
    def resolve_snapshot_id(obj):
        """Return the stored ``snapshot_id`` attribute without touching ``obj.snapshot``."""
        return obj.snapshot_id

    @staticmethod
    def resolve_snapshot_tags(obj):
        """Return the parent snapshot's tag names in sorted order."""
        tag_names = [tag.name for tag in obj.snapshot.tags.all()]
        tag_names.sort()
        return tag_names
class ArchiveResultFilterSchema(FilterSchema):
    # Query-string filters for the ArchiveResult list endpoint. Each field's
    # ``q`` names the ORM lookup(s) it is applied to.

    # --- id / free-text lookups spanning several columns ---
    id: Optional[str] = Field(
        None,
        q=[
            'id__startswith',
            'snapshot__id__startswith',
            'snapshot__timestamp__startswith',
        ],
    )
    search: Optional[str] = Field(
        None,
        q=[
            'snapshot__url__icontains',
            'snapshot__title__icontains',
            'snapshot__tags__name__icontains',
            'plugin',
            'output_str__icontains',
            'id__startswith',
            'snapshot__id__startswith',
            'snapshot__timestamp__startswith',
        ],
    )

    # --- lookups through the parent snapshot ---
    snapshot_id: Optional[str] = Field(
        None,
        q=['snapshot__id__startswith', 'snapshot__timestamp__startswith'],
    )
    snapshot_url: Optional[str] = Field(None, q='snapshot__url__icontains')
    snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name__icontains')

    # --- lookups on the ArchiveResult's own columns ---
    status: Optional[str] = Field(None, q='status')
    output_str: Optional[str] = Field(None, q='output_str__icontains')
    plugin: Optional[str] = Field(None, q='plugin__icontains')
    hook_name: Optional[str] = Field(None, q='hook_name__icontains')
    process_id: Optional[str] = Field(None, q='process__id__startswith')
    cmd: Optional[str] = Field(None, q='cmd__0__icontains')  # first element of cmd only
    pwd: Optional[str] = Field(None, q='pwd__icontains')
    cmd_version: Optional[str] = Field(None, q='cmd_version')

    # --- creation time: exact match or half-open range [gte, lt) ---
    created_at: Optional[datetime] = Field(None, q='created_at')
    created_at__gte: Optional[datetime] = Field(None, q='created_at__gte')
    created_at__lt: Optional[datetime] = Field(None, q='created_at__lt')
# NOTE(review): url_name here is "get_archiveresult", the same name used by the
# single-object route below, while the snapshot/tag list routes use plural
# names. Left unchanged because url_name is external interface; confirm intent.
@router.get("/archiveresults", response=List[ArchiveResultSchema], url_name="get_archiveresult")
@paginate(CustomPagination)
def get_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
    """List all ArchiveResult entries matching these filters."""
    every_result = ArchiveResult.objects.all()
    matching = filters.filter(every_result)
    # distinct() because the filters can join through snapshot tags.
    return matching.distinct()
@router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema, url_name="get_archiveresult")
def get_archiveresult(request, archiveresult_id: str):
    """Get a specific ArchiveResult by id."""
    # Case-insensitive substring match on the id; .get() raises if zero or
    # several rows match.
    return ArchiveResult.objects.get(id__icontains=archiveresult_id)
- ### Snapshot #########################################################################
class SnapshotSchema(Schema):
    # Serialized form of a Snapshot, optionally embedding its ArchiveResults.
    TYPE: str = 'core.models.Snapshot'

    # identity / bookkeeping
    id: UUID
    created_by_id: str
    created_by_username: str
    created_at: datetime
    modified_at: datetime

    # scheduling state
    status: str
    retry_at: Optional[datetime]

    # snapshot content
    bookmarked_at: datetime
    downloaded_at: Optional[datetime]
    url: str
    tags: List[str]
    title: Optional[str]
    timestamp: str
    archive_path: str

    # related results
    num_archiveresults: int
    archiveresults: List[MinimalArchiveResultSchema]

    @staticmethod
    def resolve_created_by_id(obj):
        """Return the primary key of ``obj.created_by`` rendered as a string."""
        creator = obj.created_by
        return str(creator.pk)

    @staticmethod
    def resolve_created_by_username(obj):
        """Return the username of ``obj.created_by``."""
        creator = obj.created_by
        return creator.username

    @staticmethod
    def resolve_tags(obj):
        """Return the snapshot's tag names in sorted order."""
        tag_names = [tag.name for tag in obj.tags.all()]
        tag_names.sort()
        return tag_names

    @staticmethod
    def resolve_num_archiveresults(obj, context):
        """Count the snapshot's distinct ArchiveResults."""
        related = obj.archiveresult_set.all().distinct()
        return related.count()

    @staticmethod
    def resolve_archiveresults(obj, context):
        """Return the related ArchiveResults, or an empty queryset when not requested.

        Reads the ``with_archiveresults`` flag that the view stored on the
        request object.
        """
        if not context['request'].with_archiveresults:
            return ArchiveResult.objects.none()
        return obj.archiveresult_set.all().distinct()
class SnapshotFilterSchema(FilterSchema):
    # Query-string filters for the Snapshot list endpoint. Each field's ``q``
    # names the ORM lookup(s) it is applied to.
    #
    # Every filter is optional and defaults to None, so every annotation is
    # Optional[...]. Several were previously annotated as plain ``str`` /
    # ``datetime`` while still defaulting to None.

    # --- id lookup ---
    id: Optional[str] = Field(None, q=['id__icontains', 'timestamp__startswith'])

    # --- creator, reached through the snapshot's crawl ---
    created_by_id: Optional[str] = Field(None, q='crawl__created_by_id')
    created_by_username: Optional[str] = Field(None, q='crawl__created_by__username__icontains')

    # --- creation / modification time: exact match or half-open range [gte, lt) ---
    created_at__gte: Optional[datetime] = Field(None, q='created_at__gte')
    created_at__lt: Optional[datetime] = Field(None, q='created_at__lt')
    created_at: Optional[datetime] = Field(None, q='created_at')
    modified_at: Optional[datetime] = Field(None, q='modified_at')
    modified_at__gte: Optional[datetime] = Field(None, q='modified_at__gte')
    modified_at__lt: Optional[datetime] = Field(None, q='modified_at__lt')

    # --- free-text search across several columns ---
    search: Optional[str] = Field(
        None,
        q=[
            'url__icontains',
            'title__icontains',
            'tags__name__icontains',
            'id__icontains',
            'timestamp__startswith',
        ],
    )

    # --- single-column lookups (url and tag are exact matches) ---
    url: Optional[str] = Field(None, q='url')
    tag: Optional[str] = Field(None, q='tags__name')
    title: Optional[str] = Field(None, q='title__icontains')
    timestamp: Optional[str] = Field(None, q='timestamp__startswith')

    # --- bookmark time range [gte, lt) ---
    bookmarked_at__gte: Optional[datetime] = Field(None, q='bookmarked_at__gte')
    bookmarked_at__lt: Optional[datetime] = Field(None, q='bookmarked_at__lt')
@router.get("/snapshots", response=List[SnapshotSchema], url_name="get_snapshots")
@paginate(CustomPagination)
def get_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool = False):
    """List all Snapshot entries matching these filters."""
    # Stash the flag on the request so SnapshotSchema.resolve_archiveresults
    # can read it during serialization.
    request.with_archiveresults = with_archiveresults

    every_snapshot = Snapshot.objects.all()
    matching = filters.filter(every_snapshot)
    return matching.distinct()
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema, url_name="get_snapshot")
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool = True):
    """Get a specific Snapshot by id."""
    # Stash the flag on the request so SnapshotSchema.resolve_archiveresults
    # can read it during serialization.
    request.with_archiveresults = with_archiveresults

    # First attempt: the value is a prefix of the id or of the timestamp.
    by_prefix = Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id)
    try:
        return Snapshot.objects.get(by_prefix)
    except Snapshot.DoesNotExist:
        # Second attempt: the value appears anywhere inside the id.
        return Snapshot.objects.get(id__icontains=snapshot_id)
- ### Tag #########################################################################
class TagSchema(Schema):
    # Serialized form of a Tag, optionally embedding the snapshots that carry it.
    TYPE: str = 'core.models.Tag'

    # NOTE(review): declared as UUID here, while the tag-editor schemas in this
    # file declare tag_id as int -- confirm which matches the Tag model.
    id: UUID
    modified_at: datetime
    created_at: datetime
    created_by_id: str
    created_by_username: str
    name: str
    slug: str
    num_snapshots: int
    snapshots: List[SnapshotSchema]

    @staticmethod
    def resolve_created_by_id(obj):
        """Return the creator's id as a string, or '' when the tag has no creator."""
        # The tag-creation endpoints in this file pass created_by=None for
        # unauthenticated requests; avoid emitting the literal string 'None'.
        if obj.created_by_id is None:
            return ''
        return str(obj.created_by_id)

    @staticmethod
    def resolve_created_by_username(obj):
        """Return the creator's username, or '' when there is no matching user.

        The previous ``User.objects.get(...)`` raised ``DoesNotExist`` for tags
        without a creator, which failed the whole response.
        """
        User = get_user_model()
        username = (
            User.objects
            .filter(id=obj.created_by_id)
            .values_list('username', flat=True)
            .first()
        )
        return username or ''

    @staticmethod
    def resolve_num_snapshots(obj, context):
        """Count the distinct snapshots carrying this tag."""
        return obj.snapshot_set.all().distinct().count()

    @staticmethod
    def resolve_snapshots(obj, context):
        """Return the tagged snapshots, or an empty queryset when not requested.

        Reads the ``with_snapshots`` flag that the view stored on the request
        object.
        """
        if context['request'].with_snapshots:
            return obj.snapshot_set.all().distinct()
        return Snapshot.objects.none()
@router.get("/tags", response=List[TagSchema], url_name="get_tags")
@paginate(CustomPagination)
def get_tags(request):
    # Tag listing: both embed flags are switched off on the request, so the
    # schema resolvers return empty nested snapshot / archiveresult lists.
    request.with_archiveresults = False
    request.with_snapshots = False

    every_tag = Tag.objects.all()
    return every_tag.distinct()
@router.get("/tag/{tag_id}", response=TagSchema, url_name="get_tag")
def get_tag(request, tag_id: str, with_snapshots: bool = True):
    # Flags read by the schema resolvers during serialization: snapshots are
    # embedded on request, their archiveresults never are.
    request.with_archiveresults = False
    request.with_snapshots = with_snapshots

    tags = Tag.objects
    try:
        # First attempt: the value appears inside the tag's id.
        return tags.get(id__icontains=tag_id)
    except (Tag.DoesNotExist, ValidationError):
        # Second attempt: the value appears inside the tag's slug.
        return tags.get(slug__icontains=tag_id)
@router.get("/any/{id}", response=Union[SnapshotSchema, ArchiveResultSchema, TagSchema, CrawlSchema], url_name="get_any", summary="Get any object by its ID")
def get_any(request, id: str):
    """Get any object by its ID (e.g. snapshot, archiveresult, tag, crawl, etc.)."""
    # Nothing is serialized here (a hit is answered with a redirect), but the
    # getters and schemas expect these flags to exist on the request.
    request.with_snapshots = False
    request.with_archiveresults = False

    def _get_crawl(req, object_id):
        # Imported at call time, as in the original code, so that a failing
        # import is treated like any other lookup miss below.
        from archivebox.api.v1_crawls import get_crawl
        return get_crawl(req, object_id)

    # A missing QUERY_STRING used to raise KeyError inside the blanket
    # ``except`` below, silently turning a successful lookup into a 404.
    query_string = request.META.get('QUERY_STRING', '')
    suffix = f'?{query_string}' if query_string else ''

    # Try each model type in order; the first getter that returns an object wins.
    for getter in (get_snapshot, get_archiveresult, get_tag, _get_crawl):
        try:
            found = getter(request, id)
            if found:
                meta = found._meta
                return redirect(f"/api/v1/{meta.app_label}/{meta.model_name}/{found.id}{suffix}")
        except Exception:
            # Deliberate best-effort: any failure (not found, ambiguous match,
            # malformed id, ...) just means "try the next model type".
            continue

    raise HttpError(404, 'Object with given ID not found')
- ### Tag Editor API Endpoints #########################################################################
class TagAutocompleteSchema(Schema):
    # Response of the autocomplete endpoint: one dict per matching tag.
    tags: list[dict]
class TagCreateSchema(Schema):
    # Request body of the tag-creation endpoint.
    name: str
class TagCreateResponseSchema(Schema):
    # Response of the tag-creation endpoint.
    success: bool
    tag_id: int
    tag_name: str
    created: bool  # whether a new tag was created rather than an existing one returned
class TagSnapshotRequestSchema(Schema):
    # Request body shared by the add-to-snapshot and remove-from-snapshot
    # endpoints. The tag is identified by name or by id; both are optional at
    # the schema level and checked by the endpoints.
    snapshot_id: str
    tag_name: str | None = None
    tag_id: int | None = None
class TagSnapshotResponseSchema(Schema):
    # Response shared by the add-to-snapshot and remove-from-snapshot endpoints.
    success: bool
    tag_id: int
    tag_name: str
@router.get("/tags/autocomplete/", response=TagAutocompleteSchema, url_name="tags_autocomplete")
def tags_autocomplete(request, q: str = ""):
    """Return tags matching the query for autocomplete."""
    if q:
        # Substring match on the name, capped at 20 suggestions.
        matches = Tag.objects.filter(name__icontains=q).order_by('name')[:20]
    else:
        # No query: return the first 50 tags by name.
        matches = Tag.objects.all().order_by('name')[:50]

    suggestions = []
    for tag in matches:
        suggestions.append({'id': tag.pk, 'name': tag.name, 'slug': tag.slug})
    return {'tags': suggestions}
@router.post("/tags/create/", response=TagCreateResponseSchema, url_name="tags_create")
def tags_create(request, data: TagCreateSchema):
    """Create a new tag or return existing one."""
    name = data.name.strip()
    if not name:
        raise HttpError(400, 'Tag name is required')

    # Anonymous requests create tags with no creator.
    creator = request.user if request.user.is_authenticated else None

    try:
        # Case-insensitive lookup; on a miss the tag is created from
        # ``defaults``. The returned tag is already the matched row, so the
        # extra re-query the original did after a hit is not needed.
        tag, created = Tag.objects.get_or_create(
            name__iexact=name,
            defaults={'name': name, 'created_by': creator},
        )
    except Tag.MultipleObjectsReturned:
        # Several tags differ only by case: return the first one instead of
        # failing the request.
        tag = Tag.objects.filter(name__iexact=name).first()
        created = False

    return {
        'success': True,
        'tag_id': tag.pk,
        'tag_name': tag.name,
        'created': created,
    }
@router.post("/tags/add-to-snapshot/", response=TagSnapshotResponseSchema, url_name="tags_add_to_snapshot")
def tags_add_to_snapshot(request, data: TagSnapshotRequestSchema):
    """Add a tag to a snapshot. Creates the tag if it doesn't exist."""
    # --- resolve the snapshot ---
    # An empty value would prefix-match every snapshot and tag an arbitrary
    # one, so reject it up front.
    snapshot_id = data.snapshot_id.strip()
    if not snapshot_id:
        raise HttpError(400, 'Snapshot ID is required')

    # Prefix match on id or timestamp. When several snapshots match, the first
    # one is used (same outcome as the original MultipleObjectsReturned
    # fallback, without the second query).
    snapshot = Snapshot.objects.filter(
        Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id)
    ).first()
    if snapshot is None:
        raise HttpError(404, 'Snapshot not found')

    # --- resolve the tag: tag_name takes precedence over tag_id ---
    if data.tag_name:
        name = data.tag_name.strip()
        if not name:
            raise HttpError(400, 'Tag name is required')
        creator = request.user if request.user.is_authenticated else None
        try:
            # Case-insensitive lookup; on a miss the tag is created from
            # ``defaults``. The returned tag is already the matched row.
            tag, _ = Tag.objects.get_or_create(
                name__iexact=name,
                defaults={'name': name, 'created_by': creator},
            )
        except Tag.MultipleObjectsReturned:
            # Several tags differ only by case: use the first one.
            tag = Tag.objects.filter(name__iexact=name).first()
    elif data.tag_id:
        try:
            tag = Tag.objects.get(pk=data.tag_id)
        except Tag.DoesNotExist:
            raise HttpError(404, 'Tag not found')
    else:
        raise HttpError(400, 'Either tag_name or tag_id is required')

    # --- attach the tag ---
    snapshot.tags.add(tag)
    return {
        'success': True,
        'tag_id': tag.pk,
        'tag_name': tag.name,
    }
@router.post("/tags/remove-from-snapshot/", response=TagSnapshotResponseSchema, url_name="tags_remove_from_snapshot")
def tags_remove_from_snapshot(request, data: TagSnapshotRequestSchema):
    """Remove a tag from a snapshot."""
    # --- resolve the snapshot ---
    # An empty value would prefix-match every snapshot and untag an arbitrary
    # one, so reject it up front.
    snapshot_id = data.snapshot_id.strip()
    if not snapshot_id:
        raise HttpError(400, 'Snapshot ID is required')

    # Prefix match on id or timestamp. When several snapshots match, the first
    # one is used (same outcome as the original MultipleObjectsReturned
    # fallback, without the second query).
    snapshot = Snapshot.objects.filter(
        Q(id__startswith=snapshot_id) | Q(timestamp__startswith=snapshot_id)
    ).first()
    if snapshot is None:
        raise HttpError(404, 'Snapshot not found')

    # --- resolve the tag: tag_id takes precedence over tag_name ---
    if data.tag_id:
        tag = Tag.objects.filter(pk=data.tag_id).first()
    elif data.tag_name:
        # filter().first() rather than get(): several tags differing only by
        # case would otherwise raise MultipleObjectsReturned.
        tag = Tag.objects.filter(name__iexact=data.tag_name.strip()).first()
    else:
        raise HttpError(400, 'Either tag_name or tag_id is required')
    if tag is None:
        raise HttpError(404, 'Tag not found')

    # --- detach the tag ---
    snapshot.tags.remove(tag)
    return {
        'success': True,
        'tag_id': tag.pk,
        'tag_name': tag.name,
    }
|