[wip][testfail] update config monodomain - schema gives acls

This commit is contained in:
Maxime Alves LIRMM 2021-11-30 18:31:40 +01:00
parent ec26438340
commit 7e1cc21b8c
25 changed files with 315 additions and 156 deletions

View File

@ -59,6 +59,12 @@ that is available in the python path.
Run the project by using the `halfapi run` command.
You can try the dummy_domain with the following command.
```
python -m halfapi routes --export --noheader dummy_domain.routers | python -m halfapi run -
```
## API Testing

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3
__version__ = '0.5.13'
__version__ = '0.6.0'
def version():
return f'HalfAPI version:{__version__}'

View File

@ -5,4 +5,5 @@ from .logging import logger
logger.info('CONFIG: %s', CONFIG)
logger.info('SCHEMA: %s', SCHEMA)
application = HalfAPI(CONFIG, SCHEMA or None).application
application = HalfAPI(
CONFIG, SCHEMA or None).application

View File

@ -9,10 +9,6 @@ import click
from .cli import cli
from ..conf import CONFIG
DOMAINS_STR='\n'.join(
[ ' = '.join((key, val.__name__)) for key, val in CONFIG['domains'].items() ]
)
CONF_STR="""
[project]
name = {project_name}
@ -20,8 +16,11 @@ host = {host}
port = {port}
production = {production}
[domains]
""".format(**CONFIG) + DOMAINS_STR
[domain]
name = {domain_name}
router = {router}
"""
@cli.command()
def config():

View File

@ -28,10 +28,11 @@ def create_domain(domain_name: str, module_path: str):
# logger.warning('Domain **%s** is already in project', domain_name)
# sys.exit(1)
if not config.has_section('domains'):
config.add_section('domains')
if not config.has_section('domain'):
config.add_section('domain')
config.set('domains', domain_name, module_path)
config.set('domain', 'name', domain_name)
config.set('domain', 'router', module_path)
write_config()

View File

@ -40,7 +40,7 @@ TMPL_HALFAPI_CONFIG = """[project]
name = {name}
halfapi_version = {halfapi_version}
[domains]
[domain]
"""
@click.argument('project')

View File

@ -13,18 +13,19 @@ from .cli import cli
from ..logging import logger
from ..lib.domain import gen_router_routes
from ..lib.domain import domain_schema_dict
from ..lib.constants import DOMAIN_SCHEMA
from ..lib.routes import api_routes
from ..lib.schemas import schema_to_csv
# from ..lib.routes import api_routes
from ..lib.schemas import schema_to_csv # get_api_routes
@click.argument('module', required=True)
@click.option('--export', default=False, is_flag=True)
@click.option('--validate', default=False, is_flag=True)
@click.option('--check', default=False, is_flag=True)
@click.option('--noheader', default=False, is_flag=True)
@click.option('--schema', default=False, is_flag=True)
@cli.command()
def routes(module, export=False, validate=False, check=False, noheader=False):
def routes(module, export=False, validate=False, check=False, noheader=False, schema=False):
"""
The "halfapi routes" command
"""
@ -37,9 +38,14 @@ def routes(module, export=False, validate=False, check=False, noheader=False):
click.echo(schema_to_csv(module, header=not noheader))
if validate:
routes_d = api_routes(mod)
routes_d = domain_schema_dict(mod)
try:
DOMAIN_SCHEMA.validate(routes_d[0])
except Exception as exc:
pprint(routes_d[0])
raise exc from exc
"""
if schema:
click.echo(get_api_routes(uu
"""

View File

@ -10,7 +10,7 @@ import uvicorn
from .cli import cli
from .domain import list_api_routes
from ..conf import (PROJECT_NAME, HOST, PORT, SCHEMA,
PRODUCTION, LOGLEVEL, DOMAINSDICT, CONFIG)
PRODUCTION, LOGLEVEL, DOMAINSDICT, CONFIG, DOMAIN, ROUTER)
from ..logging import logger
from ..lib.schemas import schema_csv_dict
@ -20,11 +20,13 @@ from ..lib.schemas import schema_csv_dict
@click.option('--secret', default=False)
@click.option('--production', default=True)
@click.option('--loglevel', default=LOGLEVEL)
@click.option('--prefix', default='')
@click.option('--prefix', default='/')
@click.option('--check', default=True)
@click.argument('schema', type=click.File('r'), required=False)
@click.argument('router', required=False)
@click.argument('domain', required=False)
@cli.command()
def run(host, port, reload, secret, production, loglevel, prefix, check, schema):
def run(host, port, reload, secret, production, loglevel, prefix, check, schema, router, domain):
"""
The "halfapi run" command
"""
@ -50,9 +52,12 @@ def run(host, port, reload, secret, production, loglevel, prefix, check, schema)
sys.path.insert(0, os.getcwd())
CONFIG.get('domain')['name'] = domain
CONFIG.get('domain')['router'] = router
if schema:
# Populate the SCHEMA global with the data from the given file
for key, val in schema_csv_dict(schema).items():
for key, val in schema_csv_dict(schema, prefix).items():
SCHEMA[key] = val
# list_api_routes()

View File

@ -53,6 +53,9 @@ HOST = '127.0.0.1'
PORT = '3000'
SECRET = ''
CONF_FILE = os.environ.get('HALFAPI_CONF_FILE', '.halfapi/config')
DOMAIN = None
ROUTER = None
SCHEMA = {}
config = ConfigParser(allow_no_value=True)
@ -140,7 +143,10 @@ CONFIG = {
'secret': SECRET,
'host': HOST,
'port': PORT,
'domains': DOMAINS,
'domain': {
'name': None,
'router': None
},
'domain_config': {}
}

View File

@ -29,61 +29,61 @@ from timing_asgi.integrations import StarletteScopeToName
# module libraries
from .lib.constants import API_SCHEMA_DICT
from .lib.domain_middleware import DomainMiddleware
from .lib.timing import HTimingClient
from .lib.domain import NoDomainsException
from halfapi.lib.jwt_middleware import JWTAuthenticationBackend
from halfapi.lib.responses import (ORJSONResponse, UnauthorizedResponse,
from .lib.jwt_middleware import JWTAuthenticationBackend
from .lib.responses import (ORJSONResponse, UnauthorizedResponse,
NotFoundResponse, InternalServerErrorResponse, NotImplementedResponse,
ServiceUnavailableResponse)
from halfapi.lib.routes import gen_domain_routes, gen_schema_routes, JSONRoute
from halfapi.lib.schemas import schema_json, get_acls
from halfapi.logging import logger, config_logging
from .lib.domain import domain_schema_dict
from .lib.routes import gen_domain_routes, gen_schema_routes, JSONRoute
from .lib.schemas import schema_json, get_acls
from .logging import logger, config_logging
from halfapi import __version__
class HalfAPI:
def __init__(self, config, routes_dict=None):
def __init__(self, config,
routes_dict=None):
config_logging(logging.DEBUG)
SECRET = config.get('secret')
PRODUCTION = config.get('production', True)
DOMAINS = config.get('domains', {})
CONFIG = config.get('config', {
'domains': DOMAINS
})
CONFIG = config.get('config', {})
if not (DOMAINS or routes_dict):
domain = config.get('domain')['name']
router = config.get('domain')['router']
if not (domain and router):
raise NoDomainsException()
self.PRODUCTION = PRODUCTION
self.CONFIG = CONFIG
self.DOMAINS = DOMAINS
self.SECRET = SECRET
self.__application = None
""" The base route contains the route schema
"""
if routes_dict:
any_route = routes_dict[
list(routes_dict.keys())[0]
]
domain, router = any_route[
list(any_route.keys())[0]
]['module'].__name__.split('.')[0:2]
if domain and router:
m_domain = importlib.import_module(f'{domain}')
m_domain_router = importlib.import_module(f'{domain}.{router}')
m_domain_acl = importlib.import_module(f'{domain}.acl')
DOMAINS = {}
DOMAINS[domain] = importlib.import_module(f'{domain}.{router}')
self.schema = { **API_SCHEMA_DICT }
if DOMAINS:
self.api_routes = {}
self.schema['domain'] = {
'name': domain,
'version': getattr(m_domain, '__version__', ''),
'patch_release': getattr(m_domain, '__path_release__', ''),
'acls': tuple(getattr(m_domain_acl, 'ACLS', ()))
}
routes = [ Route('/', JSONRoute(self.api_routes)) ]
self.schema['paths'] = domain_schema_dict(m_domain_router)
routes = [ Route('/', JSONRoute(self.schema)) ]
""" HalfAPI routes (if not PRODUCTION, includes debug routes)
"""
@ -96,20 +96,9 @@ class HalfAPI:
logger.info('Domain-less mode : the given schema defines the activated routes')
for route in gen_schema_routes(routes_dict):
routes.append(route)
elif DOMAINS:
# Mount the domain routes
logger.info('Domains mode : the list of domains is retrieves from the configuration file')
for domain, m_domain in DOMAINS.items():
if domain not in self.api_routes.keys():
raise Exception(f'The domain does not have a schema: {domain}')
routes.append(
Route(f'/{domain}', JSONRoute(self.api_routes[domain]))
)
routes.append(
Mount(f'/{domain}', routes=gen_domain_routes(m_domain))
)
else:
for route in gen_domain_routes(m_domain_router):
routes.append(route)
self.__application = Starlette(
debug=not PRODUCTION,
@ -125,6 +114,7 @@ class HalfAPI:
self.__application.add_middleware(
DomainMiddleware,
domain=domain,
config=CONFIG
)
@ -192,3 +182,7 @@ class HalfAPI:
raise Exception('Test exception')
yield Route('/exception', exception)
@staticmethod
def api_schema(domain):
pass

View File

@ -1,27 +1,50 @@
import re
from schema import Schema, Optional
from .. import __version__
VERBS = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE')
ACLS_SCHEMA = Schema([{
'acl': lambda fct: isinstance(fct(), bool),
'acl': str,
Optional('args'): {
Optional('required'): { str },
Optional('optional'): { str }
Optional('required'): [ str ],
Optional('optional'): [ str ]
},
Optional('out'): { str }
Optional('out'): [ str ]
}])
is_callable_dotted_notation = lambda x: re.match(
r'^(([a-zA-Z_])+\.?)*:[a-zA-Z_]+$', 'ab_c.TEST:get')
ROUTE_SCHEMA = Schema({
str: {
'docs': lambda n: True, # Should validate an openAPI spec
'acls': ACLS_SCHEMA
str: { # path
str: { # method
'callable': is_callable_dotted_notation,
'docs': lambda n: True, # Should validate an openAPI spec
'acls': ACLS_SCHEMA
}
}
})
DOMAIN_SCHEMA = Schema({
str: ROUTE_SCHEMA
'name': str,
Optional('version'): str,
Optional('patch_release'): str,
Optional('acls'): [
[str, str, int]
]
})
API_SCHEMA_DICT = {
'openapi': '3.0.0',
'info': {
'title': 'HalfAPI',
'version': __version__
},
}
API_SCHEMA = Schema({
str: DOMAIN_SCHEMA # key: domain name, value: result of lib.routes.api_routes(domain_module)
**API_SCHEMA_DICT,
'domain': DOMAIN_SCHEMA,
'paths': ROUTE_SCHEMA
})

View File

@ -12,6 +12,7 @@ from functools import wraps
from types import ModuleType, FunctionType
from typing import Coroutine, Generator
from typing import Dict, List, Tuple, Iterator
import yaml
from starlette.exceptions import HTTPException
@ -229,6 +230,60 @@ def gen_router_routes(m_router: ModuleType, path: List[str]) -> \
path.pop()
def domain_schema_dict(m_router: ModuleType) -> Dict:
""" gen_router_routes return values as a dict
Parameters:
m_router (ModuleType): The domain routers' module
Returns:
Dict: Schema of dict is halfapi.lib.constants.DOMAIN_SCHEMA
"""
d_res = {}
for path, verb, m_router, fct, parameters in gen_router_routes(m_router, []):
if path not in d_res:
d_res[path] = {}
if verb not in d_res[path]:
d_res[path][verb] = {}
d_res[path][verb]['callable'] = f'{m_router.__name__}:{fct.__name__}'
d_res[path][verb]['docs'] = yaml.safe_load(fct.__doc__)
d_res[path][verb]['acls'] = list(map(lambda elt: { **elt, 'acl': elt['acl'].__name__ },
parameters))
return d_res
def domain_schema_list(m_router: ModuleType) -> List:
""" Schema as list, one row by route/acl
Parameters:
m_router (ModuleType): The domain routers' module
Returns:
List[Tuple]: (path, verb, callable, doc, acls)
"""
res = []
for path, verb, m_router, fct, parameters in gen_router_routes(m_router, []):
for params in parameters:
res.append((
path,
verb,
f'{m_router.__name__}:{fct.__name__}',
params.get('acl').__name__,
params.get('args', {}).get('required', []),
params.get('args', {}).get('optional', []),
params.get('out', [])
))
return res
def d_domains(config) -> Dict[str, ModuleType]:
"""
Parameters:

View File

@ -13,15 +13,15 @@ class DomainMiddleware(BaseHTTPMiddleware):
"""
DomainMiddleware adds the api routes and acls to the following scope keys :
- domains
- api
- acl
"""
def __init__(self, app, config):
def __init__(self, app, domain, config):
logger.info('DomainMiddleware %s %s', domain, config)
super().__init__(app)
self.domain = domain
self.config = config
self.domains = {}
self.request = None
@ -31,14 +31,9 @@ class DomainMiddleware(BaseHTTPMiddleware):
Call of the route fonction (decorated or not)
"""
l_path = URL(scope=request.scope).path.split('/')
cur_domain = l_path[0]
if len(cur_domain) == 0 and len(l_path) > 1:
cur_domain = l_path[1]
request.scope['domain'] = cur_domain
request.scope['config'] = self.config['domain_config'][cur_domain] \
if cur_domain in self.config.get('domain_config', {}) else {}
request.scope['domain'] = self.domain
request.scope['config'] = self.config['domain_config'][self.domain] \
if self.domain in self.config.get('domain_config', {}) else {}
response = await call_next(request)
@ -56,6 +51,6 @@ class DomainMiddleware(BaseHTTPMiddleware):
response.headers['x-args-optional'] = \
','.join(request.scope['args']['optional'])
response.headers['x-domain'] = cur_domain
response.headers['x-domain'] = self.domain
return response

View File

@ -17,7 +17,7 @@ from types import ModuleType
from starlette.schemas import SchemaGenerator
from .. import __version__
from .domain import gen_router_routes
from .domain import gen_router_routes, domain_schema_list
from ..logging import logger
from .routes import gen_starlette_routes, api_routes, api_acls
from .responses import ORJSONResponse
@ -67,7 +67,7 @@ def schema_to_csv(module_name, header=True) -> str:
lines should be unique in the result string;
"""
# retrieve module
mod = importlib.import_module(module_name)
m_router = importlib.import_module(module_name)
lines = []
if header:
lines.append([
@ -79,35 +79,16 @@ def schema_to_csv(module_name, header=True) -> str:
'out'
])
for path, verb, m_router, fct, parameters in gen_router_routes(mod, []):
""" Call route generator (.lib.domain)
"""
for param in parameters:
""" Each parameters row represents rules for a specific ACL
"""
fields = (
f'/{path}',
verb,
f'{m_router.__name__}:{fct.__name__}',
param['acl'].__name__,
','.join((param.get('args', {}).get('required', set()))),
','.join((param.get('args', {}).get('optional', set()))),
','.join((param.get('out', set())))
)
if fields[0:4] in map(lambda elt: elt[0:4], lines):
raise Exception(
'Already defined acl for this route \
(path: {}, verb: {}, acl: {})'.format(
path,
verb,
param['acl'].__name__
)
)
lines.append(fields)
for line in domain_schema_list(m_router):
lines.append([
line[0],
line[1],
line[2],
line[3],
','.join(line[4]),
','.join(line[5]),
','.join(line[6])
])
return '\n'.join(
[ ';'.join(fields) for fields in lines ]
@ -115,7 +96,7 @@ def schema_to_csv(module_name, header=True) -> str:
def schema_csv_dict(csv: List[str]) -> Dict:
def schema_csv_dict(csv: List[str], prefix='/') -> Dict:
package = None
schema_d = {}
@ -125,8 +106,13 @@ def schema_csv_dict(csv: List[str]) -> Dict:
for line in csv:
if not line:
continue
path, verb, router, acl_fct_name, args_req, args_opt, out = line.strip().split(';')
logger.info('schema_csv_dict %s %s %s', path, args_req, args_opt)
path = f'{prefix}{path}'
if path not in schema_d:
schema_d[path] = {}

View File

@ -7,4 +7,4 @@ def test_config(cli_runner):
cp = ConfigParser()
cp.read_string(result.output)
assert cp.has_section('project')
assert cp.has_section('domains')
assert cp.has_section('domain')

View File

@ -197,7 +197,7 @@ def project_runner(runner, halfapicli, halfapi_conf_dir):
###
# add dummy domain
###
create_domain('tests', '.dummy_domain.routers')
create_domain('dummy_domain', '.routers')
###
yield halfapicli
@ -264,9 +264,10 @@ def dummy_project():
f'secret = {halfapi_secret}\n',
'port = 3050\n',
'loglevel = debug\n',
'[domains]\n',
f'{domain} = .routers',
f'[{domain}]',
'[domain]\n',
f'name = {domain}\n',
'router = routers\n',
f'[{domain}]\n',
'test = True'
])
@ -288,11 +289,11 @@ def application_debug(routers):
halfAPI = HalfAPI({
'secret':'turlututu',
'production':False,
'domains': {
'dummy_domain': routers
'domain': {
'name': 'dummy_domain',
'router': 'routers'
},
'config':{
'domains': {'dummy_domain':routers},
'domain_config': {'dummy_domain': {'test': True}}
}
})
@ -310,9 +311,11 @@ def application_domain(routers):
return HalfAPI({
'secret':'turlututu',
'production':True,
'domains':{'dummy_domain':routers},
'domain': {
'name': 'dummy_domain',
'router': 'routers'
},
'config':{
'domains': {'dummy_domain':routers},
'domain_config': {'dummy_domain': {'test': True}}
}
}).application

View File

@ -2,10 +2,20 @@ from halfapi.lib import acl
from halfapi.lib.acl import public
from random import randint
def random():
""" Random access ACL
"""
return randint(0,1) == 1
def denied():
""" Access denied
"""
return False
ACLS = (
('public', public.__doc__, 999),
('random', random.__doc__, 10),
('denied', denied.__doc__, 0)
)

View File

@ -9,7 +9,8 @@ def test_halfapi_dummy_domain():
with patch('starlette.applications.Starlette') as mock:
mock.return_value = MagicMock()
halfapi = HalfAPI({
'domains': {
'dummy_domain': '.routers'
'domain': {
'name': 'dummy_domain',
'router': 'routers'
}
})

View File

@ -82,7 +82,7 @@ class TestCli():
assert cp.has_option('project', 'name')
assert cp.get('project', 'name') == PROJNAME
assert cp.get('project', 'halfapi_version') == __version__
assert cp.has_section('domains')
assert cp.has_section('domain')
except AssertionError as exc:
subprocess.run(['tree', '-a', os.getcwd()])
raise exc

View File

@ -1,22 +1,23 @@
from halfapi.halfapi import HalfAPI
halfapi_arg = { 'domain': { 'name': 'dummy_domain', 'router': 'routers' } }
def test_conf_production_default():
halfapi = HalfAPI({
'domains': {'test': True}
**halfapi_arg
})
assert halfapi.PRODUCTION is True
def test_conf_production_true():
halfapi = HalfAPI({
**halfapi_arg,
'production': True,
'domains': {'test': True}
})
assert halfapi.PRODUCTION is True
def test_conf_production_false():
halfapi = HalfAPI({
**halfapi_arg,
'production': False,
'domains': {'test': True}
})
assert halfapi.PRODUCTION is False

54
tests/test_domain.py Normal file
View File

@ -0,0 +1,54 @@
import importlib
import functools
import os
import sys
import json
from unittest import TestCase
from click.testing import CliRunner
from halfapi.cli.cli import cli
from pprint import pprint
class TestDomain(TestCase):
DOMAIN = 'dummy_domain'
ROUTERS = 'routers'
@property
def router_module(self):
return '.'.join((self.DOMAIN, self.ROUTERS))
def setUp(self):
class_ = CliRunner
def invoke_wrapper(f):
"""Augment CliRunner.invoke to emit its output to stdout.
This enables pytest to show the output in its logs on test
failures.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
echo = kwargs.pop('echo', False)
result = f(*args, **kwargs)
if echo is True:
sys.stdout.write(result.output)
return result
return wrapper
class_.invoke = invoke_wrapper(class_.invoke)
self.runner = class_()
def tearDown(self):
pass
def test_routes(self):
result = self.runner.invoke(cli, '--version')
self.assertEqual(result.exit_code, 0)
result = self.runner.invoke(cli, ['routes', '--export', self.router_module])
self.assertEqual(result.exit_code, 0)
print(result.stdout)
# result_d = json.loads(result.stdout)
# self.assertTrue()

View File

@ -4,6 +4,7 @@ import importlib
import subprocess
import time
import pytest
from pprint import pprint
from starlette.routing import Route
from starlette.testclient import TestClient
@ -11,8 +12,11 @@ from halfapi.lib.domain import gen_router_routes
def test_get_config_route(dummy_project, application_domain, routers):
c = TestClient(application_domain)
r = c.get('/dummy_domain/config')
r = c.get('/config')
assert r.status_code == 200
pprint(r.json())
assert 'test' in r.json()
def test_get_route(dummy_project, application_domain, routers):
c = TestClient(application_domain)
path = verb = params = None
@ -27,7 +31,7 @@ def test_get_route(dummy_project, application_domain, routers):
for route_def in []:#dummy_domain_routes:
path, verb = route_def[0], route_def[1]
route_path = '/dummy_domain/{}'.format(path)
route_path = '/{}'.format(path)
print(route_path)
try:
if verb.lower() == 'get':
@ -62,7 +66,7 @@ def test_get_route(dummy_project, application_domain, routers):
for route_def in dummy_domain_path_routes:
path, verb = route_def[0], route_def[1]
path = path.format(test=str(test_uuid))
route_path = f'/dummy_domain/{path}'
route_path = f'/{path}'
if verb.lower() == 'get':
r = c.get(f'{route_path}')
@ -73,14 +77,14 @@ def test_delete_route(dummy_project, application_domain, routers):
c = TestClient(application_domain)
from uuid import uuid4
arg = str(uuid4())
r = c.delete(f'/dummy_domain/abc/alphabet/{arg}')
r = c.delete(f'/abc/alphabet/{arg}')
assert r.status_code == 200
assert isinstance(r.json(), str)
def test_arguments_route(dummy_project, application_domain, routers):
c = TestClient(application_domain)
path = '/dummy_domain/arguments'
path = '/arguments'
r = c.get(path)
assert r.status_code == 400
r = c.get(path, params={'foo':True})
@ -90,7 +94,7 @@ def test_arguments_route(dummy_project, application_domain, routers):
assert r.status_code == 200
for key, val in arg.items():
assert r.json()[key] == str(val)
path = '/dummy_domain/async/arguments'
path = '/async/arguments'
r = c.get(path)
assert r.status_code == 400
r = c.get(path, params={'foo':True})

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import importlib
from halfapi.lib.domain import VERBS, gen_routes, gen_router_routes, MissingAclError
from halfapi.lib.domain import VERBS, gen_routes, gen_router_routes, \
MissingAclError, domain_schema_dict, domain_schema_list
from types import FunctionType
@ -36,3 +37,15 @@ def test_gen_routes():
assert isinstance(params, list)
assert len(TEST_uuid.ACLS['GET']) == len(params)
def test_domain_schema_dict():
from .dummy_domain import routers
d_res = domain_schema_dict(routers)
assert isinstance(d_res, dict)
def test_domain_schema_list():
from .dummy_domain import routers
res = domain_schema_list(routers)
assert isinstance(res, list)
assert len(res) > 0

View File

@ -1,5 +1,5 @@
from starlette.routing import Route
from halfapi.lib.routes import gen_starlette_routes, api_routes, gen_router_routes
from halfapi.lib.routes import gen_starlette_routes, gen_router_routes
def test_gen_starlette_routes():
from .dummy_domain import routers
@ -8,6 +8,9 @@ def test_gen_starlette_routes():
assert isinstance(route, Route)
import pytest
@pytest.mark.skip
def test_api_routes():
from . import dummy_domain
d_res, d_acls = api_routes(dummy_domain)

View File

@ -6,7 +6,7 @@ from starlette.authentication import (
UnauthenticatedUser)
from halfapi.lib.schemas import schema_dict_dom, schema_to_csv, schema_csv_dict
from halfapi.lib.constants import DOMAIN_SCHEMA
from halfapi.lib.constants import DOMAIN_SCHEMA, API_SCHEMA
from halfapi import __version__
@ -22,20 +22,11 @@ def test_get_api_schema(project_runner, application_debug):
assert isinstance(c, TestClient)
d_r = r.json()
assert isinstance(d_r, dict)
def test_get_schema_route(project_runner, application_debug):
c = TestClient(application_debug)
assert isinstance(c, TestClient)
r = c.get('/halfapi/schema')
d_r = r.json()
assert isinstance(d_r, dict)
assert 'openapi' in d_r.keys()
assert 'info' in d_r.keys()
assert d_r['info']['title'] == 'HalfAPI'
assert d_r['info']['version'] == __version__
assert 'paths' in d_r.keys()
pprint(d_r)
assert API_SCHEMA.validate(d_r)
"""
def test_get_api_dummy_domain_routes(application_domain, routers):
c = TestClient(application_domain)
r = c.get('/dummy_domain')
@ -46,6 +37,7 @@ def test_get_api_dummy_domain_routes(application_domain, routers):
assert 'GET' in d_r['abc/alphabet']
assert len(d_r['abc/alphabet']['GET']) > 0
assert 'acls' in d_r['abc/alphabet']['GET']
"""
def test_schema_to_csv():
csv = schema_to_csv('dummy_domain.routers', False)
@ -58,3 +50,4 @@ def test_schema_csv_dict():
schema_d = schema_csv_dict(csv.split('\n'))
assert isinstance(schema_d, dict)