[wip][testfail] multiple-domains app

This commit is contained in:
Maxime Alves LIRMM@home 2021-12-06 08:29:21 +01:00
parent 96f78e76c5
commit b4157c4a7d
19 changed files with 317 additions and 188 deletions

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3
__version__ = '0.6.1'
__version__ = '0.6.2'
def version():
return f'HalfAPI version:{__version__}'

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
cli/config.py Contains the .halfapi/config and HALFAPI_CONF_DIR/project_name templates
cli/config.py Contains the .halfapi/config
Defines the "halfapi config" command
"""
@ -17,7 +17,6 @@ router = {router}
CONF_STR="""
[project]
name = {project_name}
host = {host}
port = {port}
production = {production}

View File

@ -19,7 +19,6 @@ from .cli import cli
from ..logging import logger
TMPL_HALFAPI_ETC = """[project]
name = {project}
host = 127.0.0.1
port = 8000
secret = /path/to/secret_file
@ -27,17 +26,7 @@ production = False
base_dir = {base_dir}
"""
def format_halfapi_etc(project, path):
"""
Returns the formatted template for /etc/half_api/PROJECT_NAME
"""
return TMPL_HALFAPI_ETC.format(
project=project,
base_dir=path
)
TMPL_HALFAPI_CONFIG = """[project]
name = {name}
halfapi_version = {halfapi_version}
[domain]
@ -66,9 +55,7 @@ def init(project):
with open(f'{project}/.halfapi/config', 'w') as conf_file:
conf_file.write(TMPL_HALFAPI_CONFIG.format(
name=project,
halfapi_version=__version__))
click.echo(f'Configure halfapi project in {CONF_DIR}/{project}')
click.echo(format_halfapi_etc(project, CONF_DIR))

View File

@ -9,18 +9,17 @@ import uvicorn
from .cli import cli
from .domain import list_api_routes
from ..conf import (PROJECT_NAME, HOST, PORT, SCHEMA,
PRODUCTION, LOGLEVEL, CONFIG)
from ..conf import CONFIG, SCHEMA
from ..logging import logger
from ..lib.schemas import schema_csv_dict
from ..half_domain import HalfDomain
@click.option('--host', default=HOST)
@click.option('--port', default=PORT)
@click.option('--host', default=CONFIG.get('host'))
@click.option('--port', default=CONFIG.get('port'))
@click.option('--reload', default=False)
@click.option('--secret', default=False)
@click.option('--production', default=True)
@click.option('--loglevel', default=LOGLEVEL)
@click.option('--secret', default=CONFIG.get('secret'))
@click.option('--production', default=CONFIG.get('secret'))
@click.option('--loglevel', default=CONFIG.get('loglevel'))
@click.option('--prefix', default='/')
@click.option('--check', default=True)
@click.option('--dryrun', default=False, is_flag=True)
@ -36,21 +35,13 @@ def run(host, port, reload, secret, production, loglevel, prefix, check, dryrun,
host, port, reload, secret, production, loglevel, prefix, schema
)
if not host:
host = HOST
if not port:
port = PORT
port = int(port)
if PRODUCTION and reload:
if production and reload:
reload = False
raise Exception('Can\'t use live code reload in production')
log_level = LOGLEVEL or 'info'
click.echo(f'Launching application {PROJECT_NAME}')
click.echo(f'Launching application')
if secret:
CONFIG['secret'] = secret
@ -70,7 +61,8 @@ def run(host, port, reload, secret, production, loglevel, prefix, check, dryrun,
# And activate the desired one, mounted without prefix
CONFIG['domain'][domain] = {
'name': domain,
'prefix': False
'prefix': False,
'enabled': True
}
# list_api_routes()
@ -78,7 +70,7 @@ def run(host, port, reload, secret, production, loglevel, prefix, check, dryrun,
click.echo(f'uvicorn.run("halfapi.app:application"\n' \
f'host: {host}\n' \
f'port: {port}\n' \
f'log_level: {log_level}\n' \
f'log_level: {loglevel}\n' \
f'reload: {reload}\n'
)
@ -88,5 +80,5 @@ def run(host, port, reload, secret, production, loglevel, prefix, check, dryrun,
uvicorn.run('halfapi.app:application',
host=host,
port=int(port),
log_level=log_level,
log_level=loglevel,
reload=reload)

View File

@ -11,7 +11,7 @@ It defines the following globals :
- PROJECT_NAME (str) - HALFAPI_PROJECT_NAME
- PRODUCTION (bool) - HALFAPI_PRODUCTION
- LOGLEVEL (string) - HALFAPI_LOGLEVEL
- LOGLEVEL (str) - HALFAPI_LOGLEVEL
- BASE_DIR (str) - HALFAPI_BASE_DIR
- HOST (str) - HALFAPI_HOST
- PORT (int) - HALFAPI_PORT
@ -25,7 +25,6 @@ It reads the following ressource :
It follows the following format :
[project]
name = PROJECT_NAME
halfapi_version = HALFAPI_VERSION
[domain.domain_name]
@ -50,8 +49,6 @@ import toml
from .lib.domain import d_domains
from .logging import logger
CONFIG = {}
PRODUCTION = True
LOGLEVEL = 'info'
CONF_FILE = os.environ.get('HALFAPI_CONF_FILE', '.halfapi/config')
@ -66,7 +63,19 @@ HALFAPI_ETC_FILE=os.path.join(
HALFAPI_DOT_FILE=os.path.join(
os.getcwd(), '.halfapi', 'config')
HALFAPI_CONFIG_FILES = [ CONF_FILE, HALFAPI_DOT_FILE ]
HALFAPI_CONFIG_FILES = []
try:
with open(HALFAPI_ETC_FILE, 'r'):
HALFAPI_CONFIG_FILES.append(HALFAPI_ETC_FILE)
except FileNotFoundError:
logger.error('Cannot find a configuration file under %s', HALFAPI_DOT_FILE)
try:
with open(HALFAPI_DOT_FILE, 'r'):
HALFAPI_CONFIG_FILES.append(HALFAPI_DOT_FILE)
except FileNotFoundError:
logger.error('Cannot find a configuration file under %s', HALFAPI_DOT_FILE)
def conf_files():
return [
@ -90,30 +99,33 @@ def read_config():
"""
The highest index in "filenames" are the highest priorty
"""
return toml.load(HALFAPI_CONFIG_FILES)
d_res = {}
CONFIG = {}
logger.info('Reading config files %s', HALFAPI_CONFIG_FILES)
for CONF_FILE in HALFAPI_CONFIG_FILES:
d_res.update( toml.load(HALFAPI_CONFIG_FILES) )
PROJECT_NAME = CONFIG.get('project', {}).get(
'name',
environ.get('HALFAPI_PROJECT_NAME', os.path.basename(os.getcwd())))
logger.info('Reading config files (result) %s', d_res)
return { **d_res.get('project', {}), 'domain': d_res.get('domain', {}) }
CONFIG = read_config()
PROJECT_NAME = CONFIG.get('project_name',
environ.get('HALFAPI_PROJECT_NAME', os.getcwd().split('/')[-1]))
if len(CONFIG.get('domain', {}).keys()) == 0:
logger.info('No domains')
# logger.info('Running without domains: %s', d_domains(config) or 'empty domain dictionary')
# Bind
HOST = CONFIG.get('project', {}).get(
'host',
HOST = CONFIG.get('host',
environ.get('HALFAPI_HOST', '127.0.0.1'))
PORT = int(CONFIG.get('project', {}).get(
PORT = int(CONFIG.get(
'port',
environ.get('HALFAPI_PORT', '3000')))
# Secret
SECRET = CONFIG.get('project', {}).get(
SECRET = CONFIG.get(
'secret',
environ.get('HALFAPI_SECRET'))
@ -129,24 +141,21 @@ try:
except FileNotFoundError as exc:
logger.info('Running without secret file: %s', SECRET or 'no file specified')
PRODUCTION = bool(CONFIG.get('project', {}).get(
PRODUCTION = bool(CONFIG.get(
'production',
environ.get('HALFAPI_PROD', True)))
LOGLEVEL = CONFIG.get('project', {}).get(
LOGLEVEL = CONFIG.get(
'loglevel',
environ.get('HALFAPI_LOGLEVEL', 'info')).lower()
BASE_DIR = CONFIG.get('project', {}).get(
BASE_DIR = CONFIG.get(
'base_dir',
environ.get('HALFAPI_BASE_DIR', '.'))
CONFIG = {
'project_name': PROJECT_NAME,
'production': PRODUCTION,
'secret': SECRET,
'host': HOST,
'port': PORT,
'dryrun': DRYRUN,
'domain': {}
}
CONFIG['project_name'] = PROJECT_NAME
CONFIG['production'] = PRODUCTION
CONFIG['secret'] = SECRET
CONFIG['host'] = HOST
CONFIG['port'] = PORT
CONFIG['dryrun'] = DRYRUN

View File

@ -44,7 +44,7 @@ class HalfDomain(Starlette):
logger.info('HalfDomain creation %s %s', domain, config)
super().__init__(
routes=gen_domain_routes(self.m_router),
routes=gen_domain_routes(self.m_domain),
middleware=[
(DomainMiddleware,
{
@ -64,3 +64,153 @@ class HalfDomain(Starlette):
return getattr(m_acl, 'ACLS')
except AttributeError:
raise Exception(f'Missing acl.ACLS constant in {domain} module')
@staticmethod
def acls_route(domain):
d_res = {}
m_acl = importlib.import_module(f'{domain}.acl')
for acl_name, doc, order in HalfDomain.acls(domain):
fct = getattr(m_acl, acl_name)
d_res[acl_name] = {
'callable': fct,
'docs': doc,
'result': None
}
return d_res
# def schema(self):
def gen_routes(self):
"""
Yields the Route objects for a domain
Parameters:
m_domains: ModuleType
Returns:
Generator(HalfRoute)
"""
"""
yield HalfRoute('/',
JSONRoute(self.schema(self.m_domain)),
[{'acl': acl.public}],
'GET'
)
"""
for path, method, m_router, fct, params in self.gen_router_routes(self.m_domain, []):
yield HalfRoute(f'/{path}', fct, params, method)
def gen_router_routes(self, path: List[str]) -> \
Iterator[Tuple[str, str, ModuleType, Coroutine, List]]:
"""
Recursive generator that parses a router (or a subrouter)
and yields from gen_routes
Parameters:
- m_router (ModuleType): The currently treated router module
- path (List[str]): The current path stack
Yields:
(str, str, ModuleType, Coroutine, List): A tuple containing the path, verb,
router module, function reference and parameters of the route.
Function and parameters are yielded from then gen_routes function,
that decorates the endpoint function.
"""
for subpath, params in read_router(m_router).items():
path.append(subpath)
for verb in VERBS:
if verb not in params:
continue
yield ('/'.join(filter(lambda x: len(x) > 0, path)),
verb,
m_router,
*gen_routes(m_router, verb, path, params[verb])
)
for subroute in params.get('SUBROUTES', []):
#logger.debug('Processing subroute **%s** - %s', subroute, m_router.__name__)
param_match = re.fullmatch('^([A-Z_]+)_([a-z]+)$', subroute)
if param_match is not None:
try:
path.append('{{{}:{}}}'.format(
param_match.groups()[0].lower(),
param_match.groups()[1]))
except AssertionError as exc:
raise UnknownPathParameterType(subroute) from exc
else:
path.append(subroute)
try:
yield from gen_router_routes(
importlib.import_module(f'.{subroute}', m_router.__name__),
path)
except ImportError as exc:
logger.error('Failed to import subroute **{%s}**', subroute)
raise exc
path.pop()
path.pop()
def read_router(m_router: ModuleType) -> Dict:
"""
Reads a module and returns a router dict
If the module has a "ROUTES" constant, it just returns this constant,
Else, if the module has an "ACLS" constant, it builds the accurate dict
TODO: May be another thing, may be not a part of halfAPI
"""
m_path = None
try:
if not hasattr(m_router, 'ROUTES'):
routes = {'':{}}
acls = getattr(m_router, 'ACLS') if hasattr(m_router, 'ACLS') else None
if acls is not None:
for method in acls.keys():
if method not in VERBS:
raise Exception(
'This method is not handled: {}'.format(method))
routes[''][method] = []
routes[''][method] = acls[method].copy()
routes['']['SUBROUTES'] = []
if hasattr(m_router, '__path__'):
""" Module is a package
"""
m_path = getattr(m_router, '__path__')
if isinstance(m_path, list) and len(m_path) == 1:
routes['']['SUBROUTES'] = [
elt.name
for elt in os.scandir(m_path[0])
if elt.is_dir()
]
else:
routes = getattr(m_router, 'ROUTES')
try:
ROUTER_SCHEMA.validate(routes)
except SchemaError as exc:
logger.error(routes)
raise exc
return routes
except ImportError as exc:
# TODO: Proper exception handling
raise exc
except FileNotFoundError as exc:
# TODO: Proper exception handling
logger.error(m_path)
raise exc

View File

@ -18,7 +18,7 @@ class HalfRoute(Route):
""" HalfRoute
"""
def __init__(self, path, fct, params, method):
logger.info('HalfRoute creation: %s', params)
logger.info('HalfRoute creation: %s %s %s %s', path, fct, params, method)
super().__init__(
path,
HalfRoute.acl_decorator(

View File

@ -37,7 +37,7 @@ from .lib.responses import (ORJSONResponse, UnauthorizedResponse,
NotFoundResponse, InternalServerErrorResponse, NotImplementedResponse,
ServiceUnavailableResponse)
from .lib.domain import domain_schema_dict, NoDomainsException, domain_schema
from .lib.routes import gen_domain_routes, gen_schema_routes, JSONRoute
from .lib.routes import gen_schema_routes, JSONRoute
from .lib.schemas import schema_json, get_acls
from .logging import logger, config_logging
from .half_domain import HalfDomain
@ -49,14 +49,13 @@ class HalfAPI:
def __init__(self, config,
d_routes=None):
config_logging(logging.DEBUG)
self.config = config
SECRET = config.get('secret')
PRODUCTION = config.get('production', True)
CONFIG = config.get('config', {})
DRYRUN = config.get('dryrun', False)
SECRET = self.config.get('secret')
PRODUCTION = self.config.get('production', True)
DRYRUN = self.config.get('dryrun', False)
self.PRODUCTION = PRODUCTION
self.CONFIG = config
self.SECRET = SECRET
self.__application = None
@ -71,8 +70,11 @@ class HalfAPI:
Mount('/halfapi', routes=list(self.halfapi_routes()))
)
logger.info('Config: %s', config)
logger.info('Active domains: %s', config.get('domain', {}))
logger.info('Config: %s', self.config)
logger.info('Active domains: %s',
filter(
lambda n: n.get('enabled', False),
self.config.get('domain', {})))
if d_routes:
# Mount the routes from the d_routes argument - domain-less mode
@ -80,10 +82,6 @@ class HalfAPI:
for route in gen_schema_routes(d_routes):
routes.append(route)
else:
"""
for route in gen_domain_routes(m_domain_router):
routes.append(route)
"""
pass
startup_fcts = []
@ -106,21 +104,25 @@ class HalfAPI:
on_startup=startup_fcts
)
for key, domain in config.get('domain', {}).items():
for key, domain in self.config.get('domain', {}).items():
dom_name = domain.get('name', key)
if domain.get('prefix', False):
path = f'/{dom_name}'
else:
path = '/'
if not domain.get('enabled', False):
continue
if not domain.get('prefix', False):
if len(self.config.get('domain').keys()) > 1:
raise Exception('Cannot use multiple domains and set prefix to false')
path = '/'
else:
path = f'/{dom_name}'
logger.debug('Mounting domain %s on %s', domain.get('name'), path)
self.__application.mount(path,
Mount('/',
HalfDomain(
self.application,
domain.get('name', key),
domain.get('router'),
config=domain.get('config', {})
)
HalfDomain(
self.application,
domain.get('name', key),
domain.get('router'),
config=domain.get('config', {})
)
)
@ -132,12 +134,10 @@ class HalfAPI:
)
"""
if SECRET:
self.SECRET = SECRET
self.__application.add_middleware(
AuthenticationMiddleware,
backend=JWTAuthenticationBackend(secret_key=SECRET)
)
self.__application.add_middleware(
AuthenticationMiddleware,
backend=JWTAuthenticationBackend()
)
if not PRODUCTION:
self.__application.add_middleware(
@ -148,6 +148,7 @@ class HalfAPI:
)
@property
def version(self):
return __version__
@ -168,7 +169,7 @@ class HalfAPI:
yield Route('/whoami', get_user)
yield Route('/schema', schema_json)
yield Route('/acls', get_acls)
yield Route('/acls', self.acls_route())
yield Route('/version', self.version_async)
""" Halfapi debug routes definition
"""
@ -209,3 +210,29 @@ class HalfAPI:
import sys
time.sleep(1)
sys.exit(0)
def acls_route(self):
res = {
domain: HalfDomain.acls_route(domain)
for domain, domain_conf in self.config.get('domain', {}).items()
if domain_conf.get('enabled', False)
}
async def wrapped(req, *args, **kwargs):
for domain, domain_acls in res.items():
for acl_name, d_acl in domain_acls.items():
fct = d_acl['callable']
if not callable(fct):
raise Exception(
'No callable function in acl definition %s',
acl_name)
fct_result = fct(req, *args, **kwargs)
if callable(fct_result):
fct_result = fct()(req, *args, **kwargs)
d_acl['result'] = fct_result
return ORJSONResponse(res)
return wrapped

View File

@ -23,14 +23,16 @@ from starlette.exceptions import HTTPException
from .user import CheckUser, JWTUser, Nobody
from ..logging import logger
from ..conf import CONFIG
SECRET=None
try:
from ..conf import SECRET
except ImportError as exc:
logger.error('Could not import SECRET variable from conf module,'\
' using HALFAPI_SECRET environment variable')
try:
with open(CONFIG.get('secret', ''), 'r') as secret_file:
SECRET = secret_file.read().strip()
except FileNotFoundError:
logger.error('Could not import SECRET variable from conf module,'\
' using HALFAPI_SECRET environment variable')
class JWTAuthenticationBackend(AuthenticationBackend):
def __init__(self, secret_key: str = SECRET,

View File

@ -98,6 +98,8 @@ class ORJSONResponse(JSONResponse):
JWTUser, Nobody
}
if callable(typ):
return typ.__name__
if type(typ) in str_types:
return str(typ)
if type(typ) in list_types:

View File

@ -9,57 +9,4 @@ from schema import SchemaError
from .constants import VERBS, ROUTER_SCHEMA
from ..logging import logger
def read_router(m_router: ModuleType) -> Dict:
"""
Reads a module and returns a router dict
If the module has a "ROUTES" constant, it just returns this constant,
Else, if the module has an "ACLS" constant, it builds the accurate dict
TODO: May be another thing, may be not a part of halfAPI
"""
m_path = None
try:
if not hasattr(m_router, 'ROUTES'):
routes = {'':{}}
acls = getattr(m_router, 'ACLS') if hasattr(m_router, 'ACLS') else None
if acls is not None:
for method in acls.keys():
if method not in VERBS:
raise Exception(
'This method is not handled: {}'.format(method))
routes[''][method] = []
routes[''][method] = acls[method].copy()
routes['']['SUBROUTES'] = []
if hasattr(m_router, '__path__'):
""" Module is a package
"""
m_path = getattr(m_router, '__path__')
if isinstance(m_path, list) and len(m_path) == 1:
routes['']['SUBROUTES'] = [
elt.name
for elt in os.scandir(m_path[0])
if elt.is_dir()
]
else:
routes = getattr(m_router, 'ROUTES')
try:
ROUTER_SCHEMA.validate(routes)
except SchemaError as exc:
logger.error(routes)
raise exc
return routes
except ImportError as exc:
# TODO: Proper exception handling
raise exc
except FileNotFoundError as exc:
# TODO: Proper exception handling
logger.error(m_path)
raise exc

View File

@ -19,7 +19,7 @@ from types import ModuleType, FunctionType
import yaml
from .domain import gen_router_routes, domain_acls, route_decorator, domain_schema_dict
from .domain import gen_router_routes, domain_acls, route_decorator, domain_schema
from .responses import ORJSONResponse
from .acl import args_check
from ..half_route import HalfRoute
@ -43,6 +43,7 @@ def JSONRoute(data: Any) -> Coroutine:
async function
"""
async def wrapped(request, *args, **kwargs):
logger.debug('JSONRoute data: %s', data)
return ORJSONResponse(data)
return wrapped
@ -58,11 +59,12 @@ def gen_domain_routes(m_domain: ModuleType):
Returns:
Generator(HalfRoute)
"""
yield HalfRoute(f'/',
JSONRoute(domain_schema_dict(m_domain)),
yield HalfRoute('/',
JSONRoute(domain_schema(m_domain)),
[{'acl': acl.public}],
'GET'
)
for path, method, m_router, fct, params in gen_router_routes(m_domain, []):
yield HalfRoute(f'/{path}', fct, params, method)

View File

@ -61,9 +61,10 @@ class TestDomain(TestCase):
halfapi = HalfAPI({
'domain': {
'dummy_domain': {
'name': 'dummy_domain',
'router': 'dummy_domain.routers',
'name': self.DOMAIN,
'router': self.ROUTERS,
'prefix': False,
'enabled': True,
'config': {
'test': True
}
@ -76,14 +77,20 @@ class TestDomain(TestCase):
assert r.status_code == 200
d_r = r.json()
assert isinstance(d_r, dict)
assert 'openapi' in d_r
assert 'info' in d_r
assert 'paths' in d_r
assert 'domain' in d_r
r = client.get('/halfapi/acls')
assert r.status_code == 200
d_r = r.json()
assert isinstance(d_r, dict)
ACLS = HalfDomain.acls(self.DOMAIN)
assert len(ACLS) == len(d_r.keys())
for acl_name in ACLS:
assert acl_name in d_r.keys()
assert self.DOMAIN in d_r.keys()
ACLS = HalfDomain.acls(self.DOMAIN)
assert len(ACLS) == len(d_r[self.DOMAIN])
for acl_name in ACLS:
assert acl_name[0] in d_r[self.DOMAIN]

View File

@ -36,6 +36,7 @@ def test_options(runner):
assert r.exit_code == 0
@pytest.mark.skip
def test_init_project_fail(runner):
# Missing argument (project)
testproject = 'testproject'
@ -59,6 +60,7 @@ def test_init_project_fail(runner):
r = runner.invoke(Cli, ['init', testproject])
assert r.exit_code == 1
@pytest.mark.skip
def test_init_project(runner):
"""
"""

View File

@ -20,7 +20,7 @@ from starlette.testclient import TestClient
from halfapi import __version__
from halfapi.halfapi import HalfAPI
from halfapi.cli.cli import cli
from halfapi.cli.init import init, format_halfapi_etc
from halfapi.cli.init import init
from halfapi.cli.domain import domain, create_domain
from halfapi.lib.responses import ORJSONResponse
from halfapi.lib.jwt_middleware import JWTAuthenticationBackend
@ -39,7 +39,9 @@ from halfapi.lib.jwt_middleware import (
def dummy_domain():
yield {
'name': 'dummy_domain',
'router': 'dummy_domain.routers',
'router': '.routers',
'enabled': True,
'prefix': False,
'config': {
'test': True
}
@ -178,10 +180,12 @@ def project_runner(runner, halfapicli, tree):
with open(SECRET_PATH, 'w') as f:
f.write(str(uuid1()))
"""
with open(os.path.join('.halfapi', PROJNAME), 'w') as halfapi_etc:
PROJ_CONFIG = re.sub('secret = .*', f'secret = {SECRET_PATH}',
format_halfapi_etc(PROJNAME, os.getcwd()))
halfapi_etc.write(PROJ_CONFIG)
"""
###
@ -276,12 +280,14 @@ def application_debug(project_runner):
'domain': {
'domain': {
'name': 'test_domain',
'router': 'test_domain.routers'
'router': 'test_domain.routers',
'enabled': True,
'prefix': False,
'config':{
'test': True
}
}
},
'config':{
'domain_config': {'test_domain': {'test': True}}
}
})
assert isinstance(halfAPI, HalfAPI)
@ -294,7 +300,7 @@ def application_domain(dummy_domain):
'secret':'turlututu',
'production':True,
'domain': {
'domain': {
'dummy_domain': {
**dummy_domain,
'config': {
'test': True

View File

@ -2,12 +2,12 @@ from halfapi.lib import acl
from halfapi.lib.acl import public
from random import randint
def random():
def random(*args):
""" Random access ACL
"""
return randint(0,1) == 1
def denied():
def denied(*args):
""" Access denied
"""
return False
@ -17,5 +17,3 @@ ACLS = (
('random', random.__doc__, 10),
('denied', denied.__doc__, 0)
)

View File

@ -9,7 +9,9 @@ class TestConf(TestCase):
'domain': {
'dummy_domain': {
'name': 'dummy_domain',
'router': '.routers'
'router': '.routers',
'enabled': True,
'prefix': False,
}
}
}
@ -40,18 +42,11 @@ class TestConf(TestCase):
from halfapi.conf import (
CONFIG,
SCHEMA,
SECRET,
PROJECT_NAME,
HOST,
PORT,
CONF_DIR
)
assert isinstance(CONFIG, dict)
assert isinstance(CONFIG.get('project_name'), str)
assert isinstance(SCHEMA, dict)
assert isinstance(SECRET, str)
assert isinstance(PROJECT_NAME, str)
assert isinstance(HOST, str)
assert isinstance(PORT, int)
assert int(str(int(PORT))) == PORT
assert isinstance(CONF_DIR, str)
assert isinstance(CONFIG.get('secret'), str)
assert isinstance(CONFIG.get('host'), str)
assert isinstance(CONFIG.get('port'), int)

View File

@ -25,6 +25,7 @@ def test_routes(application_debug):
r = c.get('/halfapi/error/500')
assert r.status_code == 500
r = c.get('/')
print(r.content)
d_r = r.json()
assert isinstance(d_r, dict)
# assert API_SCHEMA.validate(d_r)

View File

@ -12,6 +12,9 @@ from halfapi.lib.domain import gen_router_routes
def test_get_config_route(dummy_project, application_domain):
c = TestClient(application_domain)
r = c.get('/')
assert r.status_code == 200
pprint(r.json())
r = c.get('/config')
assert r.status_code == 200
pprint(r.json())