|
@@ -1,561 +0,0 @@
|
|
|
-__package__ = 'archivebox.plugantic'
|
|
|
|
|
-
|
|
|
|
|
-import os
|
|
|
|
|
-import shutil
|
|
|
|
|
-import operator
|
|
|
|
|
-
|
|
|
|
|
-from typing import Callable, Any, Optional, Type, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING
|
|
|
|
|
-from typing_extensions import Self
|
|
|
|
|
-from abc import ABC, abstractmethod
|
|
|
|
|
-from collections import namedtuple
|
|
|
|
|
-from pathlib import Path
|
|
|
|
|
-from subprocess import run, PIPE
|
|
|
|
|
-
|
|
|
|
|
-from pydantic_core import core_schema, ValidationError
|
|
|
|
|
-from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, validate_call, GetCoreSchemaHandler
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def func_takes_args_or_kwargs(lambda_func: Callable[..., Any]) -> bool:
|
|
|
|
|
- """returns True if a lambda func takes args/kwargs of any kind, otherwise false if it's pure/argless"""
|
|
|
|
|
- code = lambda_func.__code__
|
|
|
|
|
- has_args = code.co_argcount > 0
|
|
|
|
|
- has_varargs = code.co_flags & 0x04 != 0
|
|
|
|
|
- has_varkw = code.co_flags & 0x08 != 0
|
|
|
|
|
- return has_args or has_varargs or has_varkw
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def is_semver_str(semver: Any) -> bool:
|
|
|
|
|
- if isinstance(semver, str):
|
|
|
|
|
- return (semver.count('.') == 2 and semver.replace('.', '').isdigit())
|
|
|
|
|
- return False
|
|
|
|
|
-
|
|
|
|
|
-def semver_to_str(semver: tuple[int, int, int] | str) -> str:
|
|
|
|
|
- if isinstance(semver, (list, tuple)):
|
|
|
|
|
- return '.'.join(str(chunk) for chunk in semver)
|
|
|
|
|
- if is_semver_str(semver):
|
|
|
|
|
- return semver
|
|
|
|
|
- raise ValidationError('Tried to convert invalid SemVer: {}'.format(semver))
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-SemVerTuple = namedtuple('SemVerTuple', ('major', 'minor', 'patch'), defaults=(0, 0, 0))
|
|
|
|
|
-SemVerParsableTypes = str | tuple[str | int, ...] | list[str | int]
|
|
|
|
|
-
|
|
|
|
|
-class SemVer(SemVerTuple):
|
|
|
|
|
- major: int
|
|
|
|
|
- minor: int = 0
|
|
|
|
|
- patch: int = 0
|
|
|
|
|
-
|
|
|
|
|
- if TYPE_CHECKING:
|
|
|
|
|
- full_text: str | None = ''
|
|
|
|
|
-
|
|
|
|
|
- def __new__(cls, *args, full_text=None, **kwargs):
|
|
|
|
|
- # '1.1.1'
|
|
|
|
|
- if len(args) == 1 and is_semver_str(args[0]):
|
|
|
|
|
- result = SemVer.parse(args[0])
|
|
|
|
|
-
|
|
|
|
|
- # ('1', '2', '3')
|
|
|
|
|
- elif len(args) == 1 and isinstance(args[0], (tuple, list)):
|
|
|
|
|
- result = SemVer.parse(args[0])
|
|
|
|
|
-
|
|
|
|
|
- # (1, '2', None)
|
|
|
|
|
- elif not all(isinstance(arg, (int, type(None))) for arg in args):
|
|
|
|
|
- result = SemVer.parse(args)
|
|
|
|
|
-
|
|
|
|
|
- # (None)
|
|
|
|
|
- elif all(chunk in ('', 0, None) for chunk in (*args, *kwargs.values())):
|
|
|
|
|
- result = None
|
|
|
|
|
-
|
|
|
|
|
- # 1, 2, 3
|
|
|
|
|
- else:
|
|
|
|
|
- result = SemVerTuple.__new__(cls, *args, **kwargs)
|
|
|
|
|
-
|
|
|
|
|
- if result is not None:
|
|
|
|
|
- # add first line as extra hidden metadata so it can be logged without having to re-run version cmd
|
|
|
|
|
- result.full_text = full_text or str(result)
|
|
|
|
|
- return result
|
|
|
|
|
-
|
|
|
|
|
- @classmethod
|
|
|
|
|
- def parse(cls, version_stdout: SemVerParsableTypes) -> Self | None:
|
|
|
|
|
- """
|
|
|
|
|
- parses a version tag string formatted like into (major, minor, patch) ints
|
|
|
|
|
- 'Google Chrome 124.0.6367.208' -> (124, 0, 6367)
|
|
|
|
|
- 'GNU Wget 1.24.5 built on darwin23.2.0.' -> (1, 24, 5)
|
|
|
|
|
- 'curl 8.4.0 (x86_64-apple-darwin23.0) ...' -> (8, 4, 0)
|
|
|
|
|
- '2024.04.09' -> (2024, 4, 9)
|
|
|
|
|
-
|
|
|
|
|
- """
|
|
|
|
|
- # print('INITIAL_VALUE', type(version_stdout).__name__, version_stdout)
|
|
|
|
|
-
|
|
|
|
|
- if isinstance(version_stdout, (tuple, list)):
|
|
|
|
|
- version_stdout = '.'.join(str(chunk) for chunk in version_stdout)
|
|
|
|
|
- elif isinstance(version_stdout, bytes):
|
|
|
|
|
- version_stdout = version_stdout.decode()
|
|
|
|
|
- elif not isinstance(version_stdout, str):
|
|
|
|
|
- version_stdout = str(version_stdout)
|
|
|
|
|
-
|
|
|
|
|
- # no text to work with, return None immediately
|
|
|
|
|
- if not version_stdout.strip():
|
|
|
|
|
- # raise Exception('Tried to parse semver from empty version output (is binary installed and available?)')
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- just_numbers = lambda col: col.lower().strip('v').split('+')[0].split('-')[0].split('_')[0]
|
|
|
|
|
- contains_semver = lambda col: (
|
|
|
|
|
- col.count('.') in (1, 2, 3)
|
|
|
|
|
- and all(chunk.isdigit() for chunk in col.split('.')[:3]) # first 3 chunks can only be nums
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- full_text = version_stdout.split('\n')[0].strip()
|
|
|
|
|
- first_line_columns = full_text.split()[:4]
|
|
|
|
|
- version_columns = list(filter(contains_semver, map(just_numbers, first_line_columns)))
|
|
|
|
|
-
|
|
|
|
|
- # could not find any column of first line that looks like a version number, despite there being some text
|
|
|
|
|
- if not version_columns:
|
|
|
|
|
- # raise Exception('Failed to parse semver from version command output: {}'.format(' '.join(first_line_columns)))
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- # take first col containing a semver, and truncate it to 3 chunks (e.g. 2024.04.09.91) -> (2024, 04, 09)
|
|
|
|
|
- first_version_tuple = version_columns[0].split('.', 3)[:3]
|
|
|
|
|
-
|
|
|
|
|
- # print('FINAL_VALUE', first_version_tuple)
|
|
|
|
|
-
|
|
|
|
|
- return cls(*(int(chunk) for chunk in first_version_tuple), full_text=full_text)
|
|
|
|
|
-
|
|
|
|
|
- def __str__(self):
|
|
|
|
|
- return '.'.join(str(chunk) for chunk in self)
|
|
|
|
|
-
|
|
|
|
|
- # @classmethod
|
|
|
|
|
- # def __get_pydantic_core_schema__(cls, source: Type[Any], handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
|
|
|
|
|
- # default_schema = handler(source)
|
|
|
|
|
- # return core_schema.no_info_after_validator_function(
|
|
|
|
|
- # cls.parse,
|
|
|
|
|
- # default_schema,
|
|
|
|
|
- # serialization=core_schema.plain_serializer_function_ser_schema(
|
|
|
|
|
- # lambda semver: str(semver),
|
|
|
|
|
- # info_arg=False,
|
|
|
|
|
- # return_schema=core_schema.str_schema(),
|
|
|
|
|
- # ),
|
|
|
|
|
- # )
|
|
|
|
|
-
|
|
|
|
|
-assert SemVer(None) == None
|
|
|
|
|
-assert SemVer('') == None
|
|
|
|
|
-assert SemVer.parse('') == None
|
|
|
|
|
-assert SemVer(1) == (1, 0, 0)
|
|
|
|
|
-assert SemVer(1, 2) == (1, 2, 0)
|
|
|
|
|
-assert SemVer('1.2+234234') == (1, 2, 0)
|
|
|
|
|
-assert SemVer((1, 2, 3)) == (1, 2, 3)
|
|
|
|
|
-assert getattr(SemVer((1, 2, 3)), 'full_text') == '1.2.3'
|
|
|
|
|
-assert SemVer(('1', '2', '3')) == (1, 2, 3)
|
|
|
|
|
-assert SemVer.parse('5.6.7') == (5, 6, 7)
|
|
|
|
|
-assert SemVer.parse('124.0.6367.208') == (124, 0, 6367)
|
|
|
|
|
-assert SemVer.parse('Google Chrome 124.1+234.234') == (124, 1, 0)
|
|
|
|
|
-assert SemVer.parse('Google Ch1rome 124.0.6367.208') == (124, 0, 6367)
|
|
|
|
|
-assert SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324') == (124, 0, 6367)
|
|
|
|
|
-assert getattr(SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324'), 'full_text') == 'Google Chrome 124.0.6367.208+beta_234. 234.234.123'
|
|
|
|
|
-assert SemVer.parse('Google Chrome') == None
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def bin_name(bin_path_or_name: str | Path) -> str:
|
|
|
|
|
- name = Path(bin_path_or_name).name
|
|
|
|
|
- assert len(name) > 1
|
|
|
|
|
- assert name.replace('-', '').replace('_', '').replace('.', '').isalnum(), (
|
|
|
|
|
- f'Binary name can only contain a-Z0-9-_.: {name}')
|
|
|
|
|
- return name
|
|
|
|
|
-
|
|
|
|
|
-BinName = Annotated[str, AfterValidator(bin_name)]
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def path_is_file(path: Path | str) -> Path:
|
|
|
|
|
- path = Path(path) if isinstance(path, str) else path
|
|
|
|
|
- assert path.is_file(), f'Path is not a file: {path}'
|
|
|
|
|
- return path
|
|
|
|
|
-
|
|
|
|
|
-HostExistsPath = Annotated[Path, AfterValidator(path_is_file)]
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def path_is_executable(path: HostExistsPath) -> HostExistsPath:
|
|
|
|
|
- assert os.access(path, os.X_OK), f'Path is not executable (fix by running chmod +x {path})'
|
|
|
|
|
- return path
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def path_is_script(path: HostExistsPath) -> HostExistsPath:
|
|
|
|
|
- SCRIPT_EXTENSIONS = ('.py', '.js', '.sh')
|
|
|
|
|
- assert path.suffix.lower() in SCRIPT_EXTENSIONS, 'Path is not a script (does not end in {})'.format(', '.join(SCRIPT_EXTENSIONS))
|
|
|
|
|
- return path
|
|
|
|
|
-
|
|
|
|
|
-HostExecutablePath = Annotated[HostExistsPath, AfterValidator(path_is_executable)]
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def path_is_abspath(path: Path) -> Path:
|
|
|
|
|
- return path.resolve()
|
|
|
|
|
-
|
|
|
|
|
-HostAbsPath = Annotated[HostExistsPath, AfterValidator(path_is_abspath)]
|
|
|
|
|
-HostBinPath = Annotated[Path, AfterValidator(path_is_abspath), AfterValidator(path_is_file)]
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None:
|
|
|
|
|
- assert bin_path_or_name
|
|
|
|
|
-
|
|
|
|
|
- if str(bin_path_or_name).startswith('/'):
|
|
|
|
|
- # already a path, get its absolute form
|
|
|
|
|
- abspath = Path(bin_path_or_name).resolve()
|
|
|
|
|
- else:
|
|
|
|
|
- # not a path yet, get path using os.which
|
|
|
|
|
- binpath = shutil.which(bin_path_or_name)
|
|
|
|
|
- if not binpath:
|
|
|
|
|
- return None
|
|
|
|
|
- abspath = Path(binpath).resolve()
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- return TypeAdapter(HostBinPath).validate_python(abspath)
|
|
|
|
|
- except ValidationError:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-@validate_call
|
|
|
|
|
-def bin_version(bin_path: HostBinPath, args=('--version',)) -> SemVer | None:
|
|
|
|
|
- return SemVer(run([bin_path, *args], stdout=PIPE).stdout.strip().decode())
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class InstalledBin(BaseModel):
|
|
|
|
|
- abspath: HostBinPath
|
|
|
|
|
- version: SemVer
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-def is_valid_install_string(pkgs_str: str) -> str:
|
|
|
|
|
- """Make sure a string is a valid install string for a package manager, e.g. 'yt-dlp ffmpeg'"""
|
|
|
|
|
- assert pkgs_str
|
|
|
|
|
- assert all(len(pkg) > 1 for pkg in pkgs_str.split(' '))
|
|
|
|
|
- return pkgs_str
|
|
|
|
|
-
|
|
|
|
|
-def is_valid_python_dotted_import(import_str: str) -> str:
|
|
|
|
|
- assert import_str and import_str.replace('.', '').replace('_', '').isalnum()
|
|
|
|
|
- return import_str
|
|
|
|
|
-
|
|
|
|
|
-InstallStr = Annotated[str, AfterValidator(is_valid_install_string)]
|
|
|
|
|
-
|
|
|
|
|
-LazyImportStr = Annotated[str, AfterValidator(is_valid_python_dotted_import)]
|
|
|
|
|
-
|
|
|
|
|
-ProviderHandler = Callable[..., Any] | Callable[[], Any] # must take no args [], or [bin_name: str, **kwargs]
|
|
|
|
|
-#ProviderHandlerStr = Annotated[str, AfterValidator(lambda s: s.startswith('self.'))]
|
|
|
|
|
-ProviderHandlerRef = LazyImportStr | ProviderHandler
|
|
|
|
|
-ProviderLookupDict = Dict[str, LazyImportStr]
|
|
|
|
|
-ProviderType = Literal['abspath', 'version', 'subdeps', 'install']
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-# class Host(BaseModel):
|
|
|
|
|
-# machine: str
|
|
|
|
|
-# system: str
|
|
|
|
|
-# platform: str
|
|
|
|
|
-# in_docker: bool
|
|
|
|
|
-# in_qemu: bool
|
|
|
|
|
-# python: str
|
|
|
|
|
-
|
|
|
|
|
-BinProviderName = Literal['env', 'pip', 'apt', 'brew', 'npm', 'vendor']
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class BinProvider(ABC, BaseModel):
|
|
|
|
|
- name: BinProviderName
|
|
|
|
|
-
|
|
|
|
|
- abspath_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_abspath'}, exclude=True)
|
|
|
|
|
- version_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_version'}, exclude=True)
|
|
|
|
|
- subdeps_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_subdeps'}, exclude=True)
|
|
|
|
|
- install_provider: ProviderLookupDict = Field(default={'*': 'self.on_install'}, exclude=True)
|
|
|
|
|
-
|
|
|
|
|
- _abspath_cache: ClassVar = {}
|
|
|
|
|
- _version_cache: ClassVar = {}
|
|
|
|
|
- _install_cache: ClassVar = {}
|
|
|
|
|
-
|
|
|
|
|
- # def provider_version(self) -> SemVer | None:
|
|
|
|
|
- # """Version of the actual underlying package manager (e.g. pip v20.4.1)"""
|
|
|
|
|
- # if self.name in ('env', 'vendor'):
|
|
|
|
|
- # return SemVer('0.0.0')
|
|
|
|
|
- # installer_binpath = Path(shutil.which(self.name)).resolve()
|
|
|
|
|
- # return bin_version(installer_binpath)
|
|
|
|
|
-
|
|
|
|
|
- # def provider_host(self) -> Host:
|
|
|
|
|
- # """Information about the host env, archictecture, and OS needed to select & build packages"""
|
|
|
|
|
- # p = platform.uname()
|
|
|
|
|
- # return Host(
|
|
|
|
|
- # machine=p.machine,
|
|
|
|
|
- # system=p.system,
|
|
|
|
|
- # platform=platform.platform(),
|
|
|
|
|
- # python=sys.implementation.name,
|
|
|
|
|
- # in_docker=os.environ.get('IN_DOCKER', '').lower() == 'true',
|
|
|
|
|
- # in_qemu=os.environ.get('IN_QEMU', '').lower() == 'true',
|
|
|
|
|
- # )
|
|
|
|
|
-
|
|
|
|
|
- def get_default_providers(self):
|
|
|
|
|
- return self.get_providers_for_bin('*')
|
|
|
|
|
-
|
|
|
|
|
- def resolve_provider_func(self, provider_func: ProviderHandlerRef | None) -> ProviderHandler | None:
|
|
|
|
|
- if provider_func is None:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- # if provider_func is a dotted path to a function on self, swap it for the actual function
|
|
|
|
|
- if isinstance(provider_func, str) and provider_func.startswith('self.'):
|
|
|
|
|
- provider_func = getattr(self, provider_func.split('self.', 1)[-1])
|
|
|
|
|
-
|
|
|
|
|
- # if provider_func is a dot-formatted import string, import the function
|
|
|
|
|
- if isinstance(provider_func, str):
|
|
|
|
|
- from django.utils.module_loading import import_string
|
|
|
|
|
-
|
|
|
|
|
- package_name, module_name, classname, path = provider_func.split('.', 3) # -> abc, def, ghi.jkl
|
|
|
|
|
-
|
|
|
|
|
- # get .ghi.jkl nested attr present on module abc.def
|
|
|
|
|
- imported_module = import_string(f'{package_name}.{module_name}.{classname}')
|
|
|
|
|
- provider_func = operator.attrgetter(path)(imported_module)
|
|
|
|
|
-
|
|
|
|
|
- # # abc.def.ghi.jkl -> 1, 2, 3
|
|
|
|
|
- # for idx in range(1, len(path)):
|
|
|
|
|
- # parent_path = '.'.join(path[:-idx]) # abc.def.ghi
|
|
|
|
|
- # try:
|
|
|
|
|
- # parent_module = import_string(parent_path)
|
|
|
|
|
- # provider_func = getattr(parent_module, path[-idx])
|
|
|
|
|
- # except AttributeError, ImportError:
|
|
|
|
|
- # continue
|
|
|
|
|
-
|
|
|
|
|
- assert TypeAdapter(ProviderHandler).validate_python(provider_func), (
|
|
|
|
|
- f'{self.__class__.__name__} provider func for {bin_name} was not a function or dotted-import path: {provider_func}')
|
|
|
|
|
-
|
|
|
|
|
- return provider_func
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def get_providers_for_bin(self, bin_name: str) -> ProviderLookupDict:
|
|
|
|
|
- providers_for_bin = {
|
|
|
|
|
- 'abspath': self.abspath_provider.get(bin_name),
|
|
|
|
|
- 'version': self.version_provider.get(bin_name),
|
|
|
|
|
- 'subdeps': self.subdeps_provider.get(bin_name),
|
|
|
|
|
- 'install': self.install_provider.get(bin_name),
|
|
|
|
|
- }
|
|
|
|
|
- only_set_providers_for_bin = {k: v for k, v in providers_for_bin.items() if v is not None}
|
|
|
|
|
-
|
|
|
|
|
- return only_set_providers_for_bin
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def get_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None) -> ProviderHandler:
|
|
|
|
|
- """
|
|
|
|
|
- Get the provider func for a given key + Dict of provider callbacks + fallback default provider.
|
|
|
|
|
- e.g. get_provider_for_action(bin_name='yt-dlp', 'install', default_provider=self.on_install, ...) -> Callable
|
|
|
|
|
- """
|
|
|
|
|
-
|
|
|
|
|
- provider_func_ref = (
|
|
|
|
|
- (overrides or {}).get(provider_type)
|
|
|
|
|
- or self.get_providers_for_bin(bin_name).get(provider_type)
|
|
|
|
|
- or self.get_default_providers().get(provider_type)
|
|
|
|
|
- or default_provider
|
|
|
|
|
- )
|
|
|
|
|
- # print('getting provider for action', bin_name, provider_type, provider_func)
|
|
|
|
|
-
|
|
|
|
|
- provider_func = self.resolve_provider_func(provider_func_ref)
|
|
|
|
|
-
|
|
|
|
|
- assert provider_func, f'No {self.name} provider func was found for {bin_name} in: {self.__class__.__name__}.'
|
|
|
|
|
-
|
|
|
|
|
- return provider_func
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def call_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None, **kwargs) -> Any:
|
|
|
|
|
- provider_func: ProviderHandler = self.get_provider_for_action(
|
|
|
|
|
- bin_name=bin_name,
|
|
|
|
|
- provider_type=provider_type,
|
|
|
|
|
- default_provider=default_provider,
|
|
|
|
|
- overrides=overrides,
|
|
|
|
|
- )
|
|
|
|
|
- if not func_takes_args_or_kwargs(provider_func):
|
|
|
|
|
- # if it's a pure argless lambdas, dont pass bin_path and other **kwargs
|
|
|
|
|
- provider_func_without_args = cast(Callable[[], Any], provider_func)
|
|
|
|
|
- return provider_func_without_args()
|
|
|
|
|
-
|
|
|
|
|
- provider_func = cast(Callable[..., Any], provider_func)
|
|
|
|
|
- return provider_func(bin_name, **kwargs)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- def on_get_abspath(self, bin_name: BinName, **_) -> HostBinPath | None:
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Getting abspath for {bin_name}...')
|
|
|
|
|
- try:
|
|
|
|
|
- return bin_abspath(bin_name)
|
|
|
|
|
- except ValidationError:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **_) -> SemVer | None:
|
|
|
|
|
- abspath = abspath or self._abspath_cache.get(bin_name) or self.get_abspath(bin_name)
|
|
|
|
|
- if not abspath: return None
|
|
|
|
|
-
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Getting version for {bin_name}...')
|
|
|
|
|
- try:
|
|
|
|
|
- return bin_version(abspath)
|
|
|
|
|
- except ValidationError:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- def on_get_subdeps(self, bin_name: BinName, **_) -> InstallStr:
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Getting subdependencies for {bin_name}')
|
|
|
|
|
- # ... subdependency calculation logic here
|
|
|
|
|
- return TypeAdapter(InstallStr).validate_python(bin_name)
|
|
|
|
|
-
|
|
|
|
|
- @abstractmethod
|
|
|
|
|
- def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
|
|
|
|
|
- subdeps = subdeps or self.get_subdeps(bin_name)
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
|
|
|
|
- # ... install logic here
|
|
|
|
|
- assert True
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def get_abspath(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> HostBinPath | None:
|
|
|
|
|
- abspath = self.call_provider_for_action(
|
|
|
|
|
- bin_name=bin_name,
|
|
|
|
|
- provider_type='abspath',
|
|
|
|
|
- default_provider=self.on_get_abspath,
|
|
|
|
|
- overrides=overrides,
|
|
|
|
|
- )
|
|
|
|
|
- if not abspath:
|
|
|
|
|
- return None
|
|
|
|
|
- result = TypeAdapter(HostBinPath).validate_python(abspath)
|
|
|
|
|
- self._abspath_cache[bin_name] = result
|
|
|
|
|
- return result
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, overrides: Optional[ProviderLookupDict]=None) -> SemVer | None:
|
|
|
|
|
- version = self.call_provider_for_action(
|
|
|
|
|
- bin_name=bin_name,
|
|
|
|
|
- provider_type='version',
|
|
|
|
|
- default_provider=self.on_get_version,
|
|
|
|
|
- overrides=overrides,
|
|
|
|
|
- abspath=abspath,
|
|
|
|
|
- )
|
|
|
|
|
- if not version:
|
|
|
|
|
- return None
|
|
|
|
|
- result = SemVer(version)
|
|
|
|
|
- self._version_cache[bin_name] = result
|
|
|
|
|
- return result
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def get_subdeps(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstallStr:
|
|
|
|
|
- subdeps = self.call_provider_for_action(
|
|
|
|
|
- bin_name=bin_name,
|
|
|
|
|
- provider_type='subdeps',
|
|
|
|
|
- default_provider=self.on_get_subdeps,
|
|
|
|
|
- overrides=overrides,
|
|
|
|
|
- )
|
|
|
|
|
- if not subdeps:
|
|
|
|
|
- subdeps = bin_name
|
|
|
|
|
- result = TypeAdapter(InstallStr).validate_python(subdeps)
|
|
|
|
|
- return result
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstalledBin | None:
|
|
|
|
|
- subdeps = self.get_subdeps(bin_name, overrides=overrides)
|
|
|
|
|
-
|
|
|
|
|
- self.call_provider_for_action(
|
|
|
|
|
- bin_name=bin_name,
|
|
|
|
|
- provider_type='install',
|
|
|
|
|
- default_provider=self.on_install,
|
|
|
|
|
- overrides=overrides,
|
|
|
|
|
- subdeps=subdeps,
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- installed_abspath = self.get_abspath(bin_name)
|
|
|
|
|
- assert installed_abspath, f'Unable to find {bin_name} abspath after installing with {self.name}'
|
|
|
|
|
-
|
|
|
|
|
- installed_version = self.get_version(bin_name, abspath=installed_abspath)
|
|
|
|
|
- assert installed_version, f'Unable to find {bin_name} version after installing with {self.name}'
|
|
|
|
|
-
|
|
|
|
|
- result = InstalledBin(abspath=installed_abspath, version=installed_version)
|
|
|
|
|
- self._install_cache[bin_name] = result
|
|
|
|
|
- return result
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def load(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=False) -> InstalledBin | None:
|
|
|
|
|
- installed_abspath = None
|
|
|
|
|
- installed_version = None
|
|
|
|
|
-
|
|
|
|
|
- if cache:
|
|
|
|
|
- installed_bin = self._install_cache.get(bin_name)
|
|
|
|
|
- if installed_bin:
|
|
|
|
|
- return installed_bin
|
|
|
|
|
- installed_abspath = self._abspath_cache.get(bin_name)
|
|
|
|
|
- installed_version = self._version_cache.get(bin_name)
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- installed_abspath = installed_abspath or self.get_abspath(bin_name, overrides=overrides)
|
|
|
|
|
- if not installed_abspath:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- installed_version = installed_version or self.get_version(bin_name, abspath=installed_abspath, overrides=overrides)
|
|
|
|
|
- if not installed_version:
|
|
|
|
|
- return None
|
|
|
|
|
-
|
|
|
|
|
- return InstalledBin(abspath=installed_abspath, version=installed_version)
|
|
|
|
|
-
|
|
|
|
|
- @validate_call
|
|
|
|
|
- def load_or_install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=True) -> InstalledBin | None:
|
|
|
|
|
- installed = self.load(bin_name, overrides=overrides, cache=cache)
|
|
|
|
|
- if not installed:
|
|
|
|
|
- installed = self.install(bin_name, overrides=overrides)
|
|
|
|
|
- return installed
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class PipProvider(BinProvider):
|
|
|
|
|
- name: BinProviderName = 'pip'
|
|
|
|
|
-
|
|
|
|
|
- def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
|
|
|
|
|
- subdeps = subdeps or self.on_get_subdeps(bin_name)
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
|
|
|
|
-
|
|
|
|
|
- proc = run(['pip', 'install', '--upgrade', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
|
|
|
|
|
-
|
|
|
|
|
- if proc.returncode != 0:
|
|
|
|
|
- print(proc.stdout.strip().decode())
|
|
|
|
|
- print(proc.stderr.strip().decode())
|
|
|
|
|
- raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class AptProvider(BinProvider):
|
|
|
|
|
- name: BinProviderName = 'apt'
|
|
|
|
|
-
|
|
|
|
|
- subdeps_provider: ProviderLookupDict = {
|
|
|
|
|
- 'yt-dlp': lambda: 'yt-dlp ffmpeg',
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
|
|
|
|
|
- subdeps = subdeps or self.on_get_subdeps(bin_name)
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
|
|
|
|
-
|
|
|
|
|
- run(['apt-get', 'update', '-qq'])
|
|
|
|
|
- proc = run(['apt-get', 'install', '-y', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
|
|
|
|
|
-
|
|
|
|
|
- if proc.returncode != 0:
|
|
|
|
|
- print(proc.stdout.strip().decode())
|
|
|
|
|
- print(proc.stderr.strip().decode())
|
|
|
|
|
- raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
|
|
|
|
|
-
|
|
|
|
|
-class BrewProvider(BinProvider):
|
|
|
|
|
- name: BinProviderName = 'brew'
|
|
|
|
|
-
|
|
|
|
|
- def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
|
|
|
|
|
- subdeps = subdeps or self.on_get_subdeps(bin_name)
|
|
|
|
|
- print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
|
|
|
|
|
-
|
|
|
|
|
- proc = run(['brew', 'install', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
|
|
|
|
|
-
|
|
|
|
|
- if proc.returncode != 0:
|
|
|
|
|
- print(proc.stdout.strip().decode())
|
|
|
|
|
- print(proc.stderr.strip().decode())
|
|
|
|
|
- raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-class EnvProvider(BinProvider):
|
|
|
|
|
- name: BinProviderName = 'env'
|
|
|
|
|
-
|
|
|
|
|
- abspath_provider: ProviderLookupDict = {
|
|
|
|
|
- # 'python': lambda: Path('/opt/homebrew/Cellar/[email protected]/3.10.14/Frameworks/Python.framework/Versions/3.10/bin/python3.10'),
|
|
|
|
|
- }
|
|
|
|
|
- version_provider: ProviderLookupDict = {
|
|
|
|
|
- # 'python': lambda: '{}.{}.{}'.format(*sys.version_info[:3]),
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
|
|
|
|
|
- """The env provider is ready-only and does not install any packages, so this is a no-op"""
|
|
|
|
|
- pass
|
|
|