diff --git a/NEW b/NEW index c8a334f..7aea259 100644 --- a/NEW +++ b/NEW @@ -1,21 +1,4 @@ -source file -| parse -entries: name, conf, global options -| dedupe -entries: names, conf, global options -| dispatch -one future per module -| run; token queue; result queue -result task, runner task -| receive -(names, result) -| transform back -(name, result) -| runner task done -| write record file -file - TODO: -* dedupe & cache +* pass `tries` to `httpclient` * update tests * update README diff --git a/nvchecker/core.py b/nvchecker/core.py index d0295ba..9ed8407 100644 --- a/nvchecker/core.py +++ b/nvchecker/core.py @@ -26,6 +26,7 @@ from .lib import nicelogger from . import slogconf from .util import ( Entry, Entries, KeyManager, RawResult, Result, VersData, + FunctionWorker, ) from . import __version__ from .sortversion import sort_version_keys @@ -194,10 +195,20 @@ def dispatch( ret = [] for mod, tasks in mods.values(): - worker = mod.Worker( # type: ignore + if hasattr(mod, 'Worker'): + worker_cls = mod.Worker # type: ignore + else: + worker_cls = FunctionWorker + + worker = worker_cls( token_q, result_q, tasks, tries, keymanager, ) + if worker_cls is FunctionWorker: + func = mod.get_version # type: ignore + cacher = getattr(mod, 'cacher', None) + worker.set_func(func, cacher) + ret.append(worker.run()) return ret diff --git a/nvchecker/util.py b/nvchecker/util.py index 676727c..a6904a7 100644 --- a/nvchecker/util.py +++ b/nvchecker/util.py @@ -3,11 +3,13 @@ from __future__ import annotations +import asyncio from asyncio import Queue import contextlib from typing import ( Dict, Optional, List, AsyncGenerator, NamedTuple, Union, - Any, Tuple, + Any, Tuple, Coroutine, Callable, + TYPE_CHECKING, ) from pathlib import Path @@ -16,6 +18,7 @@ import toml Entry = Dict[str, Any] Entries = Dict[str, Entry] VersData = Dict[str, str] +VersionResult = Union[None, str, List[str], Exception] class KeyManager: def __init__( @@ -31,6 +34,16 @@ class KeyManager: def get_key(self, name: str) -> Optional[str]: return self.keys.get(name) +class RawResult(NamedTuple): + name: str + version: VersionResult + conf: Entry + +class Result(NamedTuple): + name: str + version: str + conf: Entry + class BaseWorker: def __init__( self, @@ -54,19 +67,91 @@ class BaseWorker: finally: await self.token_q.put(token) -class RawResult(NamedTuple): - name: str - version: Union[Exception, List[str], str] - conf: Entry +if TYPE_CHECKING: + from typing_extensions import Protocol + class GetVersionFunc(Protocol): + async def __call__( + self, + name: str, conf: Entry, + *, + keymanager: KeyManager, + ) -> VersionResult: + ... +else: + GetVersionFunc = Any -class Result(NamedTuple): - name: str - version: str - conf: Entry +Cacher = Callable[[str, Entry], str] -def conf_cacheable_with_name(key): - def get_cacheable_conf(name, conf): - conf = dict(conf) - conf[key] = conf.get(key) or name - return conf - return get_cacheable_conf +class FunctionWorker(BaseWorker): + func = None + cacher = None + + cache: Dict[str, Union[ + VersionResult, + asyncio.Task, + ]] + lock: asyncio.Lock + + def set_func( + self, + func: GetVersionFunc, + cacher: Optional[Cacher], + ) -> None: + self.func = func + self.cacher = cacher + if cacher: + self.cache = {} + self.lock = asyncio.Lock() + + async def run(self) -> None: + assert self.func is not None + futures = [ + self.run_one(name, entry) + for name, entry in self.tasks + ] + for fu in asyncio.as_completed(futures): + await fu + + async def run_one( + self, name: str, entry: Entry, + ) -> None: + assert self.func is not None + + try: + async with self.acquire_token(): + if self.cacher: + version = await self.run_one_may_cache( + name, entry) + else: + version = await self.func( + name, entry, keymanager = self.keymanager, + ) + await self.result_q.put(RawResult(name, version, entry)) + except Exception as e: + await self.result_q.put(RawResult(name, e, entry)) + + async def run_one_may_cache( + self, name: str, entry: Entry, + ) -> VersionResult: + assert self.cacher is not None + assert self.func is not None + + key = self.cacher(name, entry) + + async with self.lock: + cached = self.cache.get(key) + if cached is None: + coro = self.func( + name, entry, keymanager = self.keymanager, + ) + fu = asyncio.create_task(coro) + self.cache[key] = fu + + if asyncio.isfuture(cached): # pending + return await cached # type: ignore + elif cached is not None: # cached + return cached # type: ignore + else: # not cached + version = await fu + self.cache[key] = version + return version diff --git a/nvchecker_source/aur.py b/nvchecker_source/aur.py index 107674f..e0a23a6 100644 --- a/nvchecker_source/aur.py +++ b/nvchecker_source/aur.py @@ -4,28 +4,25 @@ import structlog from datetime import datetime import asyncio -from typing import Iterable, Dict, List, Tuple, Any +from typing import Iterable, Dict, List, Tuple, Any, Optional -from nvchecker.util import ( - Entry, BaseWorker, RawResult, - conf_cacheable_with_name, -) +from nvchecker.util import Entry, BaseWorker, RawResult from nvchecker.httpclient import session # type: ignore -get_cacheable_conf = conf_cacheable_with_name('aur') - logger = structlog.get_logger(logger_name=__name__) AUR_URL = 'https://aur.archlinux.org/rpc/' class AurResults: + cache: Dict[str, Optional[Dict[str, Any]]] + def __init__(self) -> None: self.cache = {} async def get_multiple( self, aurnames: Iterable[str], - ) -> Dict[str, Dict[str, Any]]: + ) -> Dict[str, Optional[Dict[str, Any]]]: params = [('v', '5'), ('type', 'info')] params.extend(('arg[]', name) for name in aurnames if name not in self.cache) diff --git a/nvchecker-old/source/cmd.py b/nvchecker_source/cmd.py similarity index 81% rename from nvchecker-old/source/cmd.py rename to nvchecker_source/cmd.py index e5a4933..6242b1a 100644 --- a/nvchecker-old/source/cmd.py +++ b/nvchecker_source/cmd.py @@ -1,5 +1,5 @@ # MIT licensed -# Copyright (c) 2013-2017 lilydjwg , et al. +# Copyright (c) 2013-2020 lilydjwg , et al. import asyncio @@ -7,7 +7,10 @@ import structlog logger = structlog.get_logger(logger_name=__name__) -async def get_version(name, conf, **kwargs): +def cacher(name, conf): + return conf['cmd'] + +async def get_version(name, conf, *, keymanager=None): cmd = conf['cmd'] p = await asyncio.create_subprocess_shell( cmd,