[middleware] put AclCallerMiddleware in lib file

This commit is contained in:
Maxime Alves LIRMM@home 2020-07-08 17:06:47 +02:00
parent 17261f7da9
commit 3fda051180
3 changed files with 140 additions and 145 deletions

1
.gitignore vendored
View File

@ -14,7 +14,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/

View File

@ -6,13 +6,10 @@ from os import environ
# asgi framework
from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, JSONResponse
from starlette.routing import Route, Match, Mount
from starlette.types import ASGIApp, Receive, Scope, Send
from starlette.types import ASGIApp
from starlette.middleware.authentication import AuthenticationMiddleware
# typing
@ -20,149 +17,12 @@ from typing import Any, Awaitable, Callable, MutableMapping
RequestResponseEndpoint = Callable[ [Request], Awaitable[Response] ]
# hop-generated classes
from .models.api.acl_function import AclFunction
from .models.api.domain import Domain
from .models.api.route import Route
from .models.api.view.acl import Acl as AclView
from .models.api.view.route import Route as RouteView
# module libraries
from .lib.responses import ForbiddenResponse, NotFoundResponse
from .lib.responses import *
from .lib.jwt_middleware import JWTAuthenticationBackend
def match_route(app: ASGIApp, scope: Scope):
""" Checks all routes from "app" and checks if it matches with the one from
scope
Parameters:
- app (ASGIApp): The Starlette instance
- scope (MutableMapping[str, Any]): The requests scope
Returns:
- (dict, dict): The first dict of the tuple is the details on the
route, the second one is the path parameters
Raises:
HTTPException
"""
""" The *result* variable is fitted to the filter that will be applied when
searching the route in the database.
Refer to the database documentation for more details on the api.route
table.
"""
result = {
'domain': None,
'name': None,
'http_verb': None,
'version': None
}
try:
""" Identification of the parts of the path
Examples :
version : v4
domain : organigramme
path : laboratoire/personnel
"""
_, result['domain'], path = scope['path'].split('/', 2)
except ValueError as e:
#404 Not found
raise HTTPException(404)
# Prefix the path with "/"
path = f'/{path}'
for route in app.routes:
if type(route) != Mount:
""" The root app should not have exposed routes,
only the mounted domains have some.
"""
continue
""" Clone the scope to assign the path to the path without the
matching domain, be careful to the "root_path" of the mounted domain.
@TODO
Also, improper array unpacking may make crash the program without any
explicit error, we may have to improve this as we only rely on this
function to accomplish all the routing
"""
subscope = scope.copy()
_, result['domain'], subpath = path.split('/', 2)
subscope['path'] = f'/{subpath}'
for mount_route in route.routes:
# Parse all domain routes
submatch = mount_route.matches(subscope)
if submatch[0] != Match.FULL:
continue
# Route matches
try:
result['name'] = submatch[1]['endpoint'].__name__
result['http_verb'] = scope['method']
except Exception as e:
print(e)
return result, submatch[1]['path_params']
raise HTTPException(404)
class AclCallerMiddleware(BaseHTTPMiddleware):
async def __call__(self, scope:Scope, receive: Receive, send: Send) -> None:
""" Points out to the domain which ACL function it should call
Parameters :
- request (Request): The current request
- call_next (RequestResponseEndpoint): The next middleware/route function
Return:
Response
"""
print('Hit AclCallerMiddleware of API')
if scope['type'] != 'http':
await self.app(scope, receive, send)
return
if scope['path'].split('/')[-1] not in ['docs','openapi.json','redoc']:
# routes in the the database, the others being
# docs/openapi.json/redoc
try:
d_match, path_params = match_route(app, scope)
except HTTPException:
return NotFoundResponse()
except Exception as e:
raise e
d_match, path_params = match_route(app, scope)
try:
scope['acls'] = []
for acl in AclView(**d_match).select():
# retrieve related ACLs
if ('acl_function_name' not in acl.keys()
or 'domain' not in acl.keys()):
continue
scope['acls'].append(acl['acl_function_name'])
except StopIteration:
# TODO : No ACL sur une route existante, prevenir l'admin?
print("No ACL")
pass
return await self.app(scope, receive, send)
from .lib.acl_caller_middleware import AclCallerMiddleware
def mount_domains(app: ASGIApp, domains: list):
@ -240,5 +100,5 @@ app = Starlette(
Middleware(AuthenticationMiddleware, backend=JWTAuthenticationBackend(secret_key=environ.get('HALFORM_SECRET'))),
Middleware(AclCallerMiddleware),
],
on_startup=[startup],
on_startup=[startup]
)

View File

@ -0,0 +1,136 @@
#!/usr/bin/env python3
from starlette.exceptions import HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.routing import Match, Mount
from starlette.types import ASGIApp, Receive, Scope, Send
from .models.api.view.acl import Acl as AclView
def match_route(scope: Scope):
""" Checks all routes from "app" and checks if it matches with the one from
scope
Parameters:
- app (ASGIApp): The Starlette instance
- scope (MutableMapping[str, Any]): The requests scope
Returns:
- (dict, dict): The first dict of the tuple is the details on the
route, the second one is the path parameters
Raises:
HTTPException
"""
""" The *result* variable is fitted to the filter that will be applied when
searching the route in the database.
Refer to the database documentation for more details on the api.route
table.
"""
from halfapi.app import app
result = {
'domain': None,
'name': None,
'http_verb': None,
'version': None
}
try:
""" Identification of the parts of the path
Examples :
version : v4
domain : organigramme
path : laboratoire/personnel
"""
_, result['domain'], path = scope['path'].split('/', 2)
except ValueError as e:
#404 Not found
raise HTTPException(404)
# Prefix the path with "/"
path = f'/{path}'
for route in app.routes:
if type(route) != Mount:
""" The root app should not have exposed routes,
only the mounted domains have some.
"""
continue
""" Clone the scope to assign the path to the path without the
matching domain, be careful to the "root_path" of the mounted domain.
@TODO
Also, improper array unpacking may make crash the program without any
explicit error, we may have to improve this as we only rely on this
function to accomplish all the routing
"""
subscope = scope.copy()
_, result['domain'], subpath = path.split('/', 2)
subscope['path'] = f'/{subpath}'
for mount_route in route.routes:
# Parse all domain routes
submatch = mount_route.matches(subscope)
if submatch[0] != Match.FULL:
continue
# Route matches
try:
result['name'] = submatch[1]['endpoint'].__name__
result['http_verb'] = scope['method']
except Exception as e:
print(e)
return result, submatch[1]['path_params']
raise HTTPException(404)
class AclCallerMiddleware(BaseHTTPMiddleware):
async def __call__(self, scope:Scope, receive: Receive, send: Send) -> None:
""" Points out to the domain which ACL function it should call
Parameters :
- request (Request): The current request
- call_next (RequestResponseEndpoint): The next middleware/route function
Return:
Response
"""
print('Hit AclCallerMiddleware of API')
if scope['type'] != 'http':
await self.app(scope, receive, send)
return
if scope['path'].split('/')[-1] not in ['docs','openapi.json','redoc']:
# routes in the the database, the others being
# docs/openapi.json/redoc
d_match, path_params = match_route(scope)
try:
scope['acls'] = []
for acl in AclView(**d_match).select():
# retrieve related ACLs
if ('acl_function_name' not in acl.keys()
or 'domain' not in acl.keys()):
continue
scope['acls'].append(acl['acl_function_name'])
except StopIteration:
# TODO : No ACL sur une route existante, prevenir l'admin?
print("No ACL")
pass
return await self.app(scope, receive, send)