| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
# Plugin metadata for the abx package itself, declared as module-level dunder attributes.
# get_plugin() reads __label__, __author__ and __homepage__ from plugin modules using this convention.
__package__ = 'abx'
__id__ = 'abx'
__label__ = 'ABX'
__author__ = 'Nick Sweeting'
__homepage__ = 'https://github.com/ArchiveBox'
# Load-order value; get_plugin_order() reads a module's __order__ and defaults to 999 when it is missing.
__order__ = 0
- import inspect
- import importlib
- import itertools
- from pathlib import Path
- from typing import Dict, Callable, List, Set, Tuple, Iterable, Any, TypeVar, TypedDict, Type, cast, Generic, Mapping, overload, Final, ParamSpec, Literal, Protocol
- from types import ModuleType
- from typing_extensions import Annotated
- from functools import cache
- from benedict import benedict
- from pydantic import AfterValidator
- from pluggy import HookimplMarker, PluginManager, HookimplOpts, HookspecOpts, HookCaller
- ParamsT = ParamSpec("ParamsT")
- ReturnT = TypeVar('ReturnT')
- class HookSpecDecoratorThatReturnsFirstResult(Protocol):
- def __call__(self, func: Callable[ParamsT, ReturnT]) -> Callable[ParamsT, ReturnT]: ...
- class HookSpecDecoratorThatReturnsListResults(Protocol):
- def __call__(self, func: Callable[ParamsT, ReturnT]) -> Callable[ParamsT, List[ReturnT]]: ...
class TypedHookspecMarker:
    """
    Variant of pluggy.HookspecMarker whose overloads let type checkers infer the result
    type of a hookspec correctly for both firstresult=True and firstresult=False.
    https://github.com/pytest-dev/pluggy/issues/191
    """

    __slots__ = ('project_name',)

    def __init__(self, project_name: str) -> None:
        self.project_name: Final[str] = project_name

    # @hookspec(firstresult=False) -> List[ReturnT] (test_firstresult_False_hookspec)
    @overload
    def __call__(
        self,
        function: None = ...,
        firstresult: Literal[False] = ...,
        historic: bool = ...,
        warn_on_impl: Warning | None = ...,
        warn_on_impl_args: Mapping[str, Warning] | None = ...,
    ) -> HookSpecDecoratorThatReturnsListResults: ...

    # @hookspec(firstresult=True) -> ReturnT (test_firstresult_True_hookspec)
    @overload
    def __call__(
        self,
        function: None = ...,
        firstresult: Literal[True] = ...,
        historic: bool = ...,
        warn_on_impl: Warning | None = ...,
        warn_on_impl_args: Mapping[str, Warning] | None = ...,
    ) -> HookSpecDecoratorThatReturnsFirstResult: ...

    # bare @hookspec -> List[ReturnT] (test_normal_hookspec)
    # overload order matters: this one has to stay last
    @overload
    def __call__(
        self,
        function: Callable[ParamsT, ReturnT] = ...,
        firstresult: Literal[False] = ...,
        historic: bool = ...,
        warn_on_impl: None = ...,
        warn_on_impl_args: None = ...,
    ) -> Callable[ParamsT, List[ReturnT]]: ...

    def __call__(
        self,
        function: Callable[ParamsT, ReturnT] | None = None,
        firstresult: bool = False,
        historic: bool = False,
        warn_on_impl: Warning | None = None,
        warn_on_impl_args: Mapping[str, Warning] | None = None,
    ) -> Callable[ParamsT, List[ReturnT]] | HookSpecDecoratorThatReturnsListResults | HookSpecDecoratorThatReturnsFirstResult:
        """
        Mark a function as a hookspec by storing its options on the function.

        Used bare (``@hookspec``) the function is marked and returned directly;
        used with arguments (``@hookspec(firstresult=True)``) a decorator is returned.
        The options dict is stored under the ``<project_name>_spec`` attribute.

        Raises:
            ValueError: when a function is marked with both historic and firstresult set.
        """

        def mark_as_hookspec(target) -> Callable:
            # the combination is only rejected once a function is actually being marked
            if firstresult and historic:
                raise ValueError("cannot have a historic firstresult hook")
            spec_opts: HookspecOpts = {
                "firstresult": firstresult,
                "historic": historic,
                "warn_on_impl": warn_on_impl,
                "warn_on_impl_args": warn_on_impl_args,
            }
            setattr(target, f"{self.project_name}_spec", spec_opts)
            return target

        if function is None:
            return mark_as_hookspec
        return mark_as_hookspec(function)
# Markers for declaring hook specifications / implementations of the "abx" project.
# Each marker is exposed under a long and a short name.
hookspec = TypedHookspecMarker("abx")
spec = hookspec
hookimpl = HookimplMarker("abx")
impl = hookimpl
def is_valid_attr_name(x: str) -> str:
    """
    Validate that *x* is a public Python attribute name and return it unchanged.

    Raises:
        AssertionError: if x is not a valid identifier or starts with an underscore.
    """
    # Raise explicitly instead of using `assert` so the check still runs under `python -O`.
    # AssertionError is kept so callers observe the same exception type as before.
    if not x.isidentifier() or x.startswith('_'):
        raise AssertionError(f'Invalid attribute name: {x!r}')
    return x
def is_valid_module_name(x: str) -> str:
    """
    Validate that *x* is a lowercase, public Python identifier and return it unchanged.

    Raises:
        AssertionError: if x is not a valid identifier, starts with an underscore,
            or is not all-lowercase.
    """
    # Raise explicitly instead of using `assert` so the check still runs under `python -O`.
    # AssertionError is kept so callers observe the same exception type as before.
    if not x.isidentifier() or x.startswith('_') or not x.islower():
        raise AssertionError(f'Invalid module name: {x!r}')
    return x
# str annotated with an AfterValidator that runs is_valid_attr_name on the value
AttrName = Annotated[str, AfterValidator(is_valid_attr_name)]
# str annotated with an AfterValidator that runs is_valid_module_name on the value
PluginId = Annotated[str, AfterValidator(is_valid_module_name)]
class PluginInfo(TypedDict, total=True):
    """Metadata describing a single plugin, in the shape assembled by get_plugin()."""
    id: PluginId                      # package name with '.' replaced by '_'
    package: AttrName                 # module.__package__, falling back to module.__name__
    label: str                        # module.__label__, defaults to the plugin id
    version: str                      # module.__version__, defaults to '0.0.1'
    author: str                       # module.__author__
    homepage: str                     # module.__homepage__
    dependencies: List[str]           # module.__dependencies__, defaults to []

    source_code: str                  # module.__file__
    hooks: Dict[AttrName, Callable]   # result of get_plugin_hooks() for the plugin
    module: ModuleType                # the module the plugin was resolved from

    # These two keys are always set by get_plugin(), and load_plugins() sorts on 'order',
    # so they are declared here to keep the type in sync with the data.
    order: Tuple[int, Path]           # result of get_plugin_order(): (order, plugin_dir)
    plugin: Any                       # the plugin object itself (module, class, or PLUGIN_SPEC/PLUGIN attr)
PluginSpec = TypeVar("PluginSpec")


class ABXPluginManager(PluginManager, Generic[PluginSpec]):
    """
    Patch to fix pluggy's PluginManager to work with pydantic models.
    See: https://github.com/pytest-dev/pluggy/pull/536

    Also generic over the hookspec class so that pm.hook.<name>() calls can be type checked.
    """

    # enable static type checking of pm.hook.call() calls
    # https://stackoverflow.com/a/62871889/2156113
    # https://github.com/pytest-dev/pluggy/issues/191
    hook: PluginSpec

    def create_typed_hookcaller(self, name: str, module_or_class: Type[PluginSpec], spec_opts: HookspecOpts) -> HookCaller:
        """
        create a new HookCaller subclass with a modified __signature__
        so that the return type is correct and args are converted to kwargs

        Args:
            name: name of the hookspec attribute on module_or_class.
            module_or_class: object that defines the hookspec function.
            spec_opts: the hookspec options parsed for that attribute.

        Returns:
            An instance of a one-off HookCaller subclass named ``<name>_HookCaller``.
        """
        TypedHookCaller = type('TypedHookCaller', (HookCaller,), {})

        hookspec_signature = inspect.signature(getattr(module_or_class, name))
        hookspec_return_type = hookspec_signature.return_annotation

        # replace return type with list if firstresult=False
        hookcall_return_type = hookspec_return_type if spec_opts['firstresult'] else List[hookspec_return_type]

        # replace each arg with kwarg equivalent (pm.hook.call() only accepts kwargs)
        args_as_kwargs = [
            param.replace(kind=inspect.Parameter.KEYWORD_ONLY) if param.name != 'self' else param
            for param in hookspec_signature.parameters.values()
        ]
        TypedHookCaller.__signature__ = hookspec_signature.replace(parameters=args_as_kwargs, return_annotation=hookcall_return_type)
        TypedHookCaller.__name__ = f'{name}_HookCaller'

        return TypedHookCaller(name, self._hookexec, module_or_class, spec_opts)

    def add_hookspecs(self, module_or_class: Type[PluginSpec]) -> None:
        """
        Add HookSpecs from the given class, (generic type allows us to enforce types of pm.hook.call() statically)

        Raises:
            ValueError: if module_or_class defines no hookspecs for this project.
        """
        names = []
        for name in dir(module_or_class):
            spec_opts = self.parse_hookspec_opts(module_or_class, name)
            if spec_opts is not None:
                hc: HookCaller | None = getattr(self.hook, name, None)
                if hc is None:
                    hc = self.create_typed_hookcaller(name, module_or_class, spec_opts)
                    setattr(self.hook, name, hc)
                else:
                    # Plugins registered this hook without knowing the spec.
                    hc.set_specification(module_or_class, spec_opts)
                    for hookfunction in hc.get_hookimpls():
                        self._verify_hook(hc, hookfunction)
                names.append(name)

        if not names:
            raise ValueError(
                f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
            )

    def parse_hookimpl_opts(self, plugin, name: str) -> HookimplOpts | None:
        """
        Return the hookimpl options for attribute *name* of *plugin*, or None if it is not a hookimpl.

        Properties are skipped without being evaluated, and an AttributeError raised
        while inspecting the attribute is treated as "not a hookimpl".
        """
        # IMPORTANT: @property methods can have side effects, and are never hookimpl
        # if attr is a property, skip it in advance.
        # The lookup has to go through the class: getattr() on an instance would run the
        # property getter (the side effect we are avoiding) and return its value, not the property.
        plugin_class = plugin if inspect.isclass(plugin) else type(plugin)
        if isinstance(getattr(plugin_class, name, None), property):
            return None

        try:
            return super().parse_hookimpl_opts(plugin, name)
        except AttributeError:
            return None
# Module-level plugin manager for the "abx" project; the helper functions in this module
# query registered plugins and register new ones through this instance.
pm = ABXPluginManager("abx")
def get_plugin_order(plugin: PluginId | Path | ModuleType | Type) -> Tuple[int, Path]:
    """
    Work out the load-order sort key for a plugin.

    Args:
        plugin: a plugin ID (importable module name), a path to a plugin directory
            or to a ``.py`` file inside it, a module, or a class.

    Returns:
        ``(order, plugin_dir)``. The order is read from ``plugin_dir/.plugin_order`` if that
        file exists, otherwise from the module/class ``__order__`` attribute, otherwise 999.

    Raises:
        ValueError: plugin is not a str, Path, module or class
            (int() also raises it when .plugin_order does not contain an integer).
        ImportError: plugin is a plugin ID that cannot be imported.
        AssertionError: plugin is falsy, the order in .plugin_order is out of range,
            or no plugin directory could be determined.
    """
    assert plugin
    plugin_module = None
    plugin_dir = None

    if isinstance(plugin, (str, Path)):
        plugin_str = str(plugin)
        if plugin_str.endswith('.py'):
            plugin_dir = Path(plugin).parent
        elif isinstance(plugin, Path) or '/' in plugin_str:
            # a Path object (whatever its separator or depth) or a str containing '/'
            # is assumed to be a path to a plugin directory
            plugin_dir = Path(plugin)
        elif plugin_str.isidentifier():
            # a bare plugin ID: import it by name (as get_plugin() and get_plugin_hooks() do)
            # so that its directory and __order__ can be looked up
            plugin_module = importlib.import_module(plugin_str)
            plugin_dir = Path(str(plugin_module.__file__)).parent
    elif inspect.ismodule(plugin):
        plugin_module = plugin
        plugin_dir = Path(str(plugin_module.__file__)).parent
    elif inspect.isclass(plugin):
        plugin_module = plugin
        plugin_dir = Path(inspect.getfile(plugin)).parent
    else:
        raise ValueError(f'Invalid plugin, cannot get order: {plugin}')

    if plugin_dir:
        try:
            # if .plugin_order file exists, use it to set the load priority
            order = int((plugin_dir / '.plugin_order').read_text())
            assert -1000000 < order < 100000000
            return (order, plugin_dir)
        except FileNotFoundError:
            pass

    if plugin_module:
        order = getattr(plugin_module, '__order__', 999)
    else:
        order = 999

    assert order is not None
    assert plugin_dir

    return (order, plugin_dir)
- # @cache
def get_plugin(plugin: PluginId | ModuleType | Type) -> PluginInfo:
    """
    Collect the metadata for one plugin.

    Args:
        plugin: a plugin ID (importable package name), a module, or a class.
            For IDs and modules, the module's PLUGIN_SPEC attribute (else PLUGIN,
            else the module itself) is used as the plugin object.

    Returns:
        A benedict holding the id, the module's dunder metadata (with defaults), the
        ``project`` table of a nearby pyproject.toml if one can be loaded, and the
        computed package/source_code/order/hooks/module/plugin entries.

    Raises:
        ValueError: plugin is not a str, module, or class.
    """
    assert plugin

    if isinstance(plugin, str) or inspect.ismodule(plugin):
        # plugin IDs are imported by name, modules are used directly
        module = importlib.import_module(plugin) if isinstance(plugin, str) else plugin
        plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
    elif inspect.isclass(plugin):
        module = inspect.getmodule(plugin)
    else:
        raise ValueError(f'Invalid plugin, must be a module, class, or plugin ID (package name): {plugin}')

    assert module

    plugin_file = Path(inspect.getfile(module))
    plugin_package = module.__package__ or module.__name__
    plugin_id = plugin_package.replace('.', '_')

    # metadata from the plugin module's __attr__s, with a default for each missing one
    attr_defaults = (
        ('label', '__label__', plugin_id),
        ('version', '__version__', '0.0.1'),
        ('author', '__author__', 'ArchiveBox'),
        ('homepage', '__homepage__', 'https://github.com/ArchiveBox'),
        ('dependencies', '__dependencies__', []),
    )
    plugin_module_attrs = {
        key: getattr(module, dunder_name, default)
        for key, dunder_name, default in attr_defaults
    }

    # metadata from pyproject.toml: try ./pyproject.toml first (plugin is a bare python file),
    # then ../pyproject.toml (plugin lives in a package dir); any failure just moves on
    plugin_toml_info = {}
    for toml_dir in (plugin_file.parent, plugin_file.parent.parent):
        try:
            plugin_toml_info = benedict.from_toml((toml_dir / 'pyproject.toml').read_text()).project
        except Exception:
            continue
        break

    assert plugin_id
    assert plugin_package
    assert module.__file__

    # merge the info from all sources; later entries override earlier ones
    merged_info = {
        'id': plugin_id,
        **plugin_module_attrs,
        **plugin_toml_info,
        'package': plugin_package,
        'source_code': module.__file__,
        'order': get_plugin_order(plugin),
        'hooks': get_plugin_hooks(plugin),
        'module': module,
        'plugin': plugin,
    }
    return cast(PluginInfo, benedict(PluginInfo(**merged_info)))
def get_all_plugins() -> Dict[PluginId, PluginInfo]:
    """Return the get_plugin() metadata of every plugin registered with Pluggy, keyed by plugin id."""
    info_by_id = {}
    for registered_plugin in pm.get_plugins():
        info = get_plugin(plugin=registered_plugin)
        assert 'id' in info
        info_by_id[info['id']] = info
    return benedict(info_by_id)
def get_all_hook_names() -> Set[str]:
    """Collect the names of the hooks found by get_plugin_hooks() across all registered plugins."""
    hook_names = set()
    for registered_plugin in pm.get_plugins():
        # get_plugin_hooks() returns a dict keyed by hook name; update() adds its keys
        hook_names.update(get_plugin_hooks(registered_plugin))
    return hook_names
-
def _make_hook_caller(hook_name: str) -> Callable:
    """Build a function that forwards its arguments to pm.hook.<hook_name>."""
    def call_hook(*args, **kwargs):
        return getattr(pm.hook, hook_name)(*args, **kwargs)
    return call_hook


def get_all_hook_specs() -> Dict[str, Dict[str, Any]]:
    """
    Get all hookspec methods defined in all plugins (useful for type checking if a pm.hook.call() is valid)

    Returns:
        A dict keyed by hook name. Each value holds:
          - 'name': the hook name
          - 'method': a callable that forwards to pm.hook.<name>, carrying the call signature
          - 'signature': the hookspec signature with the return type adjusted for the hook call
            (wrapped in List[...] unless the hookspec is firstresult)
          - 'hookspec_opts': the parsed hookspec options
          - 'hookspec_signature': the unmodified signature of the hookspec function
          - 'hookspec_plugin': __package__ of the plugin that defines the hookspec
        If several plugins define a spec for the same hook, the last one iterated wins.
    """
    hook_specs = {}

    for hook_name in get_all_hook_names():
        for plugin_module in pm.get_plugins():
            if not hasattr(plugin_module, hook_name):
                continue
            hookspecopts = pm.parse_hookspec_opts(plugin_module, hook_name)
            if not hookspecopts:
                continue

            signature = inspect.signature(getattr(plugin_module, hook_name))
            return_type = None if signature.return_annotation is inspect.Signature.empty else signature.return_annotation
            if not hookspecopts.get('firstresult'):
                # if not firstresult, the hook call returns a list of results
                return_type = List[return_type]
            call_signature = signature.replace(return_annotation=return_type)

            # Bind hook_name through a factory: a lambda defined in this loop would look up
            # hook_name when called, so every stored method would call the last hook iterated.
            method = _make_hook_caller(hook_name)
            method.__signature__ = call_signature
            method.__name__ = hook_name
            method.__package__ = plugin_module.__package__

            hook_specs[hook_name] = {
                'name': hook_name,
                'method': method,
                'signature': call_signature,
                'hookspec_opts': hookspecopts,
                'hookspec_signature': signature,
                'hookspec_plugin': plugin_module.__package__,
            }
    return hook_specs
-
- ###### PLUGIN DISCOVERY AND LOADING ########################################################
def find_plugins_in_dir(plugins_dir: Path) -> Dict[PluginId, Path]:
    """
    Find all the plugins in a given directory. Just looks for an __init__.py file.

    Returns:
        {directory name: directory path} for every immediate subdirectory of plugins_dir
        that contains an __init__.py, in ascending get_plugin_order() order.
        Directories named 'abx' or 'core' are left out.
    """
    python_dirs = plugins_dir.glob("*/__init__.py")
    # get_plugin_order() always returns a non-empty (order, plugin_dir) tuple, so it is used
    # as the sort key directly. A numeric fallback could never trigger, and an int key
    # could not be compared with the tuple keys anyway.
    sorted_python_dirs = sorted(python_dirs, key=get_plugin_order)

    return {
        plugin_entrypoint.parent.name: plugin_entrypoint.parent
        for plugin_entrypoint in sorted_python_dirs
        if plugin_entrypoint.parent.name not in ('abx', 'core')
    }
def get_pip_installed_plugins(group: PluginId='abx') -> Dict[PluginId, Path]:
    """
    replaces pm.load_setuptools_entrypoints("abx"), finds plugins that registered entrypoints via pip

    Returns {entrypoint name: directory containing the loaded entrypoint's file} for every
    entrypoint in the given group that is not blocked in the plugin manager.
    """
    import importlib.metadata

    detected_plugins = {}   # module_name: module_dir_path
    # the distributions are materialized into a list before any entrypoint is loaded
    installed_dists = list(importlib.metadata.distributions())
    for dist in installed_dists:
        for entrypoint in dist.entry_points:
            if entrypoint.group == group and not pm.is_blocked(entrypoint.name):
                loaded_entrypoint = entrypoint.load()
                detected_plugins[entrypoint.name] = Path(loaded_entrypoint.__file__).parent
    return detected_plugins
- # Load all plugins from pip packages, archivebox built-ins, and user plugins
def load_plugins(plugins: Iterable[PluginId | ModuleType | Type] | Dict[PluginId, Path]):
    """
    Register the given plugins with the plugin manager, in load order.

    Each item yielded by iterating `plugins` (the keys, if it is a dict) is resolved with
    get_plugin(). Plugins whose module is already registered are reported as loaded without
    being registered again; the rest are registered sorted by their 'order' value.

    Returns a benedict of {plugin id: plugin info} for all of them.
    """
    loaded = {}
    pending = []

    for plugin in plugins:
        info = get_plugin(plugin)
        assert info, f'No plugin metadata found for {plugin}'
        assert 'id' in info and 'module' in info
        if info['module'] in pm.get_plugins():
            # already registered: just report it
            loaded[info['id']] = info
        else:
            pending.append(info)

    pending.sort(key=lambda info: info['order'])

    for info in pending:
        pm.register(info['module'])
        loaded[info['id']] = info
    return benedict(loaded)
@cache
def get_plugin_hooks(plugin: PluginId | ModuleType | Type | None) -> Dict[AttrName, Callable]:
    """
    Get all the functions marked with @hookimpl on a module.

    Accepts a plugin ID (imported by name), a module, or a class; a falsy value gives {}.
    Only callable attributes whose name does not start with '_' are considered. An error
    raised while inspecting an attribute is printed and that attribute is skipped.

    Raises:
        ValueError: plugin is not a str, module, or class.
    """
    if not plugin:
        return {}

    if isinstance(plugin, str):
        target = importlib.import_module(plugin)
    elif inspect.ismodule(plugin) or inspect.isclass(plugin):
        target = plugin
    else:
        raise ValueError(f'Invalid plugin, cannot get hooks: {plugin}')

    found_hooks = {}
    public_names = (attr_name for attr_name in dir(target) if not attr_name.startswith('_'))
    for attr_name in public_names:
        try:
            attr = getattr(target, attr_name)
            if isinstance(attr, Callable) and pm.parse_hookimpl_opts(target, attr_name):
                found_hooks[attr_name] = attr
        except Exception as e:
            print(f'Error getting hookimpls for {plugin}: {e}')
    return found_hooks
ReturnT = TypeVar('ReturnT')


def as_list(results: List[List[ReturnT]]) -> List[ReturnT]:
    """Concatenate the per-plugin result lists of a pm.hook.call() into one flat list, preserving order."""
    return [
        item
        for plugin_results in results
        for item in plugin_results
    ]
def as_dict(results: List[Dict[PluginId, ReturnT]]) -> Dict[PluginId, ReturnT]:
    """
    Merge the dicts returned by a pm.hook.call() into a single benedict.

    `results` may be a list of dicts or a dict whose values are the dicts to merge.
    When the same key appears more than once, the last occurrence wins.
    """
    per_plugin_results = results.values() if isinstance(results, (dict, benedict)) else results

    merged = {}
    for plugin_results in per_plugin_results:
        merged.update(plugin_results.items())
    return benedict(merged)
|