2020-08-27 17:45:23 +02:00
|
|
|
|
import os
|
2020-07-03 11:42:10 +02:00
|
|
|
|
import jwt
|
2020-08-27 17:45:23 +02:00
|
|
|
|
from requests import Request
|
2020-07-08 12:41:24 +02:00
|
|
|
|
import pytest
|
2020-08-27 17:45:23 +02:00
|
|
|
|
from unittest.mock import patch
|
2020-07-08 12:41:24 +02:00
|
|
|
|
import json
|
2020-07-09 10:27:45 +02:00
|
|
|
|
from json.decoder import JSONDecodeError
|
2020-07-08 12:41:24 +02:00
|
|
|
|
import sys
|
|
|
|
|
from hashlib import sha256
|
|
|
|
|
from base64 import b64decode
|
2020-08-27 17:45:23 +02:00
|
|
|
|
from uuid import uuid4, UUID
|
|
|
|
|
|
2020-07-09 10:27:45 +02:00
|
|
|
|
from starlette.testclient import TestClient
|
2020-08-27 17:45:23 +02:00
|
|
|
|
from starlette.authentication import (
|
|
|
|
|
AuthenticationBackend, AuthenticationError, BaseUser, AuthCredentials,
|
|
|
|
|
UnauthenticatedUser)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#from halfapi.app import app
|
2020-09-25 01:06:21 +02:00
|
|
|
|
#os.environ['HALFAPI_PROD'] = 'True'
|
2020-08-27 17:45:23 +02:00
|
|
|
|
os.environ['HALFAPI_SECRET'] = 'randomsecret'
|
2020-07-09 10:27:45 +02:00
|
|
|
|
|
2020-08-27 17:45:23 +02:00
|
|
|
|
from halfapi.lib.jwt_middleware import (PRODUCTION, SECRET,
|
|
|
|
|
JWTUser, JWTAuthenticationBackend,
|
2020-07-09 10:27:45 +02:00
|
|
|
|
JWTWebSocketAuthenticationBackend)
|
2020-07-03 11:42:10 +02:00
|
|
|
|
|
2020-08-27 17:45:23 +02:00
|
|
|
|
def test_constants():
    """Check the types of the constants exported by the JWT middleware.

    PRODUCTION must be a bool and SECRET must be a str.
    """
    expected_types = (
        (PRODUCTION, bool),
        (SECRET, str),
    )
    for constant, expected in expected_types:
        assert isinstance(constant, expected)
|
2020-08-27 17:45:23 +02:00
|
|
|
|
|
2020-07-09 10:27:45 +02:00
|
|
|
|
@pytest.fixture
def token():
    """Fetch a token from the local authentication service.

    This fixture needs a running auth-lirmm on 127.0.0.1:3000. It posts the
    credentials of the 'maizi' account and extracts the token from the JSON
    response.

    Returns:
        The value stored under the 'token' key of the JSON response.

    Raises:
        Exception: if the response body is empty, is not valid JSON, or
            does not contain a 'token' key.
    """
    # Only ``Request`` is imported from ``requests`` at module level, so the
    # package itself must be brought into scope here for ``requests.post``.
    import requests

    r = requests.post('http://127.0.0.1:3000/',
                      data={'email': 'maizi', 'password': 'a'})

    if not r.text:
        raise Exception('No result in token retrieval')

    try:
        res = json.loads(r.text)
    except JSONDecodeError as exc:
        raise Exception('Malformed response from token retrieval') from exc

    # Valid JSON is not necessarily an object; anything that is not a dict
    # cannot carry the token either.
    if not isinstance(res, dict) or 'token' not in res:
        raise Exception('Missing token in token request')

    return res['token']
|
|
|
|
|
|
2020-08-27 17:45:23 +02:00
|
|
|
|
|
|
|
|
|
@pytest.fixture
def token_builder():
    """Yield a JWT signed with SECRET carrying a name and a random id."""
    claims = {
        'name': 'xxx',
        'id': str(uuid4()),
    }
    yield jwt.encode(claims, key=SECRET)
|
|
|
|
|
|
2020-08-27 18:09:48 +02:00
|
|
|
|
@pytest.fixture
def token_debug_false_builder():
    """Yield a JWT signed with SECRET whose payload sets 'debug' to False.

    The payload also carries a name and a random id.
    """
    claims = {
        'name': 'xxx',
        'id': str(uuid4()),
        'debug': False,
    }
    yield jwt.encode(claims, key=SECRET)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def token_debug_true_builder():
    """Yield a JWT signed with SECRET whose payload sets 'debug' to True.

    The payload also carries a name and a random id.
    """
    claims = {
        'name': 'xxx',
        'id': str(uuid4()),
        'debug': True,
    }
    yield jwt.encode(claims, key=SECRET)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2020-08-27 17:45:23 +02:00
|
|
|
|
|
2020-07-10 12:58:53 +02:00
|
|
|
|
@pytest.fixture
def token_dirser():
    """Fetch a token for the 'dhenaut' account from the local auth service.

    This fixture needs a running auth-lirmm on 127.0.0.1:3000. It posts the
    account credentials and extracts the token from the JSON response.

    Returns:
        The value stored under the 'token' key of the JSON response.

    Raises:
        Exception: if the response body is empty, is not valid JSON, or
            does not contain a 'token' key.
    """
    # Only ``Request`` is imported from ``requests`` at module level, so the
    # package itself must be brought into scope here for ``requests.post``.
    import requests

    r = requests.post('http://127.0.0.1:3000/',
                      data={'email': 'dhenaut', 'password': 'a'})

    if not r.text:
        raise Exception('No result in token retrieval')

    try:
        res = json.loads(r.text)
    except JSONDecodeError as exc:
        raise Exception('Malformed response from token retrieval') from exc

    # Valid JSON is not necessarily an object; anything that is not a dict
    # cannot carry the token either.
    if not isinstance(res, dict) or 'token' not in res:
        raise Exception('Missing token in token request')

    return res['token']
|
|
|
|
|
|
2020-07-09 10:27:45 +02:00
|
|
|
|
|
2020-08-27 17:45:23 +02:00
|
|
|
|
"""
|
2020-07-09 10:27:45 +02:00
|
|
|
|
def test_token(token):
|
|
|
|
|
client = TestClient(app)
|
|
|
|
|
|
|
|
|
|
r = client.get('/user', headers={'Authorization':token})
|
|
|
|
|
res = False
|
|
|
|
|
try:
|
|
|
|
|
res = json.loads(r.text)
|
|
|
|
|
except JSONDecodeError:
|
|
|
|
|
raise Exception('Malformed response from /user request')
|
|
|
|
|
|
|
|
|
|
assert 'user' in res.keys()
|
|
|
|
|
assert 'id' in res['user'].keys()
|
|
|
|
|
assert 'token' in res['user'].keys()
|
|
|
|
|
assert 'payload' in res['user'].keys()
|
2020-07-10 12:58:53 +02:00
|
|
|
|
|
|
|
|
|
def test_labopers(token, token_dirser):
|
|
|
|
|
res = requests.get('http://127.0.0.1:8080/api/v4/organigramme/laboratoire/personnel',
|
|
|
|
|
params={
|
|
|
|
|
'q': 'limit:10|format:csv'
|
|
|
|
|
},
|
|
|
|
|
headers={
|
|
|
|
|
'Authorization': token
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
assert res.status_code == 401
|
|
|
|
|
|
|
|
|
|
res = requests.get('http://127.0.0.1:8080/api/v4/organigramme/laboratoire/personnel',
|
|
|
|
|
params={
|
|
|
|
|
'q': 'limit:10|format:csv'
|
|
|
|
|
},
|
|
|
|
|
headers={
|
|
|
|
|
'Authorization': token_dirser
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
assert res.status_code == 200
|
2020-08-27 17:45:23 +02:00
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def test_JWTUser():
    """Build a JWTUser and check that it exposes what it was given.

    The id, token and payload passed to the constructor must be readable
    back from the instance, and the user must report itself authenticated.
    """
    expected_id = uuid4()
    raw_token = '{}'
    claims = {}

    jwt_user = JWTUser(expected_id, raw_token, claims)

    assert jwt_user.id == expected_id
    assert jwt_user.token == raw_token
    assert jwt_user.payload == claims
    assert jwt_user.is_authenticated == True
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_JWTAuthenticationBackend(token_builder):
    """Authenticate a request carrying the token from ``token_builder``.

    The backend must use SECRET as its secret key and return a pair made
    of an AuthCredentials and a JWTUser.
    """
    auth_backend = JWTAuthenticationBackend()
    assert auth_backend.secret_key == SECRET

    auth_headers = {'Authorization': token_builder}
    request = Request(headers=auth_headers)

    result = await auth_backend.authenticate(request)
    credentials, user = result

    assert type(user) is JWTUser
    assert type(credentials) is AuthCredentials
|
2020-08-27 18:09:48 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_JWTAuthenticationBackend_DebugFalse(token_debug_false_builder):
    """Authenticate with a token whose payload has 'debug' set to False.

    The backend must use SECRET as its secret key and return a pair made
    of an AuthCredentials and a JWTUser.
    """
    auth_backend = JWTAuthenticationBackend()
    assert auth_backend.secret_key == SECRET

    auth_headers = {'Authorization': token_debug_false_builder}
    request = Request(headers=auth_headers)

    result = await auth_backend.authenticate(request)
    credentials, user = result

    assert type(user) is JWTUser
    assert type(credentials) is AuthCredentials
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_JWTAuthenticationBackend_DebugTrue(token_debug_true_builder):
    """Authenticate with a token whose payload has 'debug' set to True.

    The backend must use SECRET as its secret key, and any exception raised
    while authenticating must be exactly an AuthenticationError.
    """
    auth_backend = JWTAuthenticationBackend()
    assert auth_backend.secret_key == SECRET

    auth_headers = {'Authorization': token_debug_true_builder}
    request = Request(headers=auth_headers)

    # NOTE(review): when authenticate() raises nothing, this test passes
    # without checking anything -- confirm whether a rejection is expected
    # here and, if so, consider pytest.raises(AuthenticationError).
    try:
        await auth_backend.authenticate(request)
    except Exception as raised:
        assert type(raised) is AuthenticationError
|