diff --git a/halfapi/__init__.py b/halfapi/__init__.py index 7b49754..10aa592 100644 --- a/halfapi/__init__.py +++ b/halfapi/__init__.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -__version__ = '0.5.0' +__version__ = '0.5.1' def version(): return f'HalfAPI version:{__version__}' diff --git a/halfapi/lib/domain.py b/halfapi/lib/domain.py index 34845d2..c310a9b 100644 --- a/halfapi/lib/domain.py +++ b/halfapi/lib/domain.py @@ -175,3 +175,47 @@ def d_domains(config) -> Dict[str, ModuleType]: except ImportError as exc: logger.error('Could not load a domain : %s', exc) raise exc + +def router_acls(route_params: Dict, path: List, m_router: ModuleType) -> Generator: + d_res = {'fqtn': route_params.get('FQTN')} + + for verb in VERBS: + params = route_params.get(verb) + if params is None: + continue + if len(params) == 0: + logger.error('No ACL for route [{%s}] %s', verb, "/".join(path)) + else: + for param in params: + acl = param.get('acl') + yield acl.__name__, acl + + +def domain_acls(m_router, path): + if not hasattr(m_router, 'ROUTES'): + logger.error('Missing *ROUTES* constant in *%s*', m_router.__name__) + raise Exception(f'No ROUTES constant for {m_router.__name__}') + + + routes = m_router.ROUTES + + for subpath, route_params in routes.items(): + path.append(subpath) + + yield from router_acls(route_params, path, m_router) + + subroutes = route_params.get('SUBROUTES', []) + for subroute in subroutes: + logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__) + path.append(subroute) + try: + submod = importlib.import_module(f'.{subroute}', m_router.__name__) + except ImportError as exc: + logger.error('Failed to import subroute **{%s}**', subroute) + raise exc + + yield from domain_acls(submod, path) + + path.pop() + + path.pop() diff --git a/halfapi/lib/routes.py b/halfapi/lib/routes.py index cdc8cf6..0a50c7e 100644 --- a/halfapi/lib/routes.py +++ b/halfapi/lib/routes.py @@ -24,7 +24,8 @@ from starlette.routing import Route from starlette.requests import Request from starlette.responses import Response, PlainTextResponse -from halfapi.lib.domain import gen_router_routes, VERBS +from halfapi.lib.domain import gen_router_routes, VERBS, domain_acls +from ..conf import DOMAINSDICT logger = logging.getLogger('uvicorn.asgi') @@ -163,11 +164,15 @@ def api_acls(request): """ Returns the list of possible ACLs """ res = {} + domains = DOMAINSDICT() doc = 'doc' in request.query_params - for domain, d_domain_acl in request.scope['acl'].items(): + for domain, m_domain in domains.items(): res[domain] = {} - for acl_name, fct in d_domain_acl.items(): + for acl_name, fct in domain_acls(m_domain, [domain]): fct_result = fct.__doc__.strip() if doc and fct.__doc__ else fct(request) + if acl_name in res[domain]: + continue + if isinstance(fct_result, FunctionType): fct_result = fct()(request) res[domain][acl_name] = fct_result