[lib.*] Refactor libs

This commit is contained in:
Maxime Alves LIRMM@home 2021-06-17 18:17:51 +02:00
parent ed7485a8a1
commit 6181592692
7 changed files with 187 additions and 110 deletions

View File

@ -43,7 +43,7 @@ class HalfAPI:
if config:
SECRET = config.get('SECRET')
PRODUCTION = config.get('PRODUCTION')
DOMAINS = config.get('DOMAINS')
DOMAINS = config.get('DOMAINS', {})
CONFIG = config.get('CONFIG', {
'domains': DOMAINS
})
@ -94,7 +94,6 @@ class HalfAPI:
}
)
print(f'HALFAPI CONFIG: {CONFIG}')
self.application.add_middleware(
DomainMiddleware,
config=CONFIG

1
halfapi/lib/constants.py Normal file
View File

@ -0,0 +1 @@
VERBS = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE')

View File

@ -4,31 +4,65 @@ lib/domain.py The domain-scoped utility functions
"""
import os
import re
import sys
import importlib
import inspect
import logging
from types import ModuleType, FunctionType
from typing import Callable, Generator, Dict, List
from typing import Any, Callable, Coroutine, Generator
from typing import Dict, List, Tuple, Iterator
import inspect
from starlette.exceptions import HTTPException
from halfapi.lib import acl
from halfapi.lib.responses import ORJSONResponse
from halfapi.lib.router import read_router
from halfapi.lib.constants import VERBS
logger = logging.getLogger("uvicorn.asgi")
VERBS = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE')
class MissingAclError(Exception):
pass
class PathError(Exception):
pass
def route_decorator(fct: Callable = None, ret_type: str = 'json'):
class UnknownPathParameterType(Exception):
pass
class UndefinedRoute(Exception):
pass
class UndefinedFunction(Exception):
pass
def route_decorator(fct: FunctionType, ret_type: str = 'json') -> Coroutine:
""" Returns an async function that can be mounted on a router
"""
if ret_type == 'json':
@acl.args_check
async def wrapped(request, *args, **kwargs):
return ORJSONResponse(
fct(**request.path_params, halfapi={'user': request.user if
fct_args_spec = inspect.getfullargspec(fct).args
fct_args = request.path_params.copy()
if 'halfapi' in fct_args_spec:
fct_args['halfapi'] = {
'user': request.user if
'user' in request else None,
'config': request.scope['config']}, data=kwargs.get('data')))
'config': request.scope['config']
}
if 'data' in fct_args_spec:
fct_args['data'] = kwargs.get('data')
try:
return ORJSONResponse(fct(**fct_args))
except NotImplementedError as exc:
raise HTTPException(501) from exc
else:
raise Exception('Return type not available')
@ -79,63 +113,53 @@ def get_fct_name(http_verb: str, path: str) -> str:
return '_'.join(fct_name)
def gen_routes(route_params: Dict, path: List, m_router: ModuleType) -> Generator:
def gen_routes(m_router: ModuleType,
verb: str,
path: List[str],
params: List[Dict]) -> Tuple[FunctionType, Dict]:
"""
Generates a tuple of the following form for a specific path:
"/path/to/route", {
"GET": {
"fct": endpoint_fct,
"params": [
{ "acl": acl_fct, [...] }
]
},
[...]
}
Returns a tuple of the function associatied to the verb and path arguments,
and the dictionary of it's acls
Parameters:
- m_router (ModuleType): The module containing the function definition
- route_params (Dict): Contains the following keys :
- one or more HTTP VERB (if none, route is not treated)
- one or zero FQTN (if none, fqtn is set to None)
- verb (str): The HTTP verb for the route (GET, POST, ...)
- path (List): The route path, as a list (each item being a level of
deepness), from the lowest level (domain) to the highest
- m_router (ModuleType): The parent router module
- params (Dict): The acl list of the following format :
[{'acl': Function, 'args': {'required': [], 'optional': []}}]
Yields:
(str, Dict): The path routes description
Returns:
(Function, Dict): The destination function and the acl dictionary
"""
d_res = {'fqtn': route_params.get('FQTN')}
if len(params) == 0:
raise MissingAclError('[{}] {}'.format(verb, '/'.join(path)))
for verb in VERBS:
params = route_params.get(verb)
if params is None:
continue
if len(params) == 0:
logger.error('No ACL for route [{%s}] %s', verb, "/".join(path))
if len(path) == 0:
logger.error('Empty path for [{%s}]', verb)
raise PathError()
try:
fct_name = get_fct_name(verb, path[-1])
fct = getattr(m_router, fct_name)
logger.debug('%s defined in %s', fct.__name__, m_router.__name__)
except AttributeError as exc:
logger.error('%s is not defined in %s', fct_name, m_router.__name__)
continue
if not inspect.iscoroutinefunction(fct):
fct = route_decorator(fct)
d_res[verb] = {'fct': fct, 'params': params}
if len(d_res.keys()) > 1:
yield f"/{'/'.join([ elt for elt in path if elt ])}", d_res
fct_name = get_fct_name(verb, path[-1])
if hasattr(m_router, fct_name):
fct = getattr(m_router, fct_name)
else:
raise UndefinedFunction('{}.{}'.format(m_router.__name__, fct_name or ''))
def gen_router_routes(m_router: ModuleType, path: List[str]) -> Generator:
if not inspect.iscoroutinefunction(fct):
return route_decorator(fct), params
else:
return fct, params
def gen_router_routes(m_router: ModuleType, path: List[str]) -> \
Iterator[Tuple[str, str, Coroutine, List]]:
"""
Recursive generatore that parses a router (or a subrouter)
and yields from gen_routes
@ -147,53 +171,43 @@ def gen_router_routes(m_router: ModuleType, path: List[str]) -> Generator:
Yields:
(str, Dict): The path routes description from **gen_routes**
(str, str, Coroutine, List): A tuple containing the path, verb,
function and parameters of the route
"""
if not hasattr(m_router, 'ROUTES'):
routes = {'':{}}
acls = m_router.ACLS if hasattr(m_router, 'ACLS') else {}
for verb in VERBS:
if not hasattr(m_router, verb.lower()):
continue
""" There is a "verb" route in the router
"""
if verb.upper() not in acls:
continue
routes[''][verb.upper()] = []
routes[''][verb.upper()] = m_router.ACLS[verb.upper()].copy()
routes['']['SUBROUTES'] = []
for item in os.listdir(list(m_router.__path__)[0]):
if os.path.isdir(os.path.join(list(m_router.__path__)[0], item)):
routes['']['SUBROUTES'].append(item)
else:
routes = getattr(m_router, 'ROUTES')
for subpath, route_params in routes.items():
for subpath, params in read_router(m_router).items():
path.append(subpath)
yield from gen_routes(route_params, path, m_router)
for verb in VERBS:
if verb not in params:
continue
yield ('/'.join(filter(lambda x: len(x) > 0, path)),
verb,
*gen_routes(m_router, verb, path, params[verb])
)
subroutes = route_params.get('SUBROUTES', [])
for subroute in subroutes:
logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__)
if ':' in subroute:
path.append(f'{{{subroute}}}')
for subroute in params.get('SUBROUTES', []):
#logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__)
param_match = re.fullmatch('^([A-Z_]+)_([a-z]+)$', subroute)
if param_match is not None:
try:
path.append('{{{}:{}}}'.format(
param_match.groups()[0].lower(),
param_match.groups()[1]))
except AssertionError:
raise UnknownPathParameterType(subroute)
else:
path.append(subroute)
try:
submod = importlib.import_module(f'.{subroute}', m_router.__name__)
yield from gen_router_routes(
importlib.import_module(f'.{subroute}', m_router.__name__),
path)
except ImportError as exc:
logger.error('Failed to import subroute **{%s}**', subroute)
raise exc
yield from gen_router_routes(submod, path)
path.pop()
path.pop()

View File

@ -40,10 +40,17 @@ class DomainMiddleware(BaseHTTPMiddleware):
"""
domain = scope['path'].split('/')[1]
self.domains = d_domains(self.config)
self.domains = self.config.get('domains', {})
if domain in self.domains:
if len(domain) == 0:
for domain in self.domains:
self.api[domain], self.acl[domain] = api_routes(self.domains[domain])
elif domain in self.domains:
self.api[domain], self.acl[domain] = api_routes(self.domains[domain])
else:
logger.error('domain not in self.domains %s / %s',
scope['path'],
self.domains)
scope_ = scope.copy()
scope_['domains'] = self.domains

44
halfapi/lib/router.py Normal file
View File

@ -0,0 +1,44 @@
import os
from types import ModuleType
from typing import Dict
from halfapi.lib.constants import VERBS
def read_router(m_router: ModuleType) -> Dict:
"""
Reads a module and returns a router dict
"""
if not hasattr(m_router, 'ROUTES'):
routes = {'':{}}
acls = getattr(m_router, 'ACLS') if hasattr(m_router, 'ACLS') else None
if acls is not None:
for verb in VERBS:
if not hasattr(m_router, verb.lower()):
continue
""" There is a "verb" route in the router
"""
if verb.upper() not in acls:
continue
routes[''][verb.upper()] = []
routes[''][verb.upper()] = acls[verb.upper()].copy()
routes['']['SUBROUTES'] = []
if hasattr(m_router, '__path__'):
""" Module is a package
"""
m_path = getattr(m_router, '__path__')
if isinstance(m_path, list) and len(m_path) == 1:
routes['']['SUBROUTES'] = [
elt.name
for elt in os.scandir(m_path[0])
if elt.is_dir()
]
else:
routes = getattr(m_router, 'ROUTES')
return routes

View File

@ -16,7 +16,7 @@ Exception :
from datetime import datetime
from functools import partial, wraps
import logging
from typing import Callable, List, Dict, Generator
from typing import Callable, List, Dict, Generator, Tuple
from types import ModuleType, FunctionType
from starlette.exceptions import HTTPException
@ -105,22 +105,19 @@ def gen_starlette_routes(d_domains: Dict[str, ModuleType]) -> Generator:
"""
for domain_name, m_domain in d_domains.items():
for path, d_route in gen_router_routes(m_domain, [domain_name]):
for verb in VERBS:
if verb not in d_route.keys():
continue
yield (
Route(path,
route_acl_decorator(
d_route[verb]['fct'],
d_route[verb]['params']
),
methods=[verb])
)
for path, verb, fct, params in gen_router_routes(m_domain, []):
yield (
Route(f'/{domain_name}/{path}',
route_acl_decorator(
fct,
params
),
methods=[verb])
)
def api_routes(m_dom: ModuleType) -> Generator:
def api_routes(m_dom: ModuleType) -> Tuple[Dict, Dict]:
"""
Yields the description objects for HalfAPI app routes
@ -128,7 +125,7 @@ def api_routes(m_dom: ModuleType) -> Generator:
m_dom (ModuleType): the halfapi module
Returns:
Generator(Dict)
(Dict, Dict)
"""
d_acls = {}
@ -137,6 +134,7 @@ def api_routes(m_dom: ModuleType) -> Generator:
l_params = []
for param in params:
if 'acl' not in param.keys() or not param['acl']:
continue
@ -149,13 +147,10 @@ def api_routes(m_dom: ModuleType) -> Generator:
return l_params
d_res = {}
for path, d_route in gen_router_routes(m_dom, [m_dom.__name__]):
d_res[path] = {'fqtn': d_route['fqtn'] }
for verb in VERBS:
if verb not in d_route.keys():
continue
d_res[path][verb] = str_acl(d_route[verb]['params'])
for path, verb, fct, params in gen_router_routes(m_dom, []):
if path not in d_res:
d_res[path] = {}
d_res[path][verb] = str_acl(params)
return d_res, d_acls
@ -196,7 +191,7 @@ def debug_routes():
yield Route('/halfapi/log', debug_log)
async def error_code(request: Request, *args, **kwargs):
code = request.path_params.get('code')
code = request.path_params['code']
raise HTTPException(code)
yield Route('/halfapi/error/{code:int}', error_code)

View File

@ -10,13 +10,16 @@ Constant :
SCHEMAS (starlette.schemas.SchemaGenerator)
"""
import logging
from typing import Dict
from starlette.schemas import SchemaGenerator
from starlette.exceptions import HTTPException
from .routes import gen_starlette_routes, api_acls
from .responses import ORJSONResponse
logger = logging.getLogger('uvicorn.asgi')
SCHEMAS = SchemaGenerator(
{"openapi": "3.0.0", "info": {"title": "HalfAPI", "version": "1.0"}}
)
@ -28,6 +31,20 @@ async def get_api_routes(request, *args, **kwargs):
"""
return ORJSONResponse(request.scope['api'])
def get_api_domain_routes(domain):
async def wrapped(request, *args, **kwargs):
"""
description: Returns the current API routes description (HalfAPI 0.2.1)
as a JSON object
"""
if domain in request.scope['api']:
return ORJSONResponse(request.scope['api'][domain])
else:
raise HTTPException(404)
return wrapped
async def schema_json(request, *args, **kwargs):
"""