[clena] nettoyage des fonctions non utilisées

This commit is contained in:
Maxime Alves LIRMM 2022-03-16 13:09:50 +01:00
parent 988a1e5bab
commit 568aea9ea8
11 changed files with 33 additions and 389 deletions

View File

@ -11,6 +11,8 @@ pylint = "*"
build = "*"
pytest-pythonpath = "*"
twine = "*"
pyflakes = "*"
vulture = "*"
[packages]
click = ">=7.1,<8"

View File

@ -13,12 +13,10 @@ import orjson
from .cli import cli
from ..conf import write_config, CONFIG
from ..conf import CONFIG
from ..half_domain import HalfDomain
from ..lib.domain import domain_schema
from ..lib.schemas import schema_dict_dom
from ..lib.routes import api_routes
from ..lib.responses import ORJSONResponse

View File

@ -17,8 +17,6 @@ from ..logging import logger
# from ..lib.domain import domain_schema_dict
from ..lib.constants import DOMAIN_SCHEMA, ROUTE_SCHEMA
from ..lib.responses import ORJSONResponse
# from ..lib.routes import api_routes
from ..lib.schemas import schema_to_csv # get_api_routes
@click.argument('module', required=True)
@click.option('--export', default=False, is_flag=True)

View File

@ -46,7 +46,6 @@ import uuid
import toml
from .lib.domain import d_domains
from .logging import logger
PRODUCTION = True
@ -77,23 +76,6 @@ try:
except FileNotFoundError:
logger.error('Cannot find a configuration file under %s', HALFAPI_DOT_FILE)
def conf_files():
return [
os.path.join(
CONF_DIR, 'default.ini'
),
os.path.join(
os.getcwd(), '.halfapi', 'config')]
def write_config():
"""
Writes the current config to the highest priority config file
"""
# with open(conf_files()[-1], 'w') as halfapi_config:
# config.write(halfapi_config)
pass
def read_config():
"""

View File

@ -136,8 +136,9 @@ class HalfDomain(Starlette):
if not inspect.iscoroutinefunction(fct):
return route_decorator(fct), params
else:
return acl.args_check(fct), params
# TODO: Remove when using only sync functions
return acl.args_check(fct), params
@staticmethod

View File

@ -19,13 +19,13 @@ from .lib.domain import MissingAclError, PathError, UnknownPathParameterType, \
class HalfRoute(Route):
""" HalfRoute
"""
def __init__(self, path, fct, params, method):
def __init__(self, path: List[str], fct: Callable, params: List[Dict], method: str):
logger.info('HalfRoute creation: %s %s %s %s', path, fct, params, method)
if len(params) == 0:
raise MissingAclError('[{}] {}'.format(verb, '/'.join(path)))
raise MissingAclError('[{}] {}'.format(method, '/'.join(path)))
if len(path) == 0:
logger.error('Empty path for [{%s}]', verb)
logger.error('Empty path for [{%s}]', method)
raise PathError()
super().__init__(

View File

@ -38,7 +38,7 @@ from .lib.responses import (ORJSONResponse, UnauthorizedResponse,
ServiceUnavailableResponse)
from .lib.domain import NoDomainsException
from .lib.routes import gen_schema_routes, JSONRoute
from .lib.schemas import schema_json, get_acls
from .lib.schemas import schema_json
from .logging import logger, config_logging
from .half_domain import HalfDomain
from halfapi import __version__

View File

@ -3,7 +3,6 @@
lib/domain.py The domain-scoped utility functions
"""
import os
import re
import sys
import importlib
@ -11,7 +10,7 @@ import inspect
from functools import wraps
from types import ModuleType, FunctionType
from typing import Coroutine, Generator
from typing import Dict, List, Tuple, Iterator
from typing import Dict, List, Tuple
import yaml
from starlette.exceptions import HTTPException
@ -24,18 +23,28 @@ from halfapi.lib.constants import VERBS
from ..logging import logger
class MissingAclError(Exception):
""" Exception to use when no acl are specified for a route
"""
pass
class PathError(Exception):
""" Exception to use when the path for a route is malformed
"""
pass
class UnknownPathParameterType(Exception):
""" Exception to use when the path parameter for a route is not supported
"""
pass
class UndefinedRoute(Exception):
""" Exception to use when the route definition cannot be found
"""
pass
class UndefinedFunction(Exception):
""" Exception to use when a function definition cannot be found
"""
pass
class NoDomainsException(Exception):
@ -75,15 +84,17 @@ def route_decorator(fct: FunctionType, ret_type: str = 'json') -> Coroutine:
try:
if ret_type == 'json':
return ORJSONResponse(fct(**fct_args))
elif ret_type == 'ods':
if ret_type == 'ods':
res = fct(**fct_args)
assert isinstance(res, list)
for elt in res:
assert isinstance(elt, dict)
return ODSResponse(res)
else:
raise NotImplementedError
raise NotImplementedError
except NotImplementedError as exc:
raise HTTPException(501) from exc
except Exception as exc:
@ -139,252 +150,3 @@ def get_fct_name(http_verb: str, path: str) -> str:
return '_'.join(fct_name)
def gen_routes(m_router: ModuleType,
verb: str,
path: List[str],
params: List[Dict]) -> Tuple[FunctionType, Dict]:
"""
Returns a tuple of the function associatied to the verb and path arguments,
and the dictionary of it's acls
Parameters:
- m_router (ModuleType): The module containing the function definition
- verb (str): The HTTP verb for the route (GET, POST, ...)
- path (List): The route path, as a list (each item being a level of
deepness), from the lowest level (domain) to the highest
- params (Dict): The acl list of the following format :
[{'acl': Function, 'args': {'required': [], 'optional': []}}]
Returns:
(Function, Dict): The destination function and the acl dictionary
"""
if len(params) == 0:
raise MissingAclError('[{}] {}'.format(verb, '/'.join(path)))
if len(path) == 0:
logger.error('Empty path for [{%s}]', verb)
raise PathError()
fct_name = get_fct_name(verb, path[-1])
if hasattr(m_router, fct_name):
fct = getattr(m_router, fct_name)
else:
raise UndefinedFunction('{}.{}'.format(m_router.__name__, fct_name or ''))
if not inspect.iscoroutinefunction(fct):
return route_decorator(fct), params
else:
return acl.args_check(fct), params
# def gen_router_routes(m_router: ModuleType, path: List[str]) -> \
#  Iterator[Tuple[str, str, ModuleType, Coroutine, List]]:
#  """
#  Recursive generator that parses a router (or a subrouter)
#  and yields from gen_routes
# 
#  Parameters:
# 
#  - m_router (ModuleType): The currently treated router module
#  - path (List[str]): The current path stack
# 
#  Yields:
# 
#  (str, str, ModuleType, Coroutine, List): A tuple containing the path, verb,
#  router module, function reference and parameters of the route.
#  Function and parameters are yielded from then gen_routes function,
#  that decorates the endpoint function.
#  """
# 
#  for subpath, params in read_router(m_router).items():
#  path.append(subpath)
# 
#  for verb in VERBS:
#  if verb not in params:
#  continue
#  yield ('/'.join(filter(lambda x: len(x) > 0, path)),
#  verb,
#  m_router,
#  *gen_routes(m_router, verb, path, params[verb])
#  )
# 
#  for subroute in params.get('SUBROUTES', []):
#  #logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__)
#  param_match = re.fullmatch('^([A-Z_]+)_([a-z]+)$', subroute)
#  if param_match is not None:
#  try:
#  path.append('{{{}:{}}}'.format(
#  param_match.groups()[0].lower(),
#  param_match.groups()[1]))
#  except AssertionError as exc:
#  raise UnknownPathParameterType(subroute) from exc
#  else:
#  path.append(subroute)
# 
#  try:
#  yield from gen_router_routes(
#  importlib.import_module(f'.{subroute}', m_router.__name__),
#  path)
# 
#  except ImportError as exc:
#  logger.error('Failed to import subroute **{%s}**', subroute)
#  raise exc
# 
#  path.pop()
# 
#  path.pop()
# 
# def domain_schema_dict(m_router: ModuleType) -> Dict:
#  """ gen_router_routes return values as a dict
#  Parameters:
# 
#  m_router (ModuleType): The domain routers' module
# 
#  Returns:
# 
#  Dict: Schema of dict is halfapi.lib.constants.DOMAIN_SCHEMA
# 
#  @TODO: Should be a "router_schema_dict" function
#  """
#  d_res = {}
# 
#  for path, verb, m_router, fct, parameters in gen_router_routes(m_router, []):
#  if path not in d_res:
#  d_res[path] = {}
# 
#  if verb not in d_res[path]:
#  d_res[path][verb] = {}
# 
#  d_res[path][verb]['callable'] = f'{m_router.__name__}:{fct.__name__}'
#  try:
#  d_res[path][verb]['docs'] = yaml.safe_load(fct.__doc__)
#  except AttributeError:
#  logger.error(
#  'Cannot read docstring from fct (fct=%s path=%s verb=%s', fct.__name__, path, verb)
# 
#  d_res[path][verb]['acls'] = list(map(lambda elt: { **elt, 'acl': elt['acl'].__name__ },
#  parameters))
# 
#  return d_res
from .constants import API_SCHEMA_DICT
def domain_schema(m_domain: ModuleType) -> Dict:
schema = { **API_SCHEMA_DICT }
routers_submod_str = getattr(m_domain, '__routers__', '.routers')
m_domain_acl = importlib.import_module('.acl', m_domain.__package__)
m_domain_routers = importlib.import_module(
routers_submod_str, m_domain.__package__
)
schema['domain'] = {
'name': getattr(m_domain, '__name__'),
'version': getattr(m_domain, '__version__', ''),
'patch_release': getattr(m_domain, '__patch_release__', ''),
'routers': routers_submod_str,
'acls': tuple(getattr(m_domain_acl, 'ACLS', ()))
}
schema['paths'] = domain_schema_dict(m_domain_routers)
return schema
def domain_schema_list(m_router: ModuleType) -> List:
""" Schema as list, one row by route/acl
Parameters:
m_router (ModuleType): The domain routers' module
Returns:
List[Tuple]: (path, verb, callable, doc, acls)
"""
res = []
for path, verb, m_router, fct, parameters in gen_router_routes(m_router, []):
for params in parameters:
res.append((
path,
verb,
f'{m_router.__name__}:{fct.__name__}',
params.get('acl').__name__,
params.get('args', {}).get('required', []),
params.get('args', {}).get('optional', []),
params.get('out', [])
))
return res
def d_domains(config) -> Dict[str, ModuleType]:
"""
Parameters:
config (ConfigParser): The .halfapi/config based configparser object
Returns:
dict[str, ModuleType]
"""
domains = {}
if os.environ.get('HALFAPI_DOMAIN_NAME') and os.environ.get('HALFAPI_DOMAIN_MODULE', '.routers'):
domains[os.environ.get('HALFAPI_DOMAIN_NAME')] = os.environ.get('HALFAPI_DOMAIN_MODULE')
elif 'domains' in config:
domains = dict(config['domains'].items())
try:
return {
domain: importlib.import_module(''.join((domain, module)))
for domain, module in domains.items()
}
except ImportError as exc:
logger.error('Could not load a domain : %s', exc)
raise exc
def router_acls(route_params: Dict, path: List, m_router: ModuleType) -> Generator:
for verb in VERBS:
params = route_params.get(verb)
if params is None:
continue
if len(params) == 0:
logger.error('No ACL for route [{%s}] %s', verb, "/".join(path))
else:
for param in params:
fct_acl = param.get('acl')
if not isinstance(fct_acl, FunctionType):
continue
yield fct_acl.__name__, fct_acl
def domain_acls(m_router, path):
routes = read_router(m_router)
for subpath, route_params in routes.items():
path.append(subpath)
yield from router_acls(route_params, path, m_router)
subroutes = route_params.get('SUBROUTES', [])
for subroute in subroutes:
logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__)
path.append(subroute)
try:
submod = importlib.import_module(f'.{subroute}', m_router.__name__)
except ImportError as exc:
logger.error('Failed to import subroute **{%s}**', subroute)
raise exc
yield from domain_acls(submod, path)
path.pop()
path.pop()

View File

@ -81,8 +81,8 @@ class JWTAuthenticationBackend(AuthenticationBackend):
if token:
return AuthCredentials(), CheckUser(payload['user_id'])
else:
return AuthCredentials(), Nobody()
return AuthCredentials(), Nobody()
if PRODUCTION and 'debug' in payload.keys() and payload['debug']:

View File

@ -2,11 +2,13 @@
"""
Routes module
Classes :
- JSONRoute
Fonctions :
- gen_domain_routes
- gen_starlette_routes
- gen_schema_routes
- api_routes
- api_acls
Exception :
- DomainNotFoundError
@ -83,20 +85,6 @@ def gen_schema_routes(schema: Dict):
yield HalfRoute(path, args_check(fct), acls, verb)
def gen_starlette_routes(d_domains: Dict[str, ModuleType]) -> Generator:
"""
Yields the Route objects for HalfAPI app
Parameters:
d_domains (dict[str, ModuleType])
Returns:
Generator(Route)
"""
for domain_name, m_domain in d_domains.items():
yield from gen_domain_routes(m_domain)
def api_routes(m_dom: ModuleType) -> Tuple[Dict, Dict]:
"""
Yields the description objects for HalfAPI app routes
@ -146,28 +134,3 @@ def api_routes(m_dom: ModuleType) -> Tuple[Dict, Dict]:
raise exc
return d_res, d_acls
def api_acls(request):
""" Returns the list of possible ACLs
# TODO: Rewrite
"""
res = {}
domains = {}
doc = 'doc' in request.query_params
for domain, m_domain in domains.items():
res[domain] = {}
for acl_name, fct in domain_acls(m_domain, [domain]):
if not isinstance(fct, FunctionType):
continue
fct_result = fct.__doc__.strip() if doc and fct.__doc__ else fct(request)
if acl_name in res[domain]:
continue
if isinstance(fct_result, FunctionType):
fct_result = fct()(request)
res[domain][acl_name] = fct_result
return res

View File

@ -17,9 +17,8 @@ from types import ModuleType
from starlette.schemas import SchemaGenerator
from .. import __version__
# from .domain import gen_router_routes, domain_schema_list
from ..logging import logger
from .routes import gen_starlette_routes, api_routes, api_acls
from .routes import api_routes
from .responses import ORJSONResponse
SCHEMAS = SchemaGenerator(
@ -35,67 +34,6 @@ async def schema_json(request, *args, **kwargs):
SCHEMAS.get_schema(routes=request.app.routes))
def schema_dict_dom(d_domains: Dict[str, ModuleType]) -> Dict:
"""
Returns the API schema of the *m_domain* domain as a python dictionnary
Parameters:
d_domains (Dict[str, moduleType]): The module to scan for routes
Returns:
Dict: A dictionnary containing the description of the API using the
| OpenAPI standard
"""
return SCHEMAS.get_schema(
routes=list(gen_starlette_routes(d_domains)))
async def get_acls(request, *args, **kwargs):
"""
description: A dictionnary of the domains and their acls, with the
result of the acls functions
"""
return ORJSONResponse(api_acls(request))
def schema_to_csv(module_name, header=True) -> str:
"""
Returns a string composed where each line is a set of path, verb, function,
acl, required arguments, optional arguments and output variables. Those
lines should be unique in the result string;
"""
# retrieve module
m_router = importlib.import_module(module_name)
lines = []
if header:
lines.append([
'path',
'method',
'module:function',
'acl',
'args_required', 'args_optional',
'out'
])
for line in domain_schema_list(m_router):
lines.append([
line[0],
line[1],
line[2],
line[3],
','.join(line[4]),
','.join(line[5]),
','.join(line[6])
])
return '\n'.join(
[ ';'.join(fields) for fields in lines ]
)
def schema_csv_dict(csv: List[str], prefix='/') -> Dict:
package = None
schema_d = {}