From 3fda051180c33bcd0e16f647d29f138f67599378 Mon Sep 17 00:00:00 2001 From: "Maxime Alves LIRMM@home" Date: Wed, 8 Jul 2020 17:06:47 +0200 Subject: [PATCH] [middleware] put AclCallerMiddleware in lib file --- .gitignore | 1 - halfapi/app.py | 148 +-------------------------- halfapi/lib/acl_caller_middleware.py | 136 ++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 145 deletions(-) create mode 100644 halfapi/lib/acl_caller_middleware.py diff --git a/.gitignore b/.gitignore index c9f3862..a90298f 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,6 @@ dist/ downloads/ eggs/ .eggs/ -lib/ lib64/ parts/ sdist/ diff --git a/halfapi/app.py b/halfapi/app.py index 164c11e..040ea35 100644 --- a/halfapi/app.py +++ b/halfapi/app.py @@ -6,13 +6,10 @@ from os import environ # asgi framework from starlette.applications import Starlette -from starlette.exceptions import HTTPException from starlette.middleware import Middleware -from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response, JSONResponse -from starlette.routing import Route, Match, Mount -from starlette.types import ASGIApp, Receive, Scope, Send +from starlette.types import ASGIApp from starlette.middleware.authentication import AuthenticationMiddleware # typing @@ -20,149 +17,12 @@ from typing import Any, Awaitable, Callable, MutableMapping RequestResponseEndpoint = Callable[ [Request], Awaitable[Response] ] # hop-generated classes -from .models.api.acl_function import AclFunction from .models.api.domain import Domain -from .models.api.route import Route -from .models.api.view.acl import Acl as AclView -from .models.api.view.route import Route as RouteView # module libraries -from .lib.responses import ForbiddenResponse, NotFoundResponse +from .lib.responses import * from .lib.jwt_middleware import JWTAuthenticationBackend - -def match_route(app: ASGIApp, scope: Scope): - """ Checks all routes from "app" and checks if it matches with the one from - scope - - Parameters: - - - app (ASGIApp): The Starlette instance - - scope (MutableMapping[str, Any]): The requests scope - - Returns: - - - (dict, dict): The first dict of the tuple is the details on the - route, the second one is the path parameters - - Raises: - - HTTPException - """ - - """ The *result* variable is fitted to the filter that will be applied when - searching the route in the database. - Refer to the database documentation for more details on the api.route - table. - """ - result = { - 'domain': None, - 'name': None, - 'http_verb': None, - 'version': None - } - - try: - """ Identification of the parts of the path - - Examples : - version : v4 - domain : organigramme - path : laboratoire/personnel - """ - _, result['domain'], path = scope['path'].split('/', 2) - except ValueError as e: - #404 Not found - raise HTTPException(404) - - # Prefix the path with "/" - path = f'/{path}' - - for route in app.routes: - - if type(route) != Mount: - """ The root app should not have exposed routes, - only the mounted domains have some. - """ - continue - - """ Clone the scope to assign the path to the path without the - matching domain, be careful to the "root_path" of the mounted domain. - - @TODO - Also, improper array unpacking may make crash the program without any - explicit error, we may have to improve this as we only rely on this - function to accomplish all the routing - """ - subscope = scope.copy() - _, result['domain'], subpath = path.split('/', 2) - subscope['path'] = f'/{subpath}' - - for mount_route in route.routes: - # Parse all domain routes - submatch = mount_route.matches(subscope) - if submatch[0] != Match.FULL: - continue - - # Route matches - try: - result['name'] = submatch[1]['endpoint'].__name__ - result['http_verb'] = scope['method'] - except Exception as e: - print(e) - - return result, submatch[1]['path_params'] - - raise HTTPException(404) - - -class AclCallerMiddleware(BaseHTTPMiddleware): - async def __call__(self, scope:Scope, receive: Receive, send: Send) -> None: - """ Points out to the domain which ACL function it should call - - Parameters : - - - request (Request): The current request - - - call_next (RequestResponseEndpoint): The next middleware/route function - - Return: - Response - """ - print('Hit AclCallerMiddleware of API') - if scope['type'] != 'http': - await self.app(scope, receive, send) - return - - - if scope['path'].split('/')[-1] not in ['docs','openapi.json','redoc']: - # routes in the the database, the others being - # docs/openapi.json/redoc - try: - d_match, path_params = match_route(app, scope) - except HTTPException: - return NotFoundResponse() - except Exception as e: - raise e - - d_match, path_params = match_route(app, scope) - - try: - scope['acls'] = [] - for acl in AclView(**d_match).select(): - # retrieve related ACLs - - if ('acl_function_name' not in acl.keys() - or 'domain' not in acl.keys()): - continue - - scope['acls'].append(acl['acl_function_name']) - - except StopIteration: - # TODO : No ACL sur une route existante, prevenir l'admin? - print("No ACL") - pass - - return await self.app(scope, receive, send) +from .lib.acl_caller_middleware import AclCallerMiddleware def mount_domains(app: ASGIApp, domains: list): @@ -240,5 +100,5 @@ app = Starlette( Middleware(AuthenticationMiddleware, backend=JWTAuthenticationBackend(secret_key=environ.get('HALFORM_SECRET'))), Middleware(AclCallerMiddleware), ], - on_startup=[startup], + on_startup=[startup] ) diff --git a/halfapi/lib/acl_caller_middleware.py b/halfapi/lib/acl_caller_middleware.py new file mode 100644 index 0000000..3515642 --- /dev/null +++ b/halfapi/lib/acl_caller_middleware.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +from starlette.exceptions import HTTPException +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.routing import Match, Mount +from starlette.types import ASGIApp, Receive, Scope, Send + +from .models.api.view.acl import Acl as AclView + +def match_route(scope: Scope): + """ Checks all routes from "app" and checks if it matches with the one from + scope + + Parameters: + + - app (ASGIApp): The Starlette instance + - scope (MutableMapping[str, Any]): The requests scope + + Returns: + + - (dict, dict): The first dict of the tuple is the details on the + route, the second one is the path parameters + + Raises: + + HTTPException + """ + + """ The *result* variable is fitted to the filter that will be applied when + searching the route in the database. + Refer to the database documentation for more details on the api.route + table. + """ + from halfapi.app import app + + result = { + 'domain': None, + 'name': None, + 'http_verb': None, + 'version': None + } + + try: + """ Identification of the parts of the path + + Examples : + version : v4 + domain : organigramme + path : laboratoire/personnel + """ + _, result['domain'], path = scope['path'].split('/', 2) + except ValueError as e: + #404 Not found + raise HTTPException(404) + + # Prefix the path with "/" + path = f'/{path}' + + for route in app.routes: + + if type(route) != Mount: + """ The root app should not have exposed routes, + only the mounted domains have some. + """ + continue + + """ Clone the scope to assign the path to the path without the + matching domain, be careful to the "root_path" of the mounted domain. + + @TODO + Also, improper array unpacking may make crash the program without any + explicit error, we may have to improve this as we only rely on this + function to accomplish all the routing + """ + subscope = scope.copy() + _, result['domain'], subpath = path.split('/', 2) + subscope['path'] = f'/{subpath}' + + for mount_route in route.routes: + # Parse all domain routes + submatch = mount_route.matches(subscope) + if submatch[0] != Match.FULL: + continue + + # Route matches + try: + result['name'] = submatch[1]['endpoint'].__name__ + result['http_verb'] = scope['method'] + except Exception as e: + print(e) + + return result, submatch[1]['path_params'] + + raise HTTPException(404) + + +class AclCallerMiddleware(BaseHTTPMiddleware): + async def __call__(self, scope:Scope, receive: Receive, send: Send) -> None: + """ Points out to the domain which ACL function it should call + + Parameters : + + - request (Request): The current request + + - call_next (RequestResponseEndpoint): The next middleware/route function + + Return: + Response + """ + print('Hit AclCallerMiddleware of API') + if scope['type'] != 'http': + await self.app(scope, receive, send) + return + + + if scope['path'].split('/')[-1] not in ['docs','openapi.json','redoc']: + # routes in the the database, the others being + # docs/openapi.json/redoc + d_match, path_params = match_route(scope) + + try: + scope['acls'] = [] + for acl in AclView(**d_match).select(): + # retrieve related ACLs + + if ('acl_function_name' not in acl.keys() + or 'domain' not in acl.keys()): + continue + + scope['acls'].append(acl['acl_function_name']) + + except StopIteration: + # TODO : No ACL sur une route existante, prevenir l'admin? + print("No ACL") + pass + + return await self.app(scope, receive, send)