Explorar o código

API fixes and add actors endpoints

Nick Sweeting hai 1 ano
pai
achega
8f8fbbb7a2

+ 9 - 1
archivebox/abid_utils/models.py

@@ -13,6 +13,7 @@ from django.contrib import admin
 from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
 from django.db import models
 from django.utils import timezone
+from django.utils.functional import classproperty
 from django.db.utils import OperationalError
 from django.contrib.auth import get_user_model
 from django.urls import reverse_lazy
@@ -94,13 +95,19 @@ class ABIDModel(models.Model):
     class Meta(TypedModelMeta):
         abstract = True
 
+    @classproperty
+    def TYPE(cls) -> str:
+        """Get the full Python dotted-import path for this model, e.g. 'core.models.Snapshot'"""
+        return f'{cls.__module__}.{cls.__name__}'
+    
     @admin.display(description='Summary')
     def __str__(self) -> str:
         return f'[{self.abid or (self.abid_prefix + "NEW")}] {self.__class__.__name__} {eval(self.abid_uri_src)}'
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         """Overriden __init__ method ensures we have a stable creation timestamp that fields can use within initialization code pre-saving to DB."""
-        super().__init__(*args, **kwargs)
+        super().__init__(*args, **kwargs)   # type: ignore
+        
         # pre-compute a stable timestamp of the obj init time (with abid.ts precision limit applied) for use when object is first created,
         # some other fields depend on a timestamp at creation time, and it's nice to have one common timestamp they can all share.
         # Used as an alternative to auto_now_add=True + auto_now=True which can produce two different times & requires saving to DB to get the TS.
@@ -165,6 +172,7 @@ class ABIDModel(models.Model):
     def id_from_abid(cls, abid: str) -> str:
         return str(cls.objects.only('pk').get(abid=cls.abid_prefix + str(abid).split('_', 1)[-1]).pk)
 
+
     @property
     def ABID_SOURCES(self) -> Dict[str, str]:
         """"Get the dict of fresh ABID component values based on the live object's properties."""

+ 4 - 4
archivebox/actors/actor.py

@@ -60,9 +60,9 @@ class ActorType(Generic[ModelType]):
     Model: Type[ModelType]
     StateMachineClass: Type[StateMachine]
     
-    STATE_FIELD_NAME: ClassVar[str]
-    ACTIVE_STATE: ClassVar[ObjectState]
-    FINAL_STATES: ClassVar[ObjectStateList]
+    STATE_FIELD_NAME: ClassVar[str] = 'status'
+    ACTIVE_STATE: ClassVar[ObjectState] = 'started'
+    FINAL_STATES: ClassVar[ObjectStateList]                               # e.g. ['succeeded', 'failed', 'skipped'] or ['sealed']
     EVENT_NAME: ClassVar[str] = 'tick'                                    # the event name to trigger on the obj.sm: StateMachine (usually 'tick')
     
     CLAIM_ORDER: ClassVar[tuple[str, ...]] = ('retry_at',)                # the .order(*args) to claim the queue objects in, use ('?',) for random order
@@ -294,7 +294,7 @@ class ActorType(Generic[ModelType]):
     
     @classproperty
     def final_q(cls) -> Q:
-        """Get the filter for objects that are in a final state"""
+        """Get the filter for objects that are already completed / in a final state"""
         return Q(**{f'{cls.STATE_FIELD_NAME}__in': [cls._state_to_str(s) for s in cls.FINAL_STATES]})
     
     @classproperty

+ 2 - 3
archivebox/api/auth.py

@@ -1,15 +1,14 @@
 __package__ = 'archivebox.api'
 
-from typing import Any, Optional, cast
+from typing import Optional, cast
 from datetime import timedelta
 
 from django.http import HttpRequest
 from django.utils import timezone
-from django.contrib.auth import login
 from django.contrib.auth import authenticate
 from django.contrib.auth.models import AbstractBaseUser
 
-from ninja.security import HttpBearer, APIKeyQuery, APIKeyHeader, HttpBasicAuth, django_auth_superuser
+from ninja.security import HttpBearer, APIKeyQuery, APIKeyHeader, HttpBasicAuth
 from ninja.errors import HttpError
 
 

+ 117 - 0
archivebox/api/v1_actors.py

@@ -0,0 +1,117 @@
+__package__ = 'archivebox.api'
+
+from uuid import UUID
+from typing import List, Any
+from datetime import datetime
+
+
+from ninja import Router, Schema
+
+from .auth import API_AUTH_METHODS
+
+router = Router(tags=['Workers and Tasks'], auth=API_AUTH_METHODS)
+
+
+class TaskSchema(Schema):
+    TYPE: str
+    
+    id: UUID
+    abid: str
+    description: str
+
+    status: str
+    retry_at: datetime | None
+    
+    created_at: datetime
+    modified_at: datetime
+    created_by_id: int
+    
+    @staticmethod
+    def resolve_description(obj) -> str:
+        return str(obj)
+
+
+class ActorSchema(Schema):
+    # TYPE: str = 'actors.actor.ActorType'
+
+    # name: str
+    #pid: int | None
+    idle_count: int
+    launch_kwargs: dict[str, Any]
+    mode: str
+    
+    model: str
+    statemachine: str
+    STATE_FIELD_NAME: str
+    # ACTIVE_STATE: str
+    FINAL_STATES: list[str]
+    EVENT_NAME: str
+    CLAIM_ORDER: list[str]
+    CLAIM_FROM_TOP_N: int
+    CLAIM_ATOMIC: bool
+    MAX_TICK_TIME: int
+    MAX_CONCURRENT_ACTORS: int
+    
+    queue: list[TaskSchema]
+    past: list[TaskSchema]
+    
+    @staticmethod
+    def resolve_model(obj) -> str:
+        return obj.Model.__name__
+    
+    @staticmethod
+    def resolve_statemachine(obj) -> str:
+        return obj.StateMachineClass.__name__
+    
+    @staticmethod
+    def resolve_name(obj) -> str:
+        return str(obj)
+
+    # @staticmethod
+    # def resolve_ACTIVE_STATE(obj) -> str:
+    #     return str(obj.ACTIVE_STATE)
+    
+    @staticmethod
+    def resolve_FINAL_STATES(obj) -> list[str]:
+        return [str(state) for state in obj.FINAL_STATES]
+
+    @staticmethod
+    def resolve_queue(obj) -> list[TaskSchema]:
+        return [obj for obj in obj.qs.filter(obj.pending_q | obj.future_q | obj.active_q | obj.stalled_q).order_by('-retry_at')]
+
+    @staticmethod
+    def resolve_past(obj) -> list[TaskSchema]:
+        return [obj for obj in obj.qs.filter(obj.final_q).order_by('-modified_at')]
+
+
+class OrchestratorSchema(Schema):
+    # TYPE: str = 'actors.orchestrator.Orchestrator'
+
+    #pid: int | None
+    exit_on_idle: bool
+    mode: str
+
+    actors: list[ActorSchema]
+    
+    @staticmethod
+    def resolve_actors(obj) -> list[ActorSchema]:
+        return [actor() for actor in obj.actor_types.values()]
+
+
[email protected]("/orchestrators", response=List[OrchestratorSchema], url_name="get_orchestrators")
+def get_orchestrators(request):
+    """List all the task orchestrators (aka Orchestrators) that are currently running"""
+
+    from actors.orchestrator import Orchestrator
+    orchestrator = Orchestrator()
+
+    return [orchestrator]
+
+
[email protected]("/actors", response=List[ActorSchema], url_name="get_actors")
+def get_actors(request):
+    """List all the task consumer workers (aka Actors) that are currently running"""
+
+    from actors.orchestrator import Orchestrator
+    orchestrator = Orchestrator()
+    return orchestrator.actor_types.values()

+ 1 - 0
archivebox/api/v1_api.py

@@ -40,6 +40,7 @@ def register_urls(api: NinjaAPI) -> NinjaAPI:
     api.add_router('/auth/',     'api.v1_auth.router')
     api.add_router('/core/',     'api.v1_core.router')
     api.add_router('/cli/',      'api.v1_cli.router')
+    api.add_router('/jobs/',     'api.v1_actors.router')
     return api
 
 

+ 5 - 2
archivebox/api/v1_cli.py

@@ -1,5 +1,6 @@
 __package__ = 'archivebox.api'
 
+import json
 from typing import List, Dict, Any, Optional
 from enum import Enum
 
@@ -30,6 +31,7 @@ class CLICommandResponseSchema(Schema):
     success: bool
     errors: List[str]
     result: JSONType
+    result_format: str = 'str'
     stdout: str
     stderr: str
 
@@ -97,7 +99,7 @@ class ListCommandSchema(Schema):
     sort: str = 'bookmarked_at'
     as_json: bool = True
     as_html: bool = False
-    as_csv: str | bool = 'timestamp,url'
+    as_csv: str | None = 'timestamp,url'
     with_headers: bool = False
 
 class RemoveCommandSchema(Schema):
@@ -182,7 +184,7 @@ def cli_schedule(request, args: ScheduleCommandSchema):
 
 
 
[email protected]("/list", response=CLICommandResponseSchema, summary='archivebox list [args] [filter_patterns]')
[email protected]("/list", response=CLICommandResponseSchema, summary='archivebox list [args] [filter_patterns] (use this endpoint with ?filter_type=search to search for snapshots)')
 def cli_list(request, args: ListCommandSchema):
     result = list_all(
         filter_patterns=args.filter_patterns,
@@ -200,6 +202,7 @@ def cli_list(request, args: ListCommandSchema):
     result_format = 'txt'
     if args.as_json:
         result_format = "json"
+        result = json.loads(result)
     elif args.as_html:
         result_format = "html"
     elif args.as_csv:

+ 45 - 27
archivebox/api/v1_core.py

@@ -8,6 +8,7 @@ from datetime import datetime
 from django.db.models import Q
 from django.core.exceptions import ValidationError
 from django.contrib.auth import get_user_model
+from django.shortcuts import redirect
 
 from ninja import Router, Schema, FilterSchema, Field, Query
 from ninja.pagination import paginate, PaginationBase
@@ -66,38 +67,36 @@ class MinimalArchiveResultSchema(Schema):
     id: UUID
     abid: str
 
-    modified_at: datetime
-    created_at: datetime
+    created_at: datetime | None
+    modified_at: datetime | None
     created_by_id: str
     created_by_username: str
 
-    extractor: str
-    cmd_version: Optional[str]
-    cmd: List[str]
-    pwd: str
     status: str
-    output: str
+    retry_at: datetime | None
+    
+    extractor: str
+    cmd_version: str | None
+    cmd: list[str] | None
+    pwd: str | None
+    output: str | None
 
-    start_ts: Optional[datetime]
-    end_ts: Optional[datetime]
+    start_ts: datetime | None
+    end_ts: datetime | None
 
     @staticmethod
     def resolve_created_by_id(obj):
         return str(obj.created_by_id)
     
     @staticmethod
-    def resolve_created_by_username(obj):
+    def resolve_created_by_username(obj) -> str:
         User = get_user_model()
-        return User.objects.get(id=obj.created_by_id).username
+        return User.objects.filter(pk=obj.created_by_id).values_list('username', flat=True)[0]
 
     @staticmethod
     def resolve_abid(obj):
         return str(obj.ABID)
 
-    @staticmethod
-    def resolve_created_at(obj):
-        return obj.start_ts
-
     @staticmethod
     def resolve_snapshot_timestamp(obj):
         return obj.snapshot.timestamp
@@ -203,6 +202,9 @@ class SnapshotSchema(Schema):
     created_by_username: str
     created_at: datetime
     modified_at: datetime
+    
+    status: str
+    retry_at: datetime | None
 
     bookmarked_at: datetime
     downloaded_at: Optional[datetime]
@@ -421,6 +423,9 @@ class SeedSchema(Schema):
         User = get_user_model()
         return User.objects.get(id=obj.created_by_id).username
     
[email protected]("/seeds", response=List[SeedSchema], url_name="get_seeds")
+def get_seeds(request):
+    return Seed.objects.all().distinct()
 
 @router.get("/seed/{seed_id}", response=SeedSchema, url_name="get_seed")
 def get_seed(request, seed_id: str):
@@ -445,11 +450,12 @@ class CrawlSchema(Schema):
     created_at: datetime
     created_by_id: str
     created_by_username: str
+    
+    status: str
+    retry_at: datetime | None
 
     seed: SeedSchema
     max_depth: int
-    status: str
-    retry_at: datetime
     
     # snapshots: List[SnapshotSchema]
 
@@ -469,9 +475,14 @@ class CrawlSchema(Schema):
         return Snapshot.objects.none()
 
 
[email protected]("/crawls", response=List[CrawlSchema], url_name="get_crawls")
+def get_crawls(request):
+    return Crawl.objects.all().distinct()
+
 @router.get("/crawl/{crawl_id}", response=CrawlSchema, url_name="get_crawl")
 def get_crawl(request, crawl_id: str, with_snapshots: bool=False, with_archiveresults: bool=False):
     """Get a specific Crawl by id or abid."""
+    
     crawl = None
     request.with_snapshots = with_snapshots
     request.with_archiveresults = with_archiveresults
@@ -488,9 +499,10 @@ def get_crawl(request, crawl_id: str, with_snapshots: bool=False, with_archivere
     return crawl
 
 
-# [..., CrawlSchema]
[email protected]("/any/{abid}", response=Union[SnapshotSchema, ArchiveResultSchema, TagSchema], url_name="get_any")
[email protected]("/any/{abid}", response=Union[SnapshotSchema, ArchiveResultSchema, TagSchema, SeedSchema, CrawlSchema], url_name="get_any", summary="Get any object by its ABID or ID (e.g. snapshot, archiveresult, tag, seed, crawl, etc.)")
 def get_any(request, abid: str):
+    """Get any object by its ABID or ID (e.g. snapshot, archiveresult, tag, seed, crawl, etc.)."""
+    
     request.with_snapshots = False
     request.with_archiveresults = False
 
@@ -516,12 +528,18 @@ def get_any(request, abid: str):
     except Exception:
         pass
     
-    # try:
-    #     response = response or get_crawl(request, abid)
-    # except Exception:
-    #     pass
-
-    if not response:
-        raise HttpError(404, 'Object with given ABID not found')
+    try:
+        response = response or get_seed(request, abid)
+    except Exception:
+        pass
+    
+    try:
+        response = response or get_crawl(request, abid)
+    except Exception:
+        pass
+    
+    if response:
+        app_label, model_name = response._meta.app_label, response._meta.model_name
+        return redirect(f"/api/v1/{app_label}/{model_name}/{response.abid}?{request.META['QUERY_STRING']}")
 
-    return response
+    raise HttpError(404, 'Object with given ABID not found')

+ 9 - 2
archivebox/core/asgi.py

@@ -12,8 +12,12 @@ from archivebox.config.django import setup_django
 setup_django(in_memory_db=False, check_db=True)
 
 
+# from channels.auth import AuthMiddlewareStack
+# from channels.security.websocket import AllowedHostsOriginValidator
+from channels.routing import ProtocolTypeRouter  # , URLRouter
 from django.core.asgi import get_asgi_application
-from channels.routing import ProtocolTypeRouter
+
+# from core.routing import websocket_urlpatterns
 
 
 django_asgi_app = get_asgi_application()
@@ -21,6 +25,9 @@ django_asgi_app = get_asgi_application()
 application = ProtocolTypeRouter(
     {
         "http": django_asgi_app,
-        # Just HTTP for now. (We can add other protocols later.)
+        # only if we need websocket support later:
+        # "websocket": AllowedHostsOriginValidator(
+        #     AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
+        # ),
     }
 )

+ 6 - 0
archivebox/core/settings.py

@@ -326,6 +326,12 @@ STORAGES = {
     # },
 }
 
+CHANNEL_LAYERS = {
+    "default": {
+        "BACKEND": "channels.layers.InMemoryChannelLayer"
+    }
+}
+
 ################################################################################
 ### Security Settings
 ################################################################################

+ 2 - 2
archivebox/crawls/models.py

@@ -150,8 +150,8 @@ class Crawl(ABIDModel, ModelWithHealthStats, ModelWithStateMachine):
         parser = (self.seed and self.seed.extractor) or 'auto'
         created_at = self.created_at.strftime("%Y-%m-%d %H:%M") if self.created_at else '<no timestamp set>'
         if self.id and self.seed:
-            return f'\\[{self.ABID}] {url[:64]} ({parser}) @ {created_at} ({self.label or "Untitled Crawl"})'
-        return f'\\[{self.abid_prefix}****not*saved*yet****] {url[:64]} ({parser}) @ {created_at} ({self.label or "Untitled Crawl"})'
+            return f'[{self.ABID}] {url[:64]} ({parser}) @ {created_at} ({self.label or "Untitled Crawl"})'
+        return f'[{self.abid_prefix}****not*saved*yet****] {url[:64]} ({parser}) @ {created_at} ({self.label or "Untitled Crawl"})'
         
     @classmethod
     def from_seed(cls, seed: Seed, max_depth: int=0, persona: str='Default', tags_str: str='', config: dict|None=None, created_by: int|None=None):

+ 25 - 7
archivebox/index/json.py

@@ -20,7 +20,7 @@ from archivebox.misc.util import enforce_types
 
 
 @enforce_types
-def generate_json_index_from_links(links: List[Link], with_headers: bool):
+def generate_json_index_from_links(links: List[Link], with_headers: bool=False):
     MAIN_INDEX_HEADER = {
         'info': 'This is an index of site data archived by ArchiveBox: The self-hosted web archive.',
         'schema': 'archivebox.index.json',
@@ -33,9 +33,9 @@ def generate_json_index_from_links(links: List[Link], with_headers: bool):
             'docs': 'https://github.com/ArchiveBox/ArchiveBox/wiki',
             'source': 'https://github.com/ArchiveBox/ArchiveBox',
             'issues': 'https://github.com/ArchiveBox/ArchiveBox/issues',
-            'dependencies': dict(abx.pm.hook.get_BINARIES()),
+            'dependencies': abx.as_dict(abx.pm.hook.get_BINARIES()),
         },
-    }
+    } if with_headers else {}
     
     if with_headers:
         output = {
@@ -137,13 +137,16 @@ class ExtendedEncoder(pyjson.JSONEncoder):
     """
 
     def default(self, obj):
-        cls_name = obj.__class__.__name__
+        cls_name = type(obj).__name__
 
         if hasattr(obj, '_asdict'):
             return obj._asdict()
 
         elif isinstance(obj, bytes):
             return obj.decode()
+        
+        elif isinstance(obj, Path):
+            return str(obj)
 
         elif isinstance(obj, datetime):
             return obj.isoformat()
@@ -152,12 +155,27 @@ class ExtendedEncoder(pyjson.JSONEncoder):
             return '{}: {}'.format(obj.__class__.__name__, obj)
 
         elif cls_name in ('dict_items', 'dict_keys', 'dict_values'):
-            return tuple(obj)
+            return list(obj)
+        
+        try:
+            return dict(obj)
+        except Exception:
+            pass
+        
+        try:
+            return list(obj)
+        except Exception:
+            pass
+        
+        try:
+            return str(obj)
+        except Exception:
+            pass
 
         return pyjson.JSONEncoder.default(self, obj)
 
 
 @enforce_types
-def to_json(obj: Any, indent: Optional[int]=4, sort_keys: bool=True, cls=ExtendedEncoder) -> str:
-    return pyjson.dumps(obj, indent=indent, sort_keys=sort_keys, cls=ExtendedEncoder)
+def to_json(obj: Any, indent: Optional[int]=4, sort_keys: bool=True, cls=ExtendedEncoder, default=None) -> str:
+    return pyjson.dumps(obj, indent=indent, sort_keys=sort_keys, cls=ExtendedEncoder, default=default)
 

+ 4 - 4
archivebox/main.py

@@ -944,7 +944,7 @@ def list_all(filter_patterns_str: Optional[str]=None,
              json: bool=False,
              html: bool=False,
              with_headers: bool=False,
-             out_dir: Path=DATA_DIR) -> Iterable[Link]:
+             out_dir: Path=DATA_DIR):
     """List, filter, and export information about archive entries"""
     
     check_data_folder()
@@ -976,15 +976,15 @@ def list_all(filter_patterns_str: Optional[str]=None,
     )
 
     if json: 
-        output = generate_json_index_from_links(folders.values(), with_headers)
+        output = generate_json_index_from_links(folders.values(), with_headers=with_headers)
     elif html:
-        output = generate_index_from_links(folders.values(), with_headers)
+        output = generate_index_from_links(folders.values(), with_headers=with_headers)
     elif csv:
         output = links_to_csv(folders.values(), cols=csv.split(','), header=with_headers)
     else:
         output = printable_folders(folders, with_headers=with_headers)
     print(output)
-    return folders
+    return output
 
 
 @enforce_types