
add proper typechecked json parsing and dumping

Nick Sweeting, 6 years ago
commit 73f46b0b29
3 changed files with 75 additions and 22 deletions
  1. archivebox/index.py (+1, -7)
  2. archivebox/schema.py (+72, -13)
  3. archivebox/util.py (+2, -2)

+ 1 - 7
archivebox/index.py

@@ -121,18 +121,12 @@ def write_json_links_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
 def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
     """parse a archive index json file and return the list of links"""
 
-    allowed_fields = {f.name for f in fields(Link)}
-
     index_path = os.path.join(out_dir, 'index.json')
     if os.path.exists(index_path):
         with open(index_path, 'r', encoding='utf-8') as f:
             links = json.load(f)['links']
             for link_json in links:
-                yield Link(**{
-                    key: val
-                    for key, val in link_json.items()
-                    if key in allowed_fields
-                })
+                yield Link.from_json(link_json)
 
     return ()
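
The index parser now delegates all field filtering and type coercion to Link.from_json, so every Link it yields is already typechecked. A minimal usage sketch (not part of the commit; the import path and output directory below are assumptions):

    # Iterate the typed Links returned by the new parser.
    from archivebox.index import parse_json_links_index

    for link in parse_json_links_index(out_dir='/data/archivebox'):
        # each item is a fully typechecked Link: updated is a datetime and
        # history entries are ArchiveResult instances, not raw dicts
        print(link.timestamp, link.url, len(link.history.get('wget', [])))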
 

+ 72 - 13
archivebox/schema.py

@@ -4,7 +4,7 @@ from datetime import datetime
 
 from typing import List, Dict, Any, Optional, Union
 
-from dataclasses import dataclass, asdict, field
+from dataclasses import dataclass, asdict, field, fields
 
 
 class ArchiveError(Exception):
@@ -28,11 +28,38 @@ class ArchiveResult:
     schema: str = 'ArchiveResult'
 
     def __post_init__(self):
-        assert self.schema == self.__class__.__name__
+        self.typecheck()
 
     def _asdict(self):
         return asdict(self)
 
+    def typecheck(self) -> None:
+        assert self.schema == self.__class__.__name__
+        assert isinstance(self.status, str) and self.status
+        assert isinstance(self.start_ts, datetime)
+        assert isinstance(self.end_ts, datetime)
+        assert isinstance(self.cmd, list)
+        assert all(isinstance(arg, str) and arg for arg in self.cmd)
+        assert self.pwd is None or isinstance(self.pwd, str) and self.pwd
+        assert self.cmd_version is None or isinstance(self.cmd_version, str) and self.cmd_version
+        assert self.output is None or isinstance(self.output, (str, Exception))
+        if isinstance(self.output, str):
+            assert self.output
+
+    @classmethod
+    def from_json(cls, json_info):
+        from .util import parse_date
+
+        allowed_fields = {f.name for f in fields(cls)}
+        info = {
+            key: val
+            for key, val in json_info.items()
+            if key in allowed_fields
+        }
+        info['start_ts'] = parse_date(info['start_ts'])
+        info['end_ts'] = parse_date(info['end_ts'])
+        return cls(**info)
+
     @property
     def duration(self) -> int:
         return (self.end_ts - self.start_ts).seconds
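
An illustrative sketch of the new ArchiveResult.from_json (not from the commit): the field values are invented, the timestamp format is a guess at what util.parse_date accepts, and unknown keys are silently dropped by the allowed_fields filter above.

    from archivebox.schema import ArchiveResult

    json_result = {
        'schema': 'ArchiveResult',
        'cmd': ['wget', '-p', 'https://example.com'],
        'pwd': '/data/archivebox/archive/1556051312',
        'cmd_version': 'GNU Wget 1.20.1',
        'output': 'example.com/index.html',
        'status': 'succeeded',
        'start_ts': '2019-04-23 21:48:32.000000',
        'end_ts': '2019-04-23 21:48:35.000000',
        'some_unknown_key': 'ignored by from_json',
    }
    result = ArchiveResult.from_json(json_result)   # *_ts parsed into datetimes
    assert result.duration >= 0                     # typecheck() ran in __post_init__
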
@@ -49,17 +76,7 @@ class Link:
     schema: str = 'Link'
 
     def __post_init__(self):
-        """fix any history result items to be type-checked ArchiveResults"""
-        assert self.schema == self.__class__.__name__
-        cast_history = {}
-        for method, method_history in self.history.items():
-            cast_history[method] = []
-            for result in method_history:
-                if isinstance(result, dict):
-                    result = ArchiveResult(**result)
-                cast_history[method].append(result)
-
-        object.__setattr__(self, 'history', cast_history)
+        self.typecheck()
 
     def overwrite(self, **kwargs):
         """pure functional version of dict.update that returns a new instance"""
@@ -76,6 +93,22 @@ class Link:
         if not self.timestamp or not other.timestamp:
             return 
         return float(self.timestamp) > float(other.timestamp)
+
+    def typecheck(self) -> None:
+        assert self.schema == self.__class__.__name__
+        assert isinstance(self.timestamp, str) and self.timestamp
+        assert self.timestamp.replace('.', '').isdigit()
+        assert isinstance(self.url, str) and '://' in self.url
+        assert self.updated is None or isinstance(self.updated, datetime)
+        assert self.title is None or isinstance(self.title, str) and self.title
+        assert self.tags is None or isinstance(self.tags, str) and self.tags
+        assert isinstance(self.sources, list)
+        assert all(isinstance(source, str) and source for source in self.sources)
+        assert isinstance(self.history, dict)
+        for method, results in self.history.items():
+            assert isinstance(method, str) and method
+            assert isinstance(results, list)
+            assert all(isinstance(result, ArchiveResult) for result in results)
     
     def _asdict(self, extended=False):
         info = {
@@ -108,6 +141,32 @@ class Link:
             })
         return info
 
+    @classmethod
+    def from_json(cls, json_info):
+        from .util import parse_date
+        
+        allowed_fields = {f.name for f in fields(cls)}
+        info = {
+            key: val
+            for key, val in json_info.items()
+            if key in allowed_fields
+        }
+        info['updated'] = parse_date(info['updated'])
+
+        json_history = info['history']
+        cast_history = {}
+
+        for method, method_history in json_history.items():
+            cast_history[method] = []
+            for json_result in method_history:
+                assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
+                cast_result = ArchiveResult.from_json(json_result)
+                cast_history[method].append(cast_result)
+
+        info['history'] = cast_history
+        return cls(**info)
+
+
     @property
     def link_dir(self) -> str:
         from .config import ARCHIVE_DIR
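
Link.from_json mirrors the ArchiveResult version, with one extra step: every dict inside history is cast through ArchiveResult.from_json before the Link is constructed. An illustrative sketch (not part of the commit; all values are invented and the import path is assumed):

    from archivebox.schema import ArchiveResult, Link

    json_link = {
        'schema': 'Link',
        'url': 'https://example.com',
        'timestamp': '1556051312.0',
        'title': 'Example Domain',
        'tags': None,
        'sources': ['cli'],
        'updated': '2019-04-23 21:48:35.000000',
        'history': {
            'wget': [{
                'schema': 'ArchiveResult',
                'cmd': ['wget', '-p', 'https://example.com'],
                'pwd': '/data/archivebox/archive/1556051312',
                'cmd_version': 'GNU Wget 1.20.1',
                'output': 'example.com/index.html',
                'status': 'succeeded',
                'start_ts': '2019-04-23 21:48:32.000000',
                'end_ts': '2019-04-23 21:48:35.000000',
            }],
        },
    }
    link = Link.from_json(json_link)
    assert isinstance(link.history['wget'][0], ArchiveResult)  # nested dicts were cast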

+ 2 - 2
archivebox/util.py

@@ -675,8 +675,8 @@ class ExtendedEncoder(JSONEncoder):
         return JSONEncoder.default(self, obj)
 
 
-def atomic_write(contents: Union[dict, str], path: str):
-    """Safe atomic file write and swap using a tmp file"""
+def atomic_write(contents: Union[dict, str], path: str) -> None:
+    """Safe atomic write to filesystem by writing to temp file + atomic rename"""
     try:
         tmp_file = '{}.tmp'.format(path)
         with open(tmp_file, 'w+', encoding='utf-8') as f:
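
The hunk above only shows the start of atomic_write; below is a sketch of the general write-to-temp-then-rename pattern its new docstring describes. The body past the lines shown here is an assumption, not the project's actual implementation (the real function presumably dumps dicts with ExtendedEncoder).

    import json
    import os
    from typing import Union

    def atomic_write_sketch(contents: Union[dict, str], path: str) -> None:
        """Safe atomic write to filesystem by writing to temp file + atomic rename"""
        tmp_file = '{}.tmp'.format(path)
        try:
            with open(tmp_file, 'w+', encoding='utf-8') as f:
                if isinstance(contents, dict):
                    json.dump(contents, f, indent=4, default=str)  # stand-in for ExtendedEncoder
                else:
                    f.write(contents)
                f.flush()
                os.fsync(f.fileno())       # make sure the bytes reach the disk
            os.rename(tmp_file, path)      # atomic swap on POSIX filesystems
        finally:
            if os.path.exists(tmp_file):   # clean up if the write or rename failed
                os.remove(tmp_file)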