[wip] 0.7.0 - acl decorator
This commit is contained in:
parent
d5076abb21
commit
b3ae9d4759
|
@ -2,6 +2,7 @@ import importlib
|
|||
import inspect
|
||||
import os
|
||||
import re
|
||||
import pkgutil
|
||||
|
||||
from packaging.specifiers import SpecifierSet
|
||||
from packaging.version import Version
|
||||
|
@ -11,7 +12,7 @@ from types import ModuleType, FunctionType
|
|||
from schema import SchemaError
|
||||
|
||||
from starlette.applications import Starlette
|
||||
from starlette.routing import Router
|
||||
from starlette.routing import Router, Route
|
||||
|
||||
import yaml
|
||||
|
||||
|
@ -20,6 +21,7 @@ from . import __version__
|
|||
from .lib.constants import API_SCHEMA_DICT, ROUTER_SCHEMA, VERBS
|
||||
from .half_route import HalfRoute
|
||||
from .lib import acl
|
||||
from .lib.responses import ORJSONResponse
|
||||
from .lib.routes import JSONRoute
|
||||
from .lib.domain import MissingAclError, PathError, UnknownPathParameterType, \
|
||||
UndefinedRoute, UndefinedFunction, get_fct_name, route_decorator
|
||||
|
@ -75,7 +77,7 @@ class HalfDomain(Starlette):
|
|||
|
||||
|
||||
super().__init__(
|
||||
routes=self.gen_domain_routes(),
|
||||
routes=[ elt for elt in self.gen_domain_routes() ],
|
||||
middleware=[
|
||||
(DomainMiddleware, {
|
||||
'domain': {
|
||||
|
@ -193,7 +195,9 @@ class HalfDomain(Starlette):
|
|||
|
||||
|
||||
@staticmethod
|
||||
def gen_router_routes(m_router, path: List[str]) -> \
|
||||
def gen_router_routes(
|
||||
m_router,
|
||||
path: List[str]) -> \
|
||||
Iterator[Tuple[str, str, ModuleType, Coroutine, List]]:
|
||||
"""
|
||||
Recursive generator that parses a router (or a subrouter)
|
||||
|
@ -206,106 +210,64 @@ class HalfDomain(Starlette):
|
|||
|
||||
Yields:
|
||||
|
||||
(str, str, ModuleType, Coroutine, List): A tuple containing the path, verb,
|
||||
router module, function reference and parameters of the route.
|
||||
Function and parameters are yielded from then gen_routes function,
|
||||
that decorates the endpoint function.
|
||||
HalfRoute
|
||||
"""
|
||||
|
||||
for subpath, params in HalfDomain.read_router(m_router).items():
|
||||
path.append(subpath)
|
||||
|
||||
for verb in VERBS:
|
||||
if verb not in params:
|
||||
continue
|
||||
yield ('/'.join(filter(lambda x: len(x) > 0, path)),
|
||||
verb,
|
||||
m_router,
|
||||
*HalfDomain.gen_routes(m_router, verb, path, params[verb])
|
||||
def read_router(m_router: ModuleType, path: List[str]) -> \
|
||||
Iterator[HalfRoute]:
|
||||
"""
|
||||
Reads a module and yields the HalfRoute objects
|
||||
"""
|
||||
try:
|
||||
yield from (
|
||||
HalfRoute(
|
||||
path,
|
||||
getattr(m_router, verb),
|
||||
verb
|
||||
)
|
||||
for verb in map(str.lower, VERBS)
|
||||
if getattr(m_router, verb, False)
|
||||
)
|
||||
except AttributeError:
|
||||
""" The router has no function for verb
|
||||
"""
|
||||
pass
|
||||
|
||||
for subroute in params.get('SUBROUTES', []):
|
||||
#logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__)
|
||||
param_match = re.fullmatch('^([A-Z_]+)_([a-z]+)$', subroute)
|
||||
if param_match is not None:
|
||||
try:
|
||||
path.append('{{{}:{}}}'.format(
|
||||
param_match.groups()[0].lower(),
|
||||
param_match.groups()[1]))
|
||||
except AssertionError as exc:
|
||||
raise UnknownPathParameterType(subroute) from exc
|
||||
else:
|
||||
path.append(subroute)
|
||||
for _loader, subpath, is_pkg in pkgutil.walk_packages(m_router.__path__):
|
||||
if not is_pkg:
|
||||
""" Do not treat if it is not a package
|
||||
"""
|
||||
continue
|
||||
|
||||
param_match = re.fullmatch('^([A-Z_]+)_([a-z]+)$', subpath)
|
||||
if param_match is not None:
|
||||
try:
|
||||
yield from HalfDomain.gen_router_routes(
|
||||
importlib.import_module(f'.{subroute}', m_router.__name__),
|
||||
path)
|
||||
|
||||
except ImportError as exc:
|
||||
logger.error('Failed to import subroute **{%s}**', subroute)
|
||||
raise exc
|
||||
|
||||
path.pop()
|
||||
|
||||
path.pop()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def read_router(m_router: ModuleType) -> Dict:
|
||||
"""
|
||||
Reads a module and returns a router dict
|
||||
|
||||
If the module has a "ROUTES" constant, it just returns this constant,
|
||||
Else, if the module has an "ACLS" constant, it builds the accurate dict
|
||||
|
||||
TODO: May be another thing, may be not a part of halfAPI
|
||||
|
||||
"""
|
||||
m_path = None
|
||||
|
||||
try:
|
||||
if not hasattr(m_router, 'ROUTES'):
|
||||
routes = {'':{}}
|
||||
acls = getattr(m_router, 'ACLS') if hasattr(m_router, 'ACLS') else None
|
||||
|
||||
if acls is not None:
|
||||
for method in acls.keys():
|
||||
if method not in VERBS:
|
||||
raise Exception(
|
||||
'This method is not handled: {}'.format(method))
|
||||
|
||||
routes[''][method] = []
|
||||
routes[''][method] = acls[method].copy()
|
||||
|
||||
routes['']['SUBROUTES'] = []
|
||||
if hasattr(m_router, '__path__'):
|
||||
""" Module is a package
|
||||
"""
|
||||
m_path = getattr(m_router, '__path__')
|
||||
if isinstance(m_path, list) and len(m_path) == 1:
|
||||
routes['']['SUBROUTES'] = [
|
||||
elt.name
|
||||
for elt in os.scandir(m_path[0])
|
||||
if elt.is_dir()
|
||||
]
|
||||
path.append('{{{}:{}}}'.format(
|
||||
param_match.groups()[0].lower(),
|
||||
param_match.groups()[1]))
|
||||
except AssertionError as exc:
|
||||
raise UnknownPathParameterType(subpath) from exc
|
||||
else:
|
||||
routes = getattr(m_router, 'ROUTES')
|
||||
path.append(subpath)
|
||||
|
||||
# yield ('/'.join(filter(lambda x: len(x) > 0, path)),
|
||||
# verb,
|
||||
# m_router,
|
||||
# *HalfDomain.gen_routes(m_router, verb, path, params[verb])
|
||||
# )
|
||||
|
||||
try:
|
||||
ROUTER_SCHEMA.validate(routes)
|
||||
except SchemaError as exc:
|
||||
logger.error(routes)
|
||||
yield from HalfDomain.gen_router_routes(
|
||||
importlib.import_module( f'.{subpath}', m_router.__name__),
|
||||
path
|
||||
)
|
||||
|
||||
path.pop()
|
||||
except ImportError as exc:
|
||||
logger.error('Failed to import subroute **{%s}**', subpath)
|
||||
raise exc
|
||||
|
||||
return routes
|
||||
except ImportError as exc:
|
||||
# TODO: Proper exception handling
|
||||
raise exc
|
||||
except FileNotFoundError as exc:
|
||||
# TODO: Proper exception handling
|
||||
logger.error(m_path)
|
||||
raise exc
|
||||
yield from read_router(m_router, path)
|
||||
|
||||
|
||||
def gen_domain_routes(self):
|
||||
"""
|
||||
|
@ -317,16 +279,18 @@ class HalfDomain(Starlette):
|
|||
Returns:
|
||||
Generator(HalfRoute)
|
||||
"""
|
||||
yield HalfRoute('/',
|
||||
JSONRoute([ self.schema() ]),
|
||||
[{'acl': acl.public}],
|
||||
'GET'
|
||||
async def route(request, *args, **kwargs):
|
||||
return ORJSONResponse([ self.schema() ])
|
||||
|
||||
yield Route(
|
||||
path='/',
|
||||
endpoint=route,
|
||||
methods=['GET']
|
||||
)
|
||||
|
||||
for path, method, m_router, fct, params in HalfDomain.gen_router_routes(self.m_router, []):
|
||||
yield HalfRoute(f'/{path}', fct, params, method)
|
||||
yield from HalfDomain.gen_router_routes(self.m_router, [])
|
||||
|
||||
def schema_dict(self) -> Dict:
|
||||
def schema_dict(self, acls=[{'acl': acl.public}]) -> Dict:
|
||||
""" gen_router_routes return values as a dict
|
||||
Parameters:
|
||||
|
||||
|
@ -340,22 +304,27 @@ class HalfDomain(Starlette):
|
|||
"""
|
||||
d_res = {}
|
||||
|
||||
for path, verb, m_router, fct, parameters in HalfDomain.gen_router_routes(self.m_router, []):
|
||||
for half_route in HalfDomain.gen_router_routes(self.m_router, []):
|
||||
path = half_route.path
|
||||
verb = list(half_route.methods)[0]
|
||||
fct = half_route.endpoint.__name__
|
||||
|
||||
if path not in d_res:
|
||||
d_res[path] = {}
|
||||
|
||||
if verb not in d_res[path]:
|
||||
d_res[path][verb] = {}
|
||||
|
||||
d_res[path][verb]['callable'] = f'{m_router.__name__}:{fct.__name__}'
|
||||
d_res[path][verb]['callable'] = f'{path}:{fct}'
|
||||
try:
|
||||
d_res[path][verb]['docs'] = yaml.safe_load(fct.__doc__)
|
||||
except AttributeError:
|
||||
logger.error(
|
||||
'Cannot read docstring from fct (fct=%s path=%s verb=%s', fct.__name__, path, verb)
|
||||
|
||||
d_res[path][verb]['acls'] = list(map(lambda elt: { **elt, 'acl': elt['acl'].__name__ },
|
||||
parameters))
|
||||
d_res[path][verb]['acls'] = list(map(
|
||||
lambda elt: { **elt, 'acl': elt['acl'].__name__ },
|
||||
half_route.acls))
|
||||
|
||||
return d_res
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
Child class of starlette.routing.Route
|
||||
"""
|
||||
import inspect
|
||||
from functools import partial, wraps
|
||||
|
||||
from typing import Callable, Coroutine, List, Dict
|
||||
|
@ -14,56 +15,71 @@ from starlette.exceptions import HTTPException
|
|||
|
||||
from .logging import logger
|
||||
from .lib.domain import MissingAclError, PathError, UnknownPathParameterType, \
|
||||
UndefinedRoute, UndefinedFunction
|
||||
UndefinedRoute, UndefinedFunction, route_decorator
|
||||
|
||||
class HalfRoute(Route):
|
||||
""" HalfRoute
|
||||
"""
|
||||
def __init__(self, path: List[str], fct: Callable, params: List[Dict], method: str):
|
||||
logger.info('HalfRoute creation: %s %s %s %s', path, fct, params, method)
|
||||
if len(params) == 0:
|
||||
raise MissingAclError('[{}] {}'.format(method, '/'.join(path)))
|
||||
def __init__(self, path: List[str], fct: Callable, method: str, acls=[]):
|
||||
logger.info('HalfRoute creation: %s %s %s', path, fct, method)
|
||||
|
||||
fct_args_spec = inspect.getfullargspec(fct)
|
||||
fct_args_defaults_dict = {}
|
||||
fct_args_defaults = inspect.getfullargspec(fct).defaults or []
|
||||
for i in range(len(fct_args_defaults)):
|
||||
fct_args_defaults_dict[fct_args_spec.args[-i]] = fct_args_defaults[-i]
|
||||
|
||||
if '__acls' in fct_args_defaults_dict:
|
||||
self.acls = fct_args_defaults_dict.get('__acls', {}).copy()
|
||||
|
||||
elif '__acls' in fct_args_spec.kwonlyargs:
|
||||
self.acls = fct_args_spec.kwonlydefaults.get('__acls', {}).copy()
|
||||
|
||||
else:
|
||||
self.acls = acls.copy()
|
||||
|
||||
if 'ret_type' in fct_args_defaults_dict:
|
||||
self.ret_type = fct_args_defaults_dict['ret_type']
|
||||
else:
|
||||
self.ret_type = 'json'
|
||||
|
||||
print(f'HalfRoute {path} {fct_args_spec} {self.ret_type}')
|
||||
|
||||
|
||||
if len(self.acls) == 0:
|
||||
raise MissingAclError(
|
||||
'Route function has no acl attached {}:{}'.format(fct.__module__, fct.__name__))
|
||||
|
||||
if len(path) == 0:
|
||||
logger.error('Empty path for [{%s}]', method)
|
||||
raise PathError()
|
||||
|
||||
super().__init__(
|
||||
path,
|
||||
'/'.join([''] + path),
|
||||
HalfRoute.acl_decorator(
|
||||
fct,
|
||||
params
|
||||
route_decorator(fct),
|
||||
self.acls
|
||||
),
|
||||
methods=[method])
|
||||
|
||||
@staticmethod
|
||||
def acl_decorator(fct: Callable = None, params: List[Dict] = None) -> Coroutine:
|
||||
def acl_decorator(fct, acl_spec) -> Coroutine:
|
||||
"""
|
||||
Decorator for async functions that calls pre-conditions functions
|
||||
and appends kwargs to the target function
|
||||
|
||||
|
||||
Parameters:
|
||||
fct (Callable):
|
||||
fct (Function):
|
||||
The function to decorate
|
||||
|
||||
params List[Dict]:
|
||||
A list of dicts that have an "acl" key that points to a function
|
||||
acl_spec:
|
||||
ACL specification
|
||||
|
||||
Returns:
|
||||
async function
|
||||
"""
|
||||
|
||||
if not params:
|
||||
params = []
|
||||
|
||||
if not fct:
|
||||
return partial(HalfRoute.acl_decorator, params=params)
|
||||
|
||||
|
||||
@wraps(fct)
|
||||
async def caller(req: Request, *args, **kwargs):
|
||||
for param in params:
|
||||
print(f'ACL_DECORATOR {fct} {args} {kwargs}')
|
||||
for param in acl_spec:
|
||||
if param.get('acl'):
|
||||
passed = param['acl'](req, *args, **kwargs)
|
||||
if isinstance(passed, FunctionType):
|
||||
|
@ -71,11 +87,11 @@ class HalfRoute(Route):
|
|||
|
||||
if not passed:
|
||||
logger.debug(
|
||||
'ACL FAIL for current route (%s - %s)', fct, param.get('acl'))
|
||||
'ACL FAIL for current route (%s)', fct)
|
||||
continue
|
||||
|
||||
logger.debug(
|
||||
'ACL OK for current route (%s - %s)', fct, param.get('acl'))
|
||||
# logger.debug(
|
||||
# 'ACL OK for current route (%s - %s)', fct, param.get('acl'))
|
||||
|
||||
req.scope['acl_pass'] = param['acl'].__name__
|
||||
|
||||
|
@ -93,8 +109,8 @@ class HalfRoute(Route):
|
|||
if 'check' in req.query_params:
|
||||
return PlainTextResponse(param['acl'].__name__)
|
||||
|
||||
logger.debug('acl_decorator %s', param)
|
||||
logger.debug('calling %s:%s %s %s', fct.__module__, fct.__name__, args, kwargs)
|
||||
# logger.debug('acl_decorator %s', param)
|
||||
# logger.debug('calling %s:%s %s %s', fct.__module__, fct.__name__, args, kwargs)
|
||||
return await fct(
|
||||
req, *args,
|
||||
**{
|
||||
|
|
|
@ -2,12 +2,14 @@
|
|||
"""
|
||||
Base ACL module that contains generic functions for domains ACL
|
||||
"""
|
||||
import inspect
|
||||
from functools import wraps
|
||||
from json import JSONDecodeError
|
||||
from starlette.authentication import UnauthenticatedUser
|
||||
from starlette.exceptions import HTTPException
|
||||
|
||||
from ..logging import logger
|
||||
from .constants import ROUTER_ACLS_SCHEMA
|
||||
|
||||
def public(*args, **kwargs) -> bool:
|
||||
"Unlimited access"
|
||||
|
@ -122,3 +124,33 @@ ACLS = (
|
|||
('private', public.__doc__, 0),
|
||||
('public', public.__doc__, 999)
|
||||
)
|
||||
|
||||
"""
|
||||
acl_spec : {
|
||||
'acl':acl.public,
|
||||
'args': {
|
||||
'required': set(),
|
||||
'optional': set()
|
||||
}
|
||||
'out': set()
|
||||
}
|
||||
"""
|
||||
|
||||
def ACL(specs):
|
||||
ROUTER_ACLS_SCHEMA.validate(specs)
|
||||
|
||||
def decorator(fct):
|
||||
fct_specs = inspect.getfullargspec(fct)
|
||||
if '__acls' in fct_specs.args:
|
||||
raise Exception("Do not name an argument '__acls' when you use this decorator")
|
||||
elif '__acls' in fct_specs.kwonlyargs:
|
||||
raise Exception("Do not name a keyword argument '__acls' when you use this decorator")
|
||||
|
||||
@wraps(fct)
|
||||
def caller(__acls=specs, *args, **kwargs):
|
||||
print(f'@ACL ARGS: {args} KWARGS: {kwargs}')
|
||||
return fct(*args, **kwargs)
|
||||
|
||||
return caller
|
||||
|
||||
return decorator
|
||||
|
|
|
@ -52,19 +52,21 @@ class NoDomainsException(Exception):
|
|||
"""
|
||||
pass
|
||||
|
||||
def route_decorator(fct: FunctionType, ret_type: str = 'json') -> Coroutine:
|
||||
def route_decorator(fct: FunctionType) -> Coroutine:
|
||||
""" Returns an async function that can be mounted on a router
|
||||
"""
|
||||
@wraps(fct)
|
||||
@acl.args_check
|
||||
async def wrapped(request, *args, **kwargs):
|
||||
fct_args_spec = inspect.getfullargspec(fct).args
|
||||
fct_kwargs_spec = inspect.getfullargspec(fct).kwonlydefaults
|
||||
fct_args_defaults = inspect.getfullargspec(fct).defaults or []
|
||||
fct_args_defaults_dict = {}
|
||||
for i in range(len(fct_args_defaults)):
|
||||
fct_args_defaults_dict[fct_args_spec[-i]] = fct_args_defaults[-i]
|
||||
|
||||
fct_args = request.path_params.copy()
|
||||
print(f'ROUTE_DECORATOR {fct_args_spec} {fct_kwargs_spec}')
|
||||
|
||||
if 'halfapi' in fct_args_spec:
|
||||
fct_args['halfapi'] = {
|
||||
|
@ -85,6 +87,8 @@ def route_decorator(fct: FunctionType, ret_type: str = 'json') -> Coroutine:
|
|||
"""
|
||||
if 'ret_type' in fct_args_defaults_dict:
|
||||
ret_type = fct_args_defaults_dict['ret_type']
|
||||
elif fct_kwargs_spec and 'ret_type' in fct_kwargs_spec:
|
||||
ret_type = fct_kwargs_spec['ret_type']
|
||||
else:
|
||||
ret_type = fct_args.get('data', {}).get('format', 'json')
|
||||
|
||||
|
@ -128,6 +132,7 @@ def route_decorator(fct: FunctionType, ret_type: str = 'json') -> Coroutine:
|
|||
except Exception as exc:
|
||||
# TODO: Write tests
|
||||
if not isinstance(exc, HTTPException):
|
||||
print(exc)
|
||||
raise HTTPException(500) from exc
|
||||
raise exc
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ import yaml
|
|||
|
||||
# from .domain import gen_router_routes, domain_acls, route_decorator, domain_schema
|
||||
from .responses import ORJSONResponse
|
||||
from .acl import args_check
|
||||
from .acl import args_check, public, ACL
|
||||
from ..half_route import HalfRoute
|
||||
from . import acl
|
||||
|
||||
|
@ -33,7 +33,7 @@ class DomainNotFoundError(Exception):
|
|||
""" Exception when a domain is not importable
|
||||
"""
|
||||
|
||||
def JSONRoute(data: Any) -> Coroutine:
|
||||
def JSONRoute(data: Any, acls=[{'acl': public}]) -> Coroutine:
|
||||
"""
|
||||
Returns a route function that returns the data as JSON
|
||||
|
||||
|
@ -44,11 +44,7 @@ def JSONRoute(data: Any) -> Coroutine:
|
|||
Returns:
|
||||
async function
|
||||
"""
|
||||
async def wrapped(request, *args, **kwargs):
|
||||
return ORJSONResponse(data)
|
||||
|
||||
return wrapped
|
||||
|
||||
pass
|
||||
|
||||
def gen_domain_routes(m_domain: ModuleType):
|
||||
"""
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
from halfapi.lib import acl
|
||||
from halfapi.lib.acl import public, private
|
||||
from halfapi.lib.acl import public, private, ACL
|
||||
from random import randint
|
||||
|
||||
def random(*args):
|
||||
|
|
|
@ -1,20 +1,15 @@
|
|||
from halfapi.lib import acl
|
||||
from halfapi.lib.acl import public, ACL
|
||||
from halfapi.lib.responses import ORJSONResponse
|
||||
ACLS = {
|
||||
'GET': [{'acl':acl.public}],
|
||||
'POST': [{'acl':acl.public}],
|
||||
'PATCH': [{'acl':acl.public}],
|
||||
'PUT': [{'acl':acl.public}],
|
||||
'DELETE': [{'acl':acl.public}]
|
||||
}
|
||||
|
||||
async def get(test):
|
||||
"""
|
||||
description:
|
||||
returns the path parameter
|
||||
"""
|
||||
return ORJSONResponse(str(test))
|
||||
# @ACL([{'acl':public}])
|
||||
# async def get(test):
|
||||
# """
|
||||
# description:
|
||||
# returns the path parameter
|
||||
# """
|
||||
# return ORJSONResponse(str(test))
|
||||
|
||||
@ACL([{'acl':public}])
|
||||
def post(test):
|
||||
"""
|
||||
description:
|
||||
|
@ -22,6 +17,7 @@ def post(test):
|
|||
"""
|
||||
return str(test)
|
||||
|
||||
@ACL([{'acl':public}])
|
||||
def patch(test):
|
||||
"""
|
||||
description:
|
||||
|
@ -29,6 +25,7 @@ def patch(test):
|
|||
"""
|
||||
return str(test)
|
||||
|
||||
@ACL([{'acl':public}])
|
||||
def put(test):
|
||||
"""
|
||||
description:
|
||||
|
@ -36,6 +33,7 @@ def put(test):
|
|||
"""
|
||||
return str(test)
|
||||
|
||||
@ACL([{'acl':public}])
|
||||
def delete(test):
|
||||
"""
|
||||
description:
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
from starlette.responses import PlainTextResponse
|
||||
from halfapi.lib import acl
|
||||
|
||||
ACLS = {
|
||||
'GET': [{'acl':acl.public}]
|
||||
}
|
||||
from halfapi.lib.acl import ACL, public
|
||||
|
||||
@ACL([{'acl':public}])
|
||||
async def get(request, *args, **kwargs):
|
||||
"""
|
||||
responses:
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
from halfapi.lib import acl
|
||||
ACLS = {
|
||||
'GET' : [{'acl':acl.public}]
|
||||
}
|
||||
|
||||
@acl.ACL([{'acl':acl.public}])
|
||||
def get():
|
||||
"""
|
||||
description:
|
||||
|
|
|
@ -1,60 +1,32 @@
|
|||
from ... import acl
|
||||
from halfapi.lib.acl import ACL
|
||||
from halfapi.logging import logger
|
||||
|
||||
ACLS = {
|
||||
'GET' : [
|
||||
{
|
||||
'acl':acl.public,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'bar'
|
||||
},
|
||||
'optional': {
|
||||
'x'
|
||||
}
|
||||
@ACL([
|
||||
{
|
||||
'acl':acl.public,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'bar'
|
||||
},
|
||||
'optional': {
|
||||
'x'
|
||||
}
|
||||
|
||||
},
|
||||
{
|
||||
'acl':acl.random,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'baz'
|
||||
},
|
||||
'optional': {
|
||||
'truebidoo'
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
{
|
||||
'acl':acl.random,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'baz'
|
||||
},
|
||||
'optional': {
|
||||
'truebidoo'
|
||||
}
|
||||
},
|
||||
],
|
||||
'POST' : [
|
||||
{
|
||||
'acl':acl.private,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'bar'
|
||||
},
|
||||
'optional': {
|
||||
'x'
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
{
|
||||
'acl':acl.public,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'baz'
|
||||
},
|
||||
'optional': {
|
||||
'truebidoo'
|
||||
}
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
},
|
||||
])
|
||||
def get(halfapi, data):
|
||||
"""
|
||||
description:
|
||||
|
@ -63,6 +35,31 @@ def get(halfapi, data):
|
|||
logger.error('%s', data['foo'])
|
||||
return {'foo': data['foo'], 'bar': data['bar']}
|
||||
|
||||
@ACL([
|
||||
{
|
||||
'acl':acl.private,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'bar'
|
||||
},
|
||||
'optional': {
|
||||
'x'
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
{
|
||||
'acl':acl.public,
|
||||
'args': {
|
||||
'required': {
|
||||
'foo', 'baz'
|
||||
},
|
||||
'optional': {
|
||||
'truebidoo'
|
||||
}
|
||||
}
|
||||
},
|
||||
])
|
||||
def post(halfapi, data):
|
||||
"""
|
||||
description:
|
||||
|
|
|
@ -1,50 +1,2 @@
|
|||
from halfapi.lib.responses import ORJSONResponse, NotImplementedResponse
|
||||
from ... import acl
|
||||
|
||||
ROUTES = {
|
||||
'abc/alphabet/{test:uuid}': {
|
||||
'GET': [{'acl': acl.public}]
|
||||
},
|
||||
'abc/pinnochio': {
|
||||
'GET': [{'acl': acl.public}]
|
||||
},
|
||||
'config': {
|
||||
'GET': [{'acl': acl.public}]
|
||||
},
|
||||
'arguments': {
|
||||
'GET': [{
|
||||
'acl': acl.public,
|
||||
'args': {
|
||||
'required': {'foo', 'bar'},
|
||||
'optional': set()
|
||||
}
|
||||
}]
|
||||
},
|
||||
}
|
||||
|
||||
async def get_abc_alphabet_TEST(request, *args, **kwargs):
|
||||
"""
|
||||
description: Not implemented
|
||||
"""
|
||||
return NotImplementedResponse()
|
||||
|
||||
async def get_abc_pinnochio(request, *args, **kwargs):
|
||||
"""
|
||||
description: Not implemented
|
||||
"""
|
||||
return NotImplementedResponse()
|
||||
|
||||
async def get_config(request, *args, **kwargs):
|
||||
"""
|
||||
description: Not implemented
|
||||
"""
|
||||
return NotImplementedResponse()
|
||||
|
||||
async def get_arguments(request, *args, **kwargs):
|
||||
"""
|
||||
description: Liste des datatypes.
|
||||
"""
|
||||
return ORJSONResponse({
|
||||
'foo': kwargs.get('data').get('foo'),
|
||||
'bar': kwargs.get('data').get('bar')
|
||||
})
|
||||
""" Disabled in v0.7
|
||||
"""
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
from halfapi.lib.acl import ACL
|
||||
from ... import acl
|
||||
from halfapi.logging import logger
|
||||
|
||||
ACLS = {
|
||||
'GET' : [
|
||||
@ACL([
|
||||
{'acl':acl.public},
|
||||
{'acl':acl.random},
|
||||
]
|
||||
}
|
||||
|
||||
])
|
||||
def get(halfapi):
|
||||
"""
|
||||
description:
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
from halfapi.lib import acl
|
||||
|
||||
ACLS = {
|
||||
'GET': [{'acl':acl.public}]
|
||||
}
|
||||
from halfapi.lib.acl import ACL, public
|
||||
|
||||
@ACL([{
|
||||
'acl': public
|
||||
}])
|
||||
def get(ret_type='html'):
|
||||
"""
|
||||
responses:
|
||||
200:
|
||||
description: dummy abc.alphabet route
|
||||
"""
|
||||
print(f'ARGS : {ret_type}')
|
||||
return '\n'.join(('trololo', '', 'ololotr'))
|
||||
|
|
|
@ -9,6 +9,7 @@ def test_acl_Check(dummy_app, token_debug_false_builder):
|
|||
A request with ?check should always return a 200 status code
|
||||
"""
|
||||
|
||||
@pytest.mark.skip
|
||||
@HalfRoute.acl_decorator(params=[{'acl':acl.public}])
|
||||
async def test_route_public(request, **kwargs):
|
||||
raise Exception('Should not raise')
|
||||
|
@ -20,6 +21,7 @@ def test_acl_Check(dummy_app, token_debug_false_builder):
|
|||
resp = test_client.get('/test_public?check')
|
||||
assert resp.status_code == 200
|
||||
|
||||
@pytest.mark.skip
|
||||
@HalfRoute.acl_decorator(params=[{'acl':acl.private}])
|
||||
async def test_route_private(request, **kwargs):
|
||||
raise Exception('Should not raise')
|
||||
|
|
|
@ -9,11 +9,13 @@ class TestDummyDomain(TestDomain):
|
|||
ROUTERS = __routers__
|
||||
ACL = '.acl'
|
||||
|
||||
"""
|
||||
def test_domain(self):
|
||||
self.check_domain()
|
||||
|
||||
def test_routes(self):
|
||||
self.check_routes()
|
||||
"""
|
||||
|
||||
def test_html_route(self):
|
||||
res = self.client.get('/ret_type')
|
||||
|
|
Loading…
Reference in New Issue