12 |
12 |
|
13 |
13 |
from django.core.urlresolvers import reverse
|
14 |
14 |
from django.utils.timezone import now
|
|
15 |
from django.contrib.auth import get_user_model
|
15 |
16 |
|
16 |
|
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode, OIDCAccessToken, OIDCClaim
|
|
17 |
User = get_user_model()
|
|
18 |
|
|
19 |
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode
|
17 |
20 |
from authentic2_idp_oidc.utils import make_sub
|
18 |
21 |
from authentic2.a2_rbac.utils import get_default_ou
|
19 |
22 |
from authentic2.utils import make_url
|
... | ... | |
85 |
88 |
'frontchannel_logout_uri': 'https://example.com/southpark/logout/',
|
86 |
89 |
'frontchannel_timeout': 3000,
|
87 |
90 |
},
|
|
91 |
{
|
|
92 |
'identifier_policy': OIDCClient.POLICY_PAIRWISE_REVERSIBLE,
|
|
93 |
},
|
88 |
94 |
]
|
89 |
95 |
|
90 |
96 |
|
... | ... | |
869 |
875 |
executor.loader.build_graph()
|
870 |
876 |
client = OIDCClient.objects.first()
|
871 |
877 |
assert OIDCClaim.objects.filter(client=client.id).count() == 5
|
|
878 |
|
|
879 |
|
|
880 |
def test_api_synchronization(app, oidc_client):
|
|
881 |
oidc_client.has_api_access = True
|
|
882 |
oidc_client.save()
|
|
883 |
users = [User.objects.create(username='user-%s' % i) for i in range(10)]
|
|
884 |
for user in users[5:]:
|
|
885 |
user.delete()
|
|
886 |
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
|
|
887 |
|
|
888 |
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
|
|
889 |
status = 200
|
|
890 |
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
|
|
891 |
status = 401
|
|
892 |
response = app.post_json('/api/users/synchronization/',
|
|
893 |
params={
|
|
894 |
'known_uuids': [make_sub(oidc_client, user) for user in users]},
|
|
895 |
status=status)
|
|
896 |
if status == 200:
|
|
897 |
assert response.json['result'] == 1
|
|
898 |
assert set(response.json['unknown_uuids']) == deleted_subs
|
872 |
|
-
|
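Review bot: `test_api_synchronization` (new lines 880-898) only exercises the endpoint with `has_api_access = True` and valid Basic credentials, so nothing pins the access control of an API that tells a caller which subject identifiers no longer exist. The test sets the flag before calling, which suggests it gates the endpoint. I would add a negative case that sets `oidc_client.has_api_access = False`, saves, and repeats the same `app.post_json('/api/users/synchronization/', ...)` call expecting whatever rejection status the view returns, plus one with a wrong `client_secret` in `app.authorization`. The 401 branch is currently chosen by the fixture's `identifier_policy`, so a comment such as `# policies without a stable per-client sub cannot use this API` would explain why. Separately, reading the gutter, new line 19 imports only `OIDCClient, OIDCAuthorization, OIDCCode` while the context line at 877 still uses `OIDCClaim`; unless it is imported outside the visible hunks, that test will raise a NameError.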