[lib.domain] implement domain schema - try: halfapi domain dummy_domain

This commit is contained in:
Maxime Alves LIRMM@home 2021-12-01 13:31:46 +01:00
parent 1ccfa0d10e
commit 49c13c56ac
6 changed files with 64 additions and 42 deletions

View File

@ -9,13 +9,16 @@ import importlib
import subprocess
import click
import orjson
from .cli import cli
from ..conf import config, write_config, DOMAINSDICT
from ..lib.domain import domain_schema
from ..lib.schemas import schema_dict_dom
from ..lib.routes import api_routes
from ..lib.responses import ORJSONResponse
from ..logging import logger
@ -117,49 +120,39 @@ def list_api_routes():
list_routes(domain, m_dom)
@click.option('--read',default=False, is_flag=True)
@click.option('--read',default=True, is_flag=True)
@click.option('--create',default=False, is_flag=True)
@click.option('--update',default=False, is_flag=True)
@click.option('--delete',default=False, is_flag=True)
@click.option('--domains',default=None)
@click.argument('domain',default=None, required=False)
@cli.command()
def domain(domains, delete, update, create, read): #, domains, read, create, update, delete):
def domain(domain, delete, update, create, read): #, domains, read, create, update, delete):
"""
The "halfapi domain" command
Parameters:
domain (List[str]|None): The list of the domains to list/update
domain (str|None): The domain name
The parameter has a misleading name as it is a multiple option
but this would be strange to use it several times named as "domains"
update (boolean): If set, update the database for the selected domains
"""
dom_dict = DOMAINSDICT()
if not domains:
if not domain:
if create:
# TODO: Connect to the create_domain function
raise NotImplementedError
else:
dom_dict_ = {}
raise Exception('Missing domain name')
if update:
raise NotImplementedError
if delete:
raise NotImplementedError
if read:
m_domain = importlib.import_module(domain)
for domain_name in domains.split(','):
if domain_name in dom_dict.keys():
dom_dict_[domain_name] = dom_dict[domain_name]
continue
click.echo(orjson.dumps(
domain_schema(m_domain),
option=orjson.OPT_NON_STR_KEYS,
default=ORJSONResponse.default_cast)
)
click.echo(
f'Domain {domain_name}s is not activated in the configuration')
dom_dict = dom_dict_
for domain_name, m_dom in dom_dict.items():
if update:
raise NotImplementedError
if delete:
raise NotImplementedError
if read:
list_routes(domain_name, m_dom)

View File

@ -32,12 +32,11 @@ from timing_asgi.integrations import StarletteScopeToName
from .lib.constants import API_SCHEMA_DICT
from .lib.domain_middleware import DomainMiddleware
from .lib.timing import HTimingClient
from .lib.domain import NoDomainsException
from .lib.jwt_middleware import JWTAuthenticationBackend
from .lib.responses import (ORJSONResponse, UnauthorizedResponse,
NotFoundResponse, InternalServerErrorResponse, NotImplementedResponse,
ServiceUnavailableResponse)
from .lib.domain import domain_schema_dict
from .lib.domain import domain_schema_dict, NoDomainsException, domain_schema
from .lib.routes import gen_domain_routes, gen_schema_routes, JSONRoute
from .lib.schemas import schema_json, get_acls
from .logging import logger, config_logging
@ -55,7 +54,7 @@ class HalfAPI:
CONFIG = config.get('config', {})
domain = config.get('domain')['name']
router = config.get('domain')['router']
router = config.get('domain').get('router', None)
if not (domain and router):
raise NoDomainsException()
@ -66,22 +65,18 @@ class HalfAPI:
self.__application = None
if domain and router:
m_domain = m_domain_router = m_domain_acl = None
if domain:
m_domain = importlib.import_module(f'{domain}')
m_domain_router = importlib.import_module(f'{domain}.{router}')
if not router:
router = getattr('__router__', domain, '.routers')
m_domain_router = importlib.import_module(router)
m_domain_acl = importlib.import_module(f'{domain}.acl')
self.schema = { **API_SCHEMA_DICT }
self.schema['domain'] = {
'name': domain,
'version': getattr(m_domain, '__version__', ''),
'patch_release': getattr(m_domain, '__patch_release__', ''),
'acls': tuple(getattr(m_domain_acl, 'ACLS', ()))
}
self.schema['paths'] = domain_schema_dict(m_domain_router)
if not(m_domain and m_domain_router and m_domain_acl):
raise Exception('Cannot import domain')
self.schema = domain_schema(m_domain)
routes = [ Route('/', JSONRoute(self.schema)) ]

View File

@ -39,6 +39,7 @@ ROUTE_SCHEMA = Schema({
DOMAIN_SCHEMA = Schema({
'name': str,
Optional('routers'): str,
Optional('version'): str,
Optional('patch_release'): str,
Optional('acls'): [

View File

@ -230,6 +230,7 @@ def gen_router_routes(m_router: ModuleType, path: List[str]) -> \
path.pop()
def domain_schema_dict(m_router: ModuleType) -> Dict:
""" gen_router_routes return values as a dict
Parameters:
@ -239,6 +240,8 @@ def domain_schema_dict(m_router: ModuleType) -> Dict:
Returns:
Dict: Schema of dict is halfapi.lib.constants.DOMAIN_SCHEMA
@TODO: Should be a "router_schema_dict" function
"""
d_res = {}
@ -256,6 +259,24 @@ def domain_schema_dict(m_router: ModuleType) -> Dict:
return d_res
from .constants import API_SCHEMA_DICT
def domain_schema(m_domain: ModuleType) -> Dict:
schema = { **API_SCHEMA_DICT }
routers_submod_str = getattr(m_domain, '__routers__', '.routers')
m_domain_acl = importlib.import_module('.acl', m_domain.__package__)
m_domain_routers = importlib.import_module(
routers_submod_str, m_domain.__package__
)
schema['domain'] = {
'name': getattr(m_domain, '__name__'),
'version': getattr(m_domain, '__version__', ''),
'patch_release': getattr(m_domain, '__patch_release__', ''),
'routers': routers_submod_str,
'acls': tuple(getattr(m_domain_acl, 'ACLS', ()))
}
schema['paths'] = domain_schema_dict(m_domain_routers)
return schema
def domain_schema_list(m_router: ModuleType) -> List:
""" Schema as list, one row by route/acl
Parameters:

View File

@ -0,0 +1,5 @@
__name__ = 'dummy_domain'
__version__ = '0.0.0'
__patch_release__ = '0.0.0'
__routers__ = '.routers'

View File

@ -20,9 +20,16 @@ class TestCliProj():
def test_domain_commands(self, project_runner):
""" TODO: Test create command
"""
r = project_runner('domain')
print(r.stdout)
assert r.exit_code == 1
r = project_runner('domain dummy_domain')
print(r.stdout)
assert r.exit_code == 0
def test_config_commands(self, project_runner):
try:
r = project_runner('config')