uvicorn not optional
This commit is contained in:
parent
5280b45b8d
commit
b879840491
|
@ -1,10 +1,12 @@
|
|||
#!/usr/bin/env python3
|
||||
from functools import wraps
|
||||
""" Base ACL module that contains generic functions for domains ACL
|
||||
"""
|
||||
|
||||
def connected(func):
|
||||
""" Decorator that checks if the user object of the request has been set
|
||||
"""
|
||||
@wraps(func)
|
||||
def caller(req, *args, **kwargs):
|
||||
try:
|
||||
getattr(req.user, 'is_authenticated')
|
||||
|
|
|
@ -69,7 +69,7 @@ def match_route(app: ASGIApp, scope: Scope):
|
|||
domain : organigramme
|
||||
path : laboratoire/personnel
|
||||
"""
|
||||
_, result['version'], result['domain'], path = scope['path'].split('/', 3)
|
||||
_, result['domain'], path = scope['path'].split('/', 2)
|
||||
except ValueError as e:
|
||||
#404 Not found
|
||||
raise HTTPException(404)
|
||||
|
@ -78,10 +78,6 @@ def match_route(app: ASGIApp, scope: Scope):
|
|||
path = f'/{path}'
|
||||
|
||||
for route in app.routes:
|
||||
# Parse all routes
|
||||
match = route.matches(scope)
|
||||
if match[0] != Match.FULL:
|
||||
continue
|
||||
|
||||
if type(route) != Mount:
|
||||
""" The root app should not have exposed routes,
|
||||
|
@ -90,10 +86,16 @@ def match_route(app: ASGIApp, scope: Scope):
|
|||
continue
|
||||
|
||||
""" Clone the scope to assign the path to the path without the
|
||||
matching domain.
|
||||
matching domain, be careful to the "root_path" of the mounted domain.
|
||||
|
||||
@TODO
|
||||
Also, improper array unpacking may make crash the program without any
|
||||
explicit error, we may have to improve this as we only rely on this
|
||||
function to accomplish all the routing
|
||||
"""
|
||||
subscope = scope.copy()
|
||||
subscope['path'] = path
|
||||
_, result['domain'], subpath = path.split('/', 2)
|
||||
subscope['path'] = f'/{subpath}'
|
||||
|
||||
for mount_route in route.routes:
|
||||
# Parse all domain routes
|
||||
|
@ -102,8 +104,11 @@ def match_route(app: ASGIApp, scope: Scope):
|
|||
continue
|
||||
|
||||
# Route matches
|
||||
try:
|
||||
result['name'] = submatch[1]['endpoint'].__name__
|
||||
result['http_verb'] = scope['method']
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
return result, submatch[1]['path_params']
|
||||
|
||||
|
@ -136,33 +141,22 @@ class AclCallerMiddleware(BaseHTTPMiddleware):
|
|||
d_match, path_params = match_route(app, scope)
|
||||
except HTTPException:
|
||||
return NotFoundResponse()
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
d_match, path_params = match_route(app, scope)
|
||||
|
||||
try:
|
||||
scope['acls'] = []
|
||||
"""
|
||||
for acl in AclView(**d_match).select():
|
||||
# retrieve related ACLs
|
||||
|
||||
if ('acl_function_name' not in acl.keys()
|
||||
or 'domain' not in acl.keys()):
|
||||
continue
|
||||
|
||||
scope['acls'].append(acl['acl_function_name'])
|
||||
acl_module = importlib.import_module(
|
||||
'.acl',
|
||||
'organigramme'
|
||||
)
|
||||
|
||||
try:
|
||||
acl_functions.append(
|
||||
getattr(acl_module.acl, acl_function_name))
|
||||
except AttributeError:
|
||||
|
||||
|
||||
if True: #function(AUTH, path_params):
|
||||
response = await call_next(request)
|
||||
break
|
||||
"""
|
||||
except StopIteration:
|
||||
# TODO : No ACL sur une route existante, prevenir l'admin?
|
||||
print("No ACL")
|
||||
|
@ -189,8 +183,12 @@ def mount_domains(app: Starlette, domains: list):
|
|||
|
||||
# Retrieve domain app according to domain details
|
||||
try:
|
||||
domain_app = importlib.import_module(
|
||||
f'{domain["name"]}.app').app
|
||||
print(f'Will import {domain["name"]}.app:app')
|
||||
#@TODO let the configuration come from the domain module - (or apidb)
|
||||
environ["HALFORM_DSN"] = "dbname=si user=si"
|
||||
domain_mod = importlib.import_module(
|
||||
f'{domain["name"]}.app')
|
||||
domain_app = domain_mod.app
|
||||
except ModuleNotFoundError:
|
||||
sys.stderr.write(
|
||||
f'Could not find module *{domain["name"]}* in sys.path\n')
|
||||
|
@ -198,6 +196,11 @@ def mount_domains(app: Starlette, domains: list):
|
|||
except ImportError:
|
||||
sys.stderr.write(f'Could not import *app* from *{domain}*')
|
||||
continue
|
||||
except Exception as e:
|
||||
sys.stderr.write(f'Error in import *{domain["name"]}*\n')
|
||||
print(e)
|
||||
continue
|
||||
|
||||
|
||||
# Alter the openapi_url so the /docs page doesn't try to get
|
||||
# /{name}/openapi.json (@TODO : retport the bug to FastAPI)
|
||||
|
@ -205,18 +208,21 @@ def mount_domains(app: Starlette, domains: list):
|
|||
|
||||
# Mount the domain app on the prefix
|
||||
# e.g. : /v4/organigramme
|
||||
app.mount('/{version}/{name}'.format(**domain), domain_app)
|
||||
try:
|
||||
app.mount('/{version}/{name}'.format(**domain), app=domain_app)
|
||||
except Exception as e:
|
||||
print(f'Failed to mount *{domain}*\n')
|
||||
|
||||
|
||||
def startup():
|
||||
global app
|
||||
# Mount the registered domains
|
||||
try:
|
||||
domains_list = [elt for elt in Domain().select()]
|
||||
mount_domains(app, domains_list)
|
||||
except Exception as e:
|
||||
sys.stderr.write('Error in the *domains* retrieval')
|
||||
sys.stderr.write(str(e))
|
||||
sys.exit(1)
|
||||
sys.stderr.write('Error in the *domains* retrieval\n')
|
||||
raise e
|
||||
|
||||
async def root(request):
|
||||
return JSONResponse({'payload': request.payload})
|
||||
|
|
Loading…
Reference in New Issue