| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- """
- WARNING: THIS FILE IS ALL LEGACY CODE TO BE REMOVED.
- DO NOT ADD ANY NEW FEATURES TO THIS FILE, NEW CODE GOES HERE: core/models.py
- These are the old types we used to use before ArchiveBox v0.4 (before we switched to Django).
- """
- __package__ = 'archivebox.index'
- from pathlib import Path
- from datetime import datetime, timezone, timedelta
- from typing import List, Dict, Any, Optional, Union, ClassVar
- from pydantic import BaseModel, ConfigDict, Field, field_validator, computed_field
- from benedict import benedict
- from archivebox.config import ARCHIVE_DIR, CONSTANTS
- from archivebox.misc.util import parse_date
class ArchiveError(Exception):
    """Exception that carries an optional ``hints`` payload next to its message."""

    def __init__(self, message, hints: Any = None) -> None:
        """Store ``message`` on the exception and keep ``hints`` as an attribute.

        Args:
            message: Passed straight to ``Exception.__init__``.
            hints: Extra information attached to the error; stored unchanged
                (``None`` when not supplied).
        """
        super().__init__(message)
        self.hints = hints
# Type aliases
# LinkDict: a plain string-keyed dict with arbitrary values
# (presumably the raw/JSON form of a Link -- confirm against callers).
LinkDict = Dict[str, Any]
# ArchiveOutput: the type of ArchiveResult.output -- a string, an Exception, or None.
ArchiveOutput = Union[str, Exception, None]
class ArchiveResult(BaseModel):
    """Legacy pydantic model for a single entry of ``Link.history``.

    Each instance records the command that was run (``cmd``, ``pwd``,
    ``cmd_version``), its ``output`` and ``status``, and the start/end
    timestamps of the run.
    """

    # `output` may hold an Exception instance, which pydantic only accepts
    # when arbitrary types are allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    TYPE: str = 'index.schema.ArchiveResult'
    cmd: list[str]
    pwd: str | None = None
    cmd_version: str | None = None
    output: ArchiveOutput | None = None
    status: str
    start_ts: datetime
    end_ts: datetime
    index_texts: list[str] | None = None

    # Class variables for compatibility
    _field_names: ClassVar[list[str] | None] = None

    @field_validator('status')
    @classmethod
    def validate_status(cls, v: str) -> str:
        """Reject an empty ``status``."""
        if not v:
            raise ValueError('status must be a non-empty string')
        return v

    @field_validator('cmd')
    @classmethod
    def validate_cmd(cls, v: List[str]) -> List[str]:
        """Require every command argument to be a non-empty string.

        An empty list passes, because there is nothing to reject.
        """
        if not all(isinstance(arg, str) and arg for arg in v):
            raise ValueError('all command arguments must be non-empty strings')
        return v

    @field_validator('pwd')
    @classmethod
    def validate_pwd(cls, v: Optional[str]) -> Optional[str]:
        """Normalise an empty ``pwd`` to ``None`` for consistency."""
        return None if v == '' else v

    @field_validator('cmd_version')
    @classmethod
    def validate_cmd_version(cls, v: Optional[str]) -> Optional[str]:
        """Normalise an empty ``cmd_version`` to ``None`` for consistency."""
        return None if v == '' else v

    def model_dump(self, **kwargs) -> dict:
        """Backwards compatible with _asdict()"""
        return super().model_dump(**kwargs)

    @classmethod
    def field_names(cls) -> List[str]:
        """Return the names of the declared model fields.

        The list is cached on the class it was computed for (looked up in
        ``cls.__dict__`` so a subclass never reuses its parent's cache), and
        a copy is returned so callers cannot mutate the cached list.
        Computed fields such as ``duration`` are not part of ``model_fields``
        and therefore are not included.
        """
        cached = cls.__dict__.get('_field_names')
        if cached is None:
            cached = list(cls.model_fields.keys())
            cls._field_names = cached
        return list(cached)

    @classmethod
    def guess_ts(cls, dict_info: dict) -> tuple[datetime, datetime]:
        """Derive ``(start_ts, end_ts)`` from ``timestamp`` and ``duration``.

        The start is ``parse_date(dict_info["timestamp"])`` and the end is the
        start plus ``int(dict_info["duration"])`` seconds.

        Raises:
            KeyError: if ``timestamp`` or ``duration`` is missing.
        """
        start_ts = parse_date(dict_info["timestamp"])
        end_ts = start_ts + timedelta(seconds=int(dict_info["duration"]))
        return start_ts, end_ts

    @classmethod
    def from_json(cls, json_info: dict, guess: bool = False) -> 'ArchiveResult':
        """Build an instance from a JSON-style dict.

        Keys that are not model fields are dropped before construction.

        Args:
            json_info: Source mapping; it is not modified.
            guess: When true, fill in values missing from ``json_info``:
                timestamps come from :meth:`guess_ts` if ``start_ts`` is
                absent, ``pwd`` defaults to ``ARCHIVE_DIR / timestamp``,
                ``cmd_version`` to ``"Undefined"`` and ``cmd`` to ``[]``.

        Raises:
            KeyError: if a key required by the chosen mode is missing
                (``start_ts``/``end_ts``, or ``timestamp`` when guessing).
        """
        # Build the lookup set once instead of calling field_names() per key.
        known_fields = set(cls.field_names())
        info = {key: val for key, val in json_info.items() if key in known_fields}

        if guess and "start_ts" not in info:
            info["start_ts"], info["end_ts"] = cls.guess_ts(json_info)
        else:
            info['start_ts'] = parse_date(info['start_ts'])
            info['end_ts'] = parse_date(info['end_ts'])

        if guess:
            # Not setdefault(): json_info["timestamp"] must only be read when
            # pwd is actually missing.
            if "pwd" not in info:
                info["pwd"] = str(ARCHIVE_DIR / json_info["timestamp"])
            info.setdefault("cmd_version", "Undefined")
            info.setdefault("cmd", [])
        else:
            info['cmd_version'] = info.get('cmd_version')

        # Handle string command as list
        if isinstance(info.get("cmd"), str):
            info["cmd"] = [info["cmd"]]

        return cls(**info)

    def to_dict(self, *keys: str) -> dict:
        """Return ``model_dump()``, restricted to ``keys`` when any are given."""
        data = self.model_dump()
        if not keys:
            return data
        return {k: v for k, v in data.items() if k in keys}

    def to_json(self, indent: int = 4, sort_keys: bool = True) -> str:
        """Serialise to a JSON string, omitting ``None`` values.

        NOTE: ``sort_keys`` is accepted for signature compatibility but is not
        applied; keys are emitted in the order pydantic produces them.
        """
        return self.model_dump_json(indent=indent, exclude_none=True)

    def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
        """Join the selected columns into one delimited line.

        Args:
            cols: Column names to emit; defaults to :meth:`field_names`.
                Names missing from the dump are rendered as ``''``.
            separator: String placed between columns.
            ljust: Minimum width each value is left-justified to.
        """
        data = self.model_dump()
        columns = cols or self.field_names()
        return separator.join(str(data.get(col, '')).ljust(ljust) for col in columns)

    @computed_field
    @property
    def duration(self) -> int:
        """Whole seconds elapsed between ``start_ts`` and ``end_ts``."""
        return int((self.end_ts - self.start_ts).total_seconds())
-
-
class Link(BaseModel):
    """Legacy pydantic model describing one link and its archive history.

    Equality is decided by ``url`` alone and ordering by the numeric value of
    ``timestamp``; neither looks at the remaining fields.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    TYPE: str = 'index.schema.Link'
    timestamp: str
    url: str
    title: str | None = None
    tags: str | None = None
    sources: list[str] = Field(default_factory=list)
    history: dict[str, list[ArchiveResult]] = Field(default_factory=dict)
    downloaded_at: datetime | None = None

    # Class variables for compatibility
    _field_names: ClassVar[list[str] | None] = None

    def __str__(self) -> str:
        return f'[{self.timestamp}] {self.url} "{self.title}"'

    def __eq__(self, other: Any) -> bool:
        """Two links are equal when their ``url`` values are equal."""
        if not isinstance(other, Link):
            return NotImplemented
        return self.url == other.url

    def __gt__(self, other: Any) -> bool:
        """Order links by ``float(timestamp)``."""
        if not isinstance(other, Link):
            return NotImplemented
        if not self.timestamp or not other.timestamp:
            return NotImplemented
        return float(self.timestamp) > float(other.timestamp)

    @field_validator('timestamp')
    @classmethod
    def validate_timestamp(cls, v: str) -> str:
        """Require ``timestamp`` to be a non-empty string that ``float()`` can parse."""
        if not v:
            raise ValueError('timestamp must be a non-empty string')
        if not v.replace('.', '').isdigit():
            raise ValueError('timestamp must be a float str')
        # The digit check alone lets values such as "1.2.3" through, which
        # would later crash float() in __gt__ and bookmarked_date.
        try:
            float(v)
        except ValueError:
            raise ValueError('timestamp must be a float str') from None
        return v

    @field_validator('url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Require ``url`` to be non-empty and contain ``://``."""
        if not v or '://' not in v:
            raise ValueError('url must be a valid URL string')
        return v

    @field_validator('title')
    @classmethod
    def validate_title(cls, v: Optional[str]) -> Optional[str]:
        """Allow ``None`` but reject an empty title."""
        if v is not None and not v:
            raise ValueError('title must be a non-empty string if provided')
        return v

    @field_validator('sources')
    @classmethod
    def validate_sources(cls, v: List[str]) -> List[str]:
        """Require every source to be a non-empty string."""
        if not all(isinstance(source, str) and source for source in v):
            raise ValueError('all sources must be non-empty strings')
        return v

    # Backwards compatibility methods
    def _asdict(self, extended: bool = False) -> dict:
        """Return this link wrapped in a ``benedict``.

        NOTE: ``extended`` is accepted for signature compatibility but is not
        used by this implementation.
        """
        return benedict(self)

    def overwrite(self, **kwargs) -> 'Link':
        """Pure functional version of dict.update that returns a new instance"""
        current_data = self.model_dump()
        current_data.update(kwargs)
        return Link(**current_data)

    @classmethod
    def field_names(cls) -> list[str]:
        """Return the names of the declared model fields.

        The list is cached on the class it was computed for (looked up in
        ``cls.__dict__`` so a subclass never reuses its parent's cache), and
        a copy is returned so callers cannot mutate the cached list.
        """
        cached = cls.__dict__.get('_field_names')
        if cached is None:
            cached = list(cls.model_fields.keys())
            cls._field_names = cached
        return list(cached)

    @classmethod
    def from_json(cls, json_info: dict, guess: bool = False) -> 'Link':
        """Build a Link from a JSON-style dict.

        Keys that are not model fields are dropped, except the legacy
        ``updated`` key, which is used as the source of ``downloaded_at`` when
        present. Every item under ``history`` is converted with
        ``ArchiveResult.from_json(item, guess)``.

        Args:
            json_info: Source mapping; it is not modified.
            guess: Forwarded to ``ArchiveResult.from_json``.
        """
        known_fields = set(cls.field_names())
        info = {key: val for key, val in json_info.items() if key in known_fields}

        # Handle downloaded_at.
        # 'updated' is not a model field, so it must be read from json_info:
        # it never survives the filtering above.
        info['downloaded_at'] = cls._parse_date(
            json_info.get('updated') or info.get('downloaded_at')
        )
        info['sources'] = info.get('sources') or []

        # Handle history
        cast_history = {}
        for method, method_history in (info.get('history') or {}).items():
            results = []
            for json_result in method_history:
                assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts'
                results.append(ArchiveResult.from_json(json_result, guess))
            cast_history[method] = results
        info['history'] = cast_history

        return cls(**info)

    def to_json(self, indent: int = 4, sort_keys: bool = True) -> str:
        """Serialise to a JSON string.

        NOTE: ``sort_keys`` is accepted for signature compatibility but is not
        applied.
        """
        return self.model_dump_json(indent=indent)

    def to_csv(self, cols: Optional[List[str]] = None, separator: str = ',', ljust: int = 0) -> str:
        """Join the selected columns into one delimited line.

        Args:
            cols: Column names to emit; defaults to :meth:`field_names`.
                Names missing from the dump are rendered as ``''``.
            separator: String placed between columns.
            ljust: Minimum width each value is left-justified to.
        """
        data = self.model_dump()
        columns = cols or self.field_names()
        return separator.join(str(data.get(col, '')).ljust(ljust) for col in columns)

    # Properties for compatibility
    @property
    def link_dir(self) -> str:
        """``ARCHIVE_DIR / timestamp`` as a string."""
        return str(ARCHIVE_DIR / self.timestamp)

    @property
    def archive_path(self) -> str:
        """``<ARCHIVE_DIR_NAME>/<timestamp>``."""
        return f'{CONSTANTS.ARCHIVE_DIR_NAME}/{self.timestamp}'

    @computed_field
    @property
    def bookmarked_date(self) -> Optional[str]:
        """Render ``timestamp`` as a ``YYYY-MM-DD HH:MM:SS`` string.

        Returns ``None`` when the timestamp is not numeric, and the raw
        timestamp string when it falls outside ``(0, now + 30 days)``.
        """
        ts = self.timestamp
        if not (ts and ts.replace('.', '').isdigit()):
            return None
        max_ts = (datetime.now(timezone.utc) + timedelta(days=30)).timestamp()
        if 0 < float(ts) < max_ts:
            return self._ts_to_date_str(datetime.fromtimestamp(float(ts)))
        return str(ts)

    @computed_field
    @property
    def downloaded_datestr(self) -> Optional[str]:
        """``downloaded_at`` formatted as a date string, or ``None`` if unset."""
        return self._ts_to_date_str(self.downloaded_at) if self.downloaded_at else None

    @property
    def archive_dates(self) -> list[datetime]:
        """Start times of every result in ``history``.

        ``start_ts`` values are normally already ``datetime`` objects; they
        are passed through ``_parse_date`` so string values are tolerated,
        and anything that cannot be parsed is skipped.
        """
        parsed = (
            self._parse_date(result.start_ts)
            for results in self.history.values()
            for result in results
        )
        return [dt for dt in parsed if dt is not None]

    @property
    def oldest_archive_date(self) -> Optional[datetime]:
        """Earliest entry of ``archive_dates``, or ``None`` when there are none."""
        dates = self.archive_dates
        return min(dates) if dates else None

    @property
    def newest_archive_date(self) -> Optional[datetime]:
        """Latest entry of ``archive_dates``, or ``None`` when there are none."""
        dates = self.archive_dates
        return max(dates) if dates else None

    @property
    def num_outputs(self) -> int:
        """``num_outputs`` of the matching Snapshot, or 0 if the lookup fails."""
        try:
            return self.as_snapshot().num_outputs
        except Exception:
            # Best-effort: any failure of the lookup is reported as zero outputs.
            return 0

    @property
    def num_failures(self) -> int:
        """Number of history results whose status is ``'failed'``."""
        return sum(
            1
            for results in self.history.values()
            for result in results
            if result.status == 'failed'
        )

    def latest_outputs(self, status: Optional[str] = None) -> dict[str, Any]:
        """Get the latest output that each archive method produced for link

        Args:
            status: When given, only results with exactly this status count.

        Returns:
            A dict with one key per archive method; the value is the output of
            the last history entry that has a truthy output (and matches
            ``status``), or ``None`` if there is no such entry.
        """
        ARCHIVE_METHODS = (
            'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf',
            'screenshot', 'dom', 'git', 'media', 'archive_org',
        )
        latest: Dict[str, Any] = {}
        for archive_method in ARCHIVE_METHODS:
            # walk the history newest-first and stop at the first usable result
            candidates = (
                result
                for result in reversed(self.history.get(archive_method) or [])
                if result.output and (status is None or result.status == status)
            )
            newest = next(candidates, None)
            latest[archive_method] = newest.output if newest is not None else None
        return latest

    def canonical_outputs(self) -> Dict[str, Optional[str]]:
        """Predict the expected output paths that should be present after archiving"""
        # TODO: banish this awful duplication from the codebase and import these
        # from their respective extractor files
        from abx_plugin_favicon.config import FAVICON_CONFIG

        canonical = {
            'index_path': 'index.html',
            'favicon_path': 'favicon.ico',
            'google_favicon_path': FAVICON_CONFIG.FAVICON_PROVIDER.format(self.domain),
            'wget_path': f'warc/{self.timestamp}',
            'warc_path': 'warc/',
            'singlefile_path': 'singlefile.html',
            'readability_path': 'readability/content.html',
            'mercury_path': 'mercury/content.html',
            'htmltotext_path': 'htmltotext.txt',
            'pdf_path': 'output.pdf',
            'screenshot_path': 'screenshot.png',
            'dom_path': 'output.html',
            # NOTE(review): base_url is scheme://domain only, so this points at
            # the site root rather than the archived page -- confirm intended.
            'archive_org_path': f'https://web.archive.org/web/{self.base_url}',
            'git_path': 'git/',
            'media_path': 'media/',
            'headers_path': 'headers.json',
        }

        if self.is_static:
            # Static files share one path for every content-derived output.
            static_path = f'warc/{self.timestamp}'
            canonical.update({
                'title': self.basename,
                'wget_path': static_path,
                'pdf_path': static_path,
                'screenshot_path': static_path,
                'dom_path': static_path,
                'singlefile_path': static_path,
                'readability_path': static_path,
                'mercury_path': static_path,
                'htmltotext_path': static_path,
            })
        return canonical

    # URL helper properties
    @property
    def url_hash(self) -> str:
        """First 8 hex characters of the SHA-256 digest of ``url``."""
        from hashlib import sha256
        return sha256(self.url.encode()).hexdigest()[:8]

    @property
    def scheme(self) -> str:
        """Everything before the first ``://`` in ``url``."""
        return self.url.split('://')[0]

    @property
    def domain(self) -> str:
        """The part of ``url`` between ``://`` and the next ``/``."""
        return self.url.split('://')[1].split('/')[0]

    @property
    def path(self) -> str:
        """Everything from the first ``/`` after ``://`` onwards, or ``'/'``."""
        parts = self.url.split('://', 1)
        if len(parts) > 1 and '/' in parts[1]:
            return '/' + parts[1].split('/', 1)[1]
        return '/'

    @property
    def basename(self) -> str:
        """Last ``/``-separated segment of ``path``."""
        return self.path.split('/')[-1]

    @property
    def extension(self) -> str:
        """Text after the last ``.`` in ``basename``, or ``''`` if there is none."""
        basename = self.basename
        return basename.split('.')[-1] if '.' in basename else ''

    @property
    def base_url(self) -> str:
        """``scheme://domain`` with no path."""
        return f'{self.scheme}://{self.domain}'

    @property
    def is_static(self) -> bool:
        """True when the lower-cased ``url`` ends with a known static-file extension."""
        static_extensions = (
            '.pdf', '.jpg', '.jpeg', '.png', '.gif', '.webp',
            '.svg', '.mp4', '.mp3', '.wav', '.webm',
        )
        return self.url.lower().endswith(static_extensions)

    @property
    def is_archived(self) -> bool:
        """True when any known output path exists under ``ARCHIVE_DIR / timestamp``."""
        output_paths = (
            self.domain,
            'output.html',
            'output.pdf',
            'screenshot.png',
            'singlefile.html',
            'readability/content.html',
            'mercury/content.html',
            'htmltotext.txt',
            'media',
            'git',
        )
        link_dir = Path(ARCHIVE_DIR) / self.timestamp
        return any((link_dir / path).exists() for path in output_paths)

    def as_snapshot(self):
        """Fetch the ``core.models.Snapshot`` whose ``url`` equals this link's ``url``."""
        from core.models import Snapshot
        return Snapshot.objects.get(url=self.url)

    # Helper methods
    @staticmethod
    def _ts_to_date_str(dt: Optional[datetime]) -> Optional[str]:
        """Format ``dt`` as ``YYYY-MM-DD HH:MM:SS``; ``None`` stays ``None``."""
        return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else None

    @staticmethod
    def _parse_date(date_str: Union[str, datetime, None]) -> Optional[datetime]:
        """Convert a date value to ``datetime``, returning ``None`` on failure.

        Accepts an ISO-8601 string (a ``Z`` suffix is read as ``+00:00``), a
        numeric timestamp (string or number), or an existing ``datetime``,
        which is returned unchanged.
        """
        if not date_str:
            return None
        if isinstance(date_str, datetime):
            # Already parsed (e.g. ArchiveResult.start_ts); calling
            # str-style .replace() on it would raise TypeError.
            return date_str
        try:
            return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except (ValueError, AttributeError, TypeError):
            pass
        try:
            return datetime.fromtimestamp(float(date_str))
        except (ValueError, TypeError, OverflowError, OSError):
            return None
|