[jwt] fix jwt middleware and add some tests
This commit is contained in:
parent
7b134f2dfb
commit
15cd059705
|
@ -3,11 +3,11 @@ from starlette.requests import Request
|
|||
from starlette.exceptions import HTTPException
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
|
||||
|
||||
class AclMiddleware(BaseHTTPMiddleware):
|
||||
def __init__(self, app, acl_module):
|
||||
super().__init__(app)
|
||||
self.acl_module = acl_module
|
||||
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
""" Checks the "acls" key in the scope and applies the
|
||||
corresponding functions in the current module's acl lib.
|
||||
|
@ -21,8 +21,6 @@ class AclMiddleware(BaseHTTPMiddleware):
|
|||
try:
|
||||
fct = getattr(self.acl_module, acl_fct_name)
|
||||
if fct(request) is True:
|
||||
print(f'{fct} : {fct(request)}\n')
|
||||
|
||||
return await call_next(request)
|
||||
|
||||
except AttributeError as e:
|
||||
|
|
|
@ -43,13 +43,18 @@ class JWTUser(BaseUser):
|
|||
self.token = token
|
||||
self.payload = payload
|
||||
|
||||
def __str__(self):
|
||||
if len(self.__id) > 0:
|
||||
return self.__id
|
||||
else:
|
||||
return 'no id'
|
||||
@property
|
||||
def is_authenticated(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self.id
|
||||
return self.__id
|
||||
|
||||
|
||||
class JWTAuthenticationBackend(AuthenticationBackend):
|
||||
|
@ -68,6 +73,9 @@ class JWTAuthenticationBackend(AuthenticationBackend):
|
|||
payload = jwt.decode(token, key=self.secret_key, algorithms=self.algorithm)
|
||||
except jwt.InvalidTokenError as e:
|
||||
raise AuthenticationError(str(e))
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
return AuthCredentials(["authenticated"]), JWTUser(
|
||||
id=payload['id'], token=token, payload=payload)
|
||||
|
|
|
@ -1,7 +1,29 @@
|
|||
import jwt
|
||||
from ..halfapi.app import app
|
||||
import requests
|
||||
import pytest
|
||||
import json
|
||||
import sys
|
||||
from hashlib import sha256
|
||||
from halfapi.app import app
|
||||
from base64 import b64decode
|
||||
|
||||
def coucou():
|
||||
return
|
||||
def test_connected():
|
||||
app.route('/', coucou)
|
||||
|
||||
def test_token():
|
||||
# This test needs to have a running auth-lirmm on 127.0.0.1:3000
|
||||
|
||||
r = requests.post('http://127.0.0.1:3000/',
|
||||
data={'email':'maizi', 'password':'a'})
|
||||
|
||||
assert len(r.text) > 0
|
||||
res = json.loads(r.text)
|
||||
assert 'token' in res.keys()
|
||||
sys.stderr.write(f'Token : {res["token"]}\n')
|
||||
secret = open('/etc/half_orm/secret').readline()
|
||||
sys.stderr.write(f'Secret : {secret}\n')
|
||||
assert jwt.decode(
|
||||
res['token'],
|
||||
secret, algorithms=['HS256'])
|
||||
|
|
Loading…
Reference in New Issue