# halfapi/halfapi/half_domain.py
#
# 374 lines
# 12 KiB
# Python

import importlib
import inspect
import os
import re
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from typing import Coroutine, Dict, Iterator, List, Tuple
from types import ModuleType, FunctionType
from schema import SchemaError
from starlette.applications import Starlette
from starlette.routing import Router
import yaml
from . import __version__
from .lib.constants import API_SCHEMA_DICT, ROUTER_SCHEMA, VERBS
from .half_route import HalfRoute
from .lib import acl
from .lib.routes import JSONRoute
from .lib.domain import MissingAclError, PathError, UnknownPathParameterType, \
UndefinedRoute, UndefinedFunction, get_fct_name, route_decorator
from .lib.domain_middleware import DomainMiddleware
from .logging import logger
class HalfDomain(Starlette):
    """Starlette application serving the routes of a single domain.

    The routes are discovered by walking the domain's router module (and its
    sub-packages), and every request goes through ``DomainMiddleware`` with
    the domain's name, id, version and configuration attached.
    """

    def __init__(self, domain, module=None, router=None, acl=None, app=None):
        """
        Parameters:
            domain (str): Module name (should be importable)
            module (ModuleType): Already imported domain module; when given,
                ``domain`` is not imported
            router (str): Router name (should be importable from domain module,
                defaults to the 'routers' key of the domain description,
                then to '.routers')
            acl (str): ACL module name (defaults to the ``__acl__`` variable
                of the domain module, then to '.acl')
            app (HalfAPI): The app instance; when omitted the domain starts
                with an empty configuration

        Raises:
            Exception: if the router module cannot be resolved or if a
                dependency listed in 'deps' has an unexpected version
        """
        self.app = app
        self.m_domain = importlib.import_module(domain) if module is None else module

        # NOTE(review): when the module has no ``domain`` attribute the
        # ``domain`` argument itself is used as the description, so it must
        # then be a mapping rather than a module name -- confirm with callers.
        d_domain = getattr(self.m_domain, 'domain', domain)
        self.name = d_domain['name']
        self.id = d_domain['id']
        self.version = d_domain['version']
        self.halfapi_version = d_domain.get('halfapi_version', __version__)
        self.deps = d_domain.get('deps', tuple())

        # NOTE(review): the Starlette constructor presumably sets its own
        # ``router`` attribute later on, replacing this name -- confirm.
        self.router = router if router else d_domain.get('routers', '.routers')

        self.m_router = None
        try:
            self.m_router = importlib.import_module(self.router, self.m_domain.__package__)
        except AttributeError as exc:
            raise Exception('no router module') from exc

        self.m_acl = HalfDomain.m_acl(self.m_domain, acl)

        # ``app`` is optional: do not dereference it when it was not given.
        self.config = {**app.config} if app is not None else {}

        logger.info('HalfDomain creation %s %s', domain, self.config)

        # Each dependency is a (package name, version specifier) pair.
        for package, version in self.deps:
            specifier = SpecifierSet(version)
            package_module = importlib.import_module(package)
            if Version(package_module.__version__) not in specifier:
                raise Exception(
                    'Wrong version for package {} version {} (expecting {})'.format(
                        package, package_module.__version__, specifier
                    ))

        super().__init__(
            routes=self.gen_domain_routes(),
            middleware=[
                (DomainMiddleware, {
                    'domain': {
                        'name': self.name,
                        'id': self.id,
                        'version': self.version,
                        'halfapi_version': self.halfapi_version,
                        'config': self.config.get('domain', {}).get(self.name, {}).get('config', {})
                    }
                })
            ]
        )

    @staticmethod
    def m_acl(module, acl=None):
        """ Returns the imported acl module for the domain module

        Parameters:
            module (ModuleType): The domain module
            acl (str): Name of the acl module, relative to the domain package
                (defaults to ``module.__acl__``, then to '.acl')
        """
        if not acl:
            acl = getattr(module, '__acl__', '.acl')

        return importlib.import_module(acl, module.__package__)

    @staticmethod
    def acls(module, acl=None):
        """ Returns the ACLS constant for the given domain

        Raises:
            Exception: if the acl module does not define ACLS
        """
        m_acl = HalfDomain.m_acl(module, acl)
        try:
            return getattr(m_acl, 'ACLS')
        except AttributeError as exc:
            raise Exception(
                f'Missing acl.ACLS constant in module {m_acl.__package__}') from exc

    @staticmethod
    def acls_route(domain, module_path=None, acl=None):
        """ Dictionary of acls

        Format :
            {
                [acl_name]: {
                    callable: fct_reference,
                    docs: fct_docstring,
                    result: fct_result
                }
            }

        Parameters:
            domain (str): Domain module name, imported when ``module_path``
                is not given
            module_path (str): Module name imported instead of ``domain``
            acl (str): Name of the acl module (see ``m_acl``)
        """
        module = importlib.import_module(domain if module_path is None else module_path)
        m_acl = HalfDomain.m_acl(module, acl)

        # Each ACLS entry is unpacked as (name, documentation, order); the
        # order is not used here.
        return {
            acl_name: {
                'callable': getattr(m_acl, acl_name),
                'docs': doc,
                'result': None
            }
            for acl_name, doc, _order in HalfDomain.acls(module, acl=acl)
        }

    @staticmethod
    def gen_routes(m_router: ModuleType,
                   verb: str,
                   path: List[str],
                   params: List[Dict]) -> Tuple[FunctionType, List[Dict]]:
        """
        Returns a tuple of the function associated to the verb and path arguments,
        and the list of its acls

        Parameters:
            - m_router (ModuleType): The module containing the function definition
            - verb (str): The HTTP verb for the route (GET, POST, ...)
            - path (List): The route path, as a list (each item being a level of
                deepness), from the lowest level (domain) to the highest
            - params (List[Dict]): The acl list of the following format :
                [{'acl': Function, 'args': {'required': [], 'optional': []}}]

        Returns:
            (Function, List[Dict]): The decorated destination function and the
                acl list, returned unchanged

        Raises:
            MissingAclError: if ``params`` is empty
            PathError: if ``path`` is empty
            UndefinedFunction: if the router module has no function for the verb
        """
        if not params:
            raise MissingAclError('[{}] {}'.format(verb, '/'.join(path)))

        if not path:
            logger.error('Empty path for [%s]', verb)
            raise PathError()

        fct_name = get_fct_name(verb, path[-1])
        if not hasattr(m_router, fct_name):
            raise UndefinedFunction('{}.{}'.format(m_router.__name__, fct_name or ''))

        fct = getattr(m_router, fct_name)

        if not inspect.iscoroutinefunction(fct):
            return route_decorator(fct), params

        # TODO: Remove when using only sync functions
        return acl.args_check(fct), params

    @staticmethod
    def gen_router_routes(m_router, path: List[str]) -> \
            Iterator[Tuple[str, str, ModuleType, Coroutine, List]]:
        """
        Recursive generator that parses a router (or a subrouter)
        and yields from gen_routes

        Parameters:
            - m_router (ModuleType): The currently treated router module
            - path (List[str]): The current path stack; it is mutated while
                iterating and restored once the generator is exhausted

        Yields:
            (str, str, ModuleType, Coroutine, List): A tuple containing the path, verb,
                router module, function reference and parameters of the route.
                Function and parameters are yielded from the gen_routes function,
                that decorates the endpoint function.
        """
        for subpath, params in HalfDomain.read_router(m_router).items():
            path.append(subpath)

            for verb in VERBS:
                if verb not in params:
                    continue

                yield (
                    # Empty levels (e.g. the '' root subpath) are skipped
                    '/'.join(filter(None, path)),
                    verb,
                    m_router,
                    *HalfDomain.gen_routes(m_router, verb, path, params[verb])
                )

            for subroute in params.get('SUBROUTES', []):
                # A subroute named NAME_type becomes the path parameter
                # "{name:type}"; the type itself is not validated here.
                param_match = re.fullmatch(r'^([A-Z_]+)_([a-z]+)$', subroute)
                if param_match is not None:
                    param_name, param_type = param_match.groups()
                    path.append('{{{}:{}}}'.format(param_name.lower(), param_type))
                else:
                    path.append(subroute)

                try:
                    yield from HalfDomain.gen_router_routes(
                        importlib.import_module(f'.{subroute}', m_router.__name__),
                        path)
                except ImportError:
                    logger.error('Failed to import subroute **%s**', subroute)
                    raise

                path.pop()

            path.pop()

    @staticmethod
    def read_router(m_router: ModuleType) -> Dict:
        """
        Reads a module and returns a router dict

        If the module has a "ROUTES" constant, it just returns this constant,
        Else, if the module has an "ACLS" constant, it builds the accurate dict.
        When the module is a package with a single search path, its
        sub-directories are listed as SUBROUTES.

        The result is validated against ROUTER_SCHEMA before being returned.

        Raises:
            Exception: if ACLS has a key that is not in VERBS
            SchemaError: if the router dict does not match ROUTER_SCHEMA
            FileNotFoundError: if the package directory cannot be scanned

        TODO: May be another thing, may be not a part of halfAPI
        """
        m_path = None

        try:
            if hasattr(m_router, 'ROUTES'):
                routes = getattr(m_router, 'ROUTES')
            else:
                routes = {'': {}}
                acls = getattr(m_router, 'ACLS', None)

                if acls is not None:
                    for method in acls:
                        if method not in VERBS:
                            raise Exception(
                                'This method is not handled: {}'.format(method))

                        routes[''][method] = acls[method].copy()

                routes['']['SUBROUTES'] = []
                if hasattr(m_router, '__path__'):
                    # Module is a package
                    m_path = getattr(m_router, '__path__')
                    if isinstance(m_path, list) and len(m_path) == 1:
                        # NOTE(review): every sub-directory is listed,
                        # including ones such as __pycache__ -- confirm
                        # whether they should be filtered out.
                        with os.scandir(m_path[0]) as entries:
                            routes['']['SUBROUTES'] = [
                                entry.name
                                for entry in entries
                                if entry.is_dir()
                            ]

            try:
                ROUTER_SCHEMA.validate(routes)
            except SchemaError:
                logger.error(routes)
                raise

            return routes
        except FileNotFoundError:
            # TODO: Proper exception handling
            logger.error(m_path)
            raise

    def gen_domain_routes(self):
        """
        Yields the Route objects for a domain

        The first route is a public GET on '/' serving the domain schema;
        the others come from the router module.

        Returns:
            Generator(HalfRoute)
        """
        yield HalfRoute('/',
            JSONRoute([self.schema()]),
            [{'acl': acl.public}],
            'GET'
        )

        for path, method, _m_router, fct, params in \
                HalfDomain.gen_router_routes(self.m_router, []):
            yield HalfRoute(f'/{path}', fct, params, method)

    def schema_dict(self) -> Dict:
        """ gen_router_routes return values as a dict

        Returns:
            Dict: Schema of dict is halfapi.lib.constants.DOMAIN_SCHEMA;
                indexed by path then verb, each entry holding 'callable',
                'docs' (omitted when the docstring cannot be read) and 'acls'

        @TODO: Should be a "router_schema_dict" function
        """
        d_res = {}

        for path, verb, m_router, fct, parameters in \
                HalfDomain.gen_router_routes(self.m_router, []):
            d_verb = d_res.setdefault(path, {}).setdefault(verb, {})

            d_verb['callable'] = f'{m_router.__name__}:{fct.__name__}'

            try:
                # The endpoint docstring is parsed as YAML
                d_verb['docs'] = yaml.safe_load(fct.__doc__)
            except AttributeError:
                logger.error(
                    'Cannot read docstring from fct (fct=%s path=%s verb=%s)',
                    fct.__name__, path, verb)

            # ACL callables are replaced by their names
            d_verb['acls'] = [
                {**elt, 'acl': elt['acl'].__name__}
                for elt in parameters
            ]

        return d_res

    def schema(self) -> Dict:
        """ Returns a copy of API_SCHEMA_DICT with the 'domain' description
        and the 'paths' of this domain filled in
        """
        schema = {**API_SCHEMA_DICT}
        schema['domain'] = {
            'name': self.name,
            'id': self.id,
            'version': getattr(self.m_domain, '__version__', ''),
            'patch_release': getattr(self.m_domain, '__patch_release__', ''),
            'routers': self.m_router.__name__,
            'acls': tuple(getattr(self.m_acl, 'ACLS', ()))
        }
        schema['paths'] = self.schema_dict()
        return schema