support simpler source with only one get_version funcion
also caching is working now
This commit is contained in:
parent
0232d0fb4f
commit
435edf8589
19
NEW
19
NEW
|
@ -1,21 +1,4 @@
|
|||
source file
|
||||
| parse
|
||||
entries: name, conf, global options
|
||||
| dedupe
|
||||
entries: names, conf, global options
|
||||
| dispatch
|
||||
one future per module
|
||||
| run; token queue; result queue
|
||||
result task, runner task
|
||||
| receive
|
||||
(names, result)
|
||||
| transform back
|
||||
(name, result)
|
||||
| runner task done
|
||||
| write record file
|
||||
file
|
||||
|
||||
TODO:
|
||||
* dedupe & cache
|
||||
* pass `tries` to `httpclient`
|
||||
* update tests
|
||||
* update README
|
||||
|
|
|
@ -26,6 +26,7 @@ from .lib import nicelogger
|
|||
from . import slogconf
|
||||
from .util import (
|
||||
Entry, Entries, KeyManager, RawResult, Result, VersData,
|
||||
FunctionWorker,
|
||||
)
|
||||
from . import __version__
|
||||
from .sortversion import sort_version_keys
|
||||
|
@ -194,10 +195,20 @@ def dispatch(
|
|||
|
||||
ret = []
|
||||
for mod, tasks in mods.values():
|
||||
worker = mod.Worker( # type: ignore
|
||||
if hasattr(mod, 'Worker'):
|
||||
worker_cls = mod.Worker # type: ignore
|
||||
else:
|
||||
worker_cls = FunctionWorker
|
||||
|
||||
worker = worker_cls(
|
||||
token_q, result_q, tasks,
|
||||
tries, keymanager,
|
||||
)
|
||||
if worker_cls is FunctionWorker:
|
||||
func = mod.get_version # type: ignore
|
||||
cacher = getattr(mod, 'cacher', None)
|
||||
worker.set_func(func, cacher)
|
||||
|
||||
ret.append(worker.run())
|
||||
|
||||
return ret
|
||||
|
|
|
@ -3,11 +3,13 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from asyncio import Queue
|
||||
import contextlib
|
||||
from typing import (
|
||||
Dict, Optional, List, AsyncGenerator, NamedTuple, Union,
|
||||
Any, Tuple,
|
||||
Any, Tuple, Coroutine, Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from pathlib import Path
|
||||
|
||||
|
@ -16,6 +18,7 @@ import toml
|
|||
Entry = Dict[str, Any]
|
||||
Entries = Dict[str, Entry]
|
||||
VersData = Dict[str, str]
|
||||
VersionResult = Union[None, str, List[str], Exception]
|
||||
|
||||
class KeyManager:
|
||||
def __init__(
|
||||
|
@ -31,6 +34,16 @@ class KeyManager:
|
|||
def get_key(self, name: str) -> Optional[str]:
|
||||
return self.keys.get(name)
|
||||
|
||||
class RawResult(NamedTuple):
|
||||
name: str
|
||||
version: VersionResult
|
||||
conf: Entry
|
||||
|
||||
class Result(NamedTuple):
|
||||
name: str
|
||||
version: str
|
||||
conf: Entry
|
||||
|
||||
class BaseWorker:
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -54,19 +67,91 @@ class BaseWorker:
|
|||
finally:
|
||||
await self.token_q.put(token)
|
||||
|
||||
class RawResult(NamedTuple):
|
||||
name: str
|
||||
version: Union[Exception, List[str], str]
|
||||
conf: Entry
|
||||
if TYPE_CHECKING:
|
||||
from typing_extensions import Protocol
|
||||
class GetVersionFunc(Protocol):
|
||||
async def __call__(
|
||||
self,
|
||||
name: str, conf: Entry,
|
||||
*,
|
||||
keymanager: KeyManager,
|
||||
) -> VersionResult:
|
||||
...
|
||||
else:
|
||||
GetVersionFunc = Any
|
||||
|
||||
class Result(NamedTuple):
|
||||
name: str
|
||||
version: str
|
||||
conf: Entry
|
||||
Cacher = Callable[[str, Entry], str]
|
||||
|
||||
def conf_cacheable_with_name(key):
|
||||
def get_cacheable_conf(name, conf):
|
||||
conf = dict(conf)
|
||||
conf[key] = conf.get(key) or name
|
||||
return conf
|
||||
return get_cacheable_conf
|
||||
class FunctionWorker(BaseWorker):
|
||||
func = None
|
||||
cacher = None
|
||||
|
||||
cache: Dict[str, Union[
|
||||
VersionResult,
|
||||
asyncio.Task,
|
||||
]]
|
||||
lock: asyncio.Lock
|
||||
|
||||
def set_func(
|
||||
self,
|
||||
func: GetVersionFunc,
|
||||
cacher: Optional[Cacher],
|
||||
) -> None:
|
||||
self.func = func
|
||||
self.cacher = cacher
|
||||
if cacher:
|
||||
self.cache = {}
|
||||
self.lock = asyncio.Lock()
|
||||
|
||||
async def run(self) -> None:
|
||||
assert self.func is not None
|
||||
futures = [
|
||||
self.run_one(name, entry)
|
||||
for name, entry in self.tasks
|
||||
]
|
||||
for fu in asyncio.as_completed(futures):
|
||||
await fu
|
||||
|
||||
async def run_one(
|
||||
self, name: str, entry: Entry,
|
||||
) -> None:
|
||||
assert self.func is not None
|
||||
|
||||
try:
|
||||
async with self.acquire_token():
|
||||
if self.cacher:
|
||||
version = await self.run_one_may_cache(
|
||||
name, entry)
|
||||
else:
|
||||
version = await self.func(
|
||||
name, entry, keymanager = self.keymanager,
|
||||
)
|
||||
await self.result_q.put(RawResult(name, version, entry))
|
||||
except Exception as e:
|
||||
await self.result_q.put(RawResult(name, e, entry))
|
||||
|
||||
async def run_one_may_cache(
|
||||
self, name: str, entry: Entry,
|
||||
) -> VersionResult:
|
||||
assert self.cacher is not None
|
||||
assert self.func is not None
|
||||
|
||||
key = self.cacher(name, entry)
|
||||
|
||||
async with self.lock:
|
||||
cached = self.cache.get(key)
|
||||
if cached is None:
|
||||
coro = self.func(
|
||||
name, entry, keymanager = self.keymanager,
|
||||
)
|
||||
fu = asyncio.create_task(coro)
|
||||
self.cache[key] = fu
|
||||
|
||||
if asyncio.isfuture(cached): # pending
|
||||
return await cached # type: ignore
|
||||
elif cached is not None: # cached
|
||||
return cached # type: ignore
|
||||
else: # not cached
|
||||
version = await fu
|
||||
self.cache[key] = version
|
||||
return version
|
||||
|
|
|
@ -4,28 +4,25 @@
|
|||
import structlog
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
from typing import Iterable, Dict, List, Tuple, Any
|
||||
from typing import Iterable, Dict, List, Tuple, Any, Optional
|
||||
|
||||
from nvchecker.util import (
|
||||
Entry, BaseWorker, RawResult,
|
||||
conf_cacheable_with_name,
|
||||
)
|
||||
from nvchecker.util import Entry, BaseWorker, RawResult
|
||||
from nvchecker.httpclient import session # type: ignore
|
||||
|
||||
get_cacheable_conf = conf_cacheable_with_name('aur')
|
||||
|
||||
logger = structlog.get_logger(logger_name=__name__)
|
||||
|
||||
AUR_URL = 'https://aur.archlinux.org/rpc/'
|
||||
|
||||
class AurResults:
|
||||
cache: Dict[str, Optional[Dict[str, Any]]]
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.cache = {}
|
||||
|
||||
async def get_multiple(
|
||||
self,
|
||||
aurnames: Iterable[str],
|
||||
) -> Dict[str, Dict[str, Any]]:
|
||||
) -> Dict[str, Optional[Dict[str, Any]]]:
|
||||
params = [('v', '5'), ('type', 'info')]
|
||||
params.extend(('arg[]', name) for name in aurnames
|
||||
if name not in self.cache)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# MIT licensed
|
||||
# Copyright (c) 2013-2017 lilydjwg <lilydjwg@gmail.com>, et al.
|
||||
# Copyright (c) 2013-2020 lilydjwg <lilydjwg@gmail.com>, et al.
|
||||
|
||||
import asyncio
|
||||
|
||||
|
@ -7,7 +7,10 @@ import structlog
|
|||
|
||||
logger = structlog.get_logger(logger_name=__name__)
|
||||
|
||||
async def get_version(name, conf, **kwargs):
|
||||
def cacher(name, conf):
|
||||
return conf['cmd']
|
||||
|
||||
async def get_version(name, conf, *, keymanager=None):
|
||||
cmd = conf['cmd']
|
||||
p = await asyncio.create_subprocess_shell(
|
||||
cmd,
|
Loading…
Reference in New Issue