[cli] fix halfapi domain --read
This commit is contained in:
parent
46e62575ae
commit
c658815eb5
|
@ -13,7 +13,9 @@ import orjson
|
||||||
|
|
||||||
|
|
||||||
from .cli import cli
|
from .cli import cli
|
||||||
from ..conf import write_config
|
from ..conf import write_config, CONFIG
|
||||||
|
|
||||||
|
from ..half_domain import HalfDomain
|
||||||
|
|
||||||
from ..lib.domain import domain_schema
|
from ..lib.domain import domain_schema
|
||||||
from ..lib.schemas import schema_dict_dom
|
from ..lib.schemas import schema_dict_dom
|
||||||
|
@ -144,10 +146,12 @@ def domain(domain, delete, update, create, read): #, domains, read, create, upd
|
||||||
if delete:
|
if delete:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
if read:
|
if read:
|
||||||
m_domain = importlib.import_module(domain)
|
half_domain = HalfDomain(
|
||||||
|
domain,
|
||||||
|
config=CONFIG.get('domain').get('domain_name', {}))
|
||||||
|
|
||||||
click.echo(orjson.dumps(
|
click.echo(orjson.dumps(
|
||||||
domain_schema(m_domain),
|
half_domain.schema(),
|
||||||
option=orjson.OPT_NON_STR_KEYS,
|
option=orjson.OPT_NON_STR_KEYS,
|
||||||
default=ORJSONResponse.default_cast)
|
default=ORJSONResponse.default_cast)
|
||||||
)
|
)
|
||||||
|
|
|
@ -24,7 +24,7 @@ from .lib.domain_middleware import DomainMiddleware
|
||||||
from .logging import logger
|
from .logging import logger
|
||||||
|
|
||||||
class HalfDomain(Starlette):
|
class HalfDomain(Starlette):
|
||||||
def __init__(self, app, domain, router=None, config={}):
|
def __init__(self, domain, router=None, config={}, app=None):
|
||||||
self.app = app
|
self.app = app
|
||||||
|
|
||||||
self.m_domain = importlib.import_module(domain)
|
self.m_domain = importlib.import_module(domain)
|
||||||
|
@ -292,7 +292,7 @@ class HalfDomain(Starlette):
|
||||||
Generator(HalfRoute)
|
Generator(HalfRoute)
|
||||||
"""
|
"""
|
||||||
yield HalfRoute('/',
|
yield HalfRoute('/',
|
||||||
JSONRoute(self.domain_schema()),
|
JSONRoute(self.schema()),
|
||||||
[{'acl': acl.public}],
|
[{'acl': acl.public}],
|
||||||
'GET'
|
'GET'
|
||||||
)
|
)
|
||||||
|
@ -300,7 +300,7 @@ class HalfDomain(Starlette):
|
||||||
for path, method, m_router, fct, params in HalfDomain.gen_router_routes(self.m_router, []):
|
for path, method, m_router, fct, params in HalfDomain.gen_router_routes(self.m_router, []):
|
||||||
yield HalfRoute(f'/{path}', fct, params, method)
|
yield HalfRoute(f'/{path}', fct, params, method)
|
||||||
|
|
||||||
def domain_schema_dict(self) -> Dict:
|
def schema_dict(self) -> Dict:
|
||||||
""" gen_router_routes return values as a dict
|
""" gen_router_routes return values as a dict
|
||||||
Parameters:
|
Parameters:
|
||||||
|
|
||||||
|
@ -334,7 +334,7 @@ class HalfDomain(Starlette):
|
||||||
return d_res
|
return d_res
|
||||||
|
|
||||||
|
|
||||||
def domain_schema(self) -> Dict:
|
def schema(self) -> Dict:
|
||||||
schema = { **API_SCHEMA_DICT }
|
schema = { **API_SCHEMA_DICT }
|
||||||
schema['domain'] = {
|
schema['domain'] = {
|
||||||
'name': self.name,
|
'name': self.name,
|
||||||
|
@ -344,7 +344,7 @@ class HalfDomain(Starlette):
|
||||||
'routers': self.m_router.__name__,
|
'routers': self.m_router.__name__,
|
||||||
'acls': tuple(getattr(self.m_acl, 'ACLS', ()))
|
'acls': tuple(getattr(self.m_acl, 'ACLS', ()))
|
||||||
}
|
}
|
||||||
schema['paths'] = self.domain_schema_dict()
|
schema['paths'] = self.schema_dict()
|
||||||
return schema
|
return schema
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,30 +0,0 @@
|
||||||
import subprocess
|
|
||||||
from pprint import pprint
|
|
||||||
from starlette.testclient import TestClient
|
|
||||||
from starlette.authentication import (
|
|
||||||
AuthenticationBackend, AuthenticationError, BaseUser, AuthCredentials,
|
|
||||||
UnauthenticatedUser)
|
|
||||||
|
|
||||||
from halfapi.lib.schemas import schema_dict_dom, schema_to_csv, schema_csv_dict
|
|
||||||
from halfapi.lib.constants import DOMAIN_SCHEMA, API_SCHEMA
|
|
||||||
|
|
||||||
from halfapi import __version__
|
|
||||||
|
|
||||||
def test_schemas_dict_dom():
|
|
||||||
from .dummy_domain import routers
|
|
||||||
schema = schema_dict_dom({
|
|
||||||
'dummy_domain':routers})
|
|
||||||
assert isinstance(schema, dict)
|
|
||||||
|
|
||||||
def test_schema_to_csv():
|
|
||||||
csv = schema_to_csv('dummy_domain.routers', False)
|
|
||||||
assert isinstance(csv, str)
|
|
||||||
assert len(csv.split('\n')) > 0
|
|
||||||
|
|
||||||
def test_schema_csv_dict():
|
|
||||||
csv = schema_to_csv('dummy_domain.routers', False)
|
|
||||||
assert isinstance(csv, str)
|
|
||||||
schema_d = schema_csv_dict(csv.split('\n'))
|
|
||||||
assert isinstance(schema_d, dict)
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue