0001-api-filter-users-based-on-OIDC-client-authorized-rol.patch
src/authentic2_idp_oidc/apps.py | ||
---|---|---|
37 | 37 |
serializer.fields['uuid'] = serializers.SerializerMethodField( |
38 | 38 |
method_name='get_oidc_uuid') |
39 | 39 | |
40 |
@classmethod |
|
41 |
def get_oidc_client(cls, view): |
|
42 |
request = view.request |
|
43 |
if not hasattr(request.user, 'oidc_client'): |
|
44 |
return None |
|
45 |
return request.user.oidc_client |
|
46 | ||
40 | 47 |
def a2_hook_api_modify_view_before_get_object(self, view): |
41 | 48 |
'''Decrypt sub used as pk argument in URL.''' |
42 | 49 |
import uuid |
43 | 50 |
from . import utils |
44 | 51 | |
45 |
request = view.request |
|
46 |
if not hasattr(request.user, 'oidc_client'): |
|
47 |
return |
|
48 |
client = request.user.oidc_client |
|
52 |
client = self.get_oidc_client(view) |
|
49 | 53 |
if client.identifier_policy != client.POLICY_PAIRWISE_REVERSIBLE: |
50 | 54 |
return |
51 | 55 |
lookup_url_kwarg = view.lookup_url_kwarg or view.lookup_field |
... | ... | |
106 | 110 |
new_unknown_uuids.append(uuid_map[u]) |
107 | 111 |
new_unknown_uuids.extend(request.unknown_uuids) |
108 | 112 |
data['unknown_uuids'] = new_unknown_uuids |
113 | ||
114 |
def a2_hook_api_modify_queryset(self, view, qs): |
|
115 |
from django.contrib.auth import get_user_model |
|
116 | ||
117 |
client = self.get_oidc_client(view) |
|
118 |
# fast path |
|
119 |
if (not issubclass(qs.model, get_user_model()) |
|
120 |
or client is None |
|
121 |
or not client.authorized_roles.exists()): |
|
122 |
return qs |
|
123 | ||
124 |
qs = qs.filter(roles__in=client.authorized_roles.children()) |
|
125 |
qs = qs.distinct() |
|
126 | ||
127 |
return qs |
|
128 |
tests/test_idp_oidc.py | ||
---|---|---|
36 | 36 | |
37 | 37 |
User = get_user_model() |
38 | 38 | |
39 |
from authentic2.models import Attribute |
|
39 |
from authentic2.models import Attribute, AuthorizedRole
|
|
40 | 40 |
from authentic2_idp_oidc.models import OIDCClient, OIDCAuthorization, OIDCCode, OIDCAccessToken, OIDCClaim |
41 | 41 |
from authentic2_idp_oidc.utils import make_sub |
42 | 42 |
from authentic2.a2_rbac.utils import get_default_ou |
... | ... | |
1133 | 1133 |
assert not client.is_valid_redirect_uri('http://example5.com/tototata/') |
1134 | 1134 |
assert not client.is_valid_redirect_uri('http://example5.com/tototata') |
1135 | 1135 | |
1136 | ||
1137 |
def test_filter_api_users(app, oidc_client, admin, simple_user, role_random): |
|
1138 |
oidc_client.has_api_access = True |
|
1139 |
oidc_client.save() |
|
1140 | ||
1141 |
if (oidc_client.identifier_policy not in |
|
1142 |
(oidc_client.POLICY_UUID, oidc_client.POLICY_PAIRWISE_REVERSIBLE)): |
|
1143 |
return |
|
1144 | ||
1145 |
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret)) |
|
1146 | ||
1147 |
response = app.get('/api/users/') |
|
1148 |
count = len(response.json['results']) |
|
1149 |
assert count > 0 |
|
1150 | ||
1151 |
AuthorizedRole.objects.create(service=oidc_client, role=role_random) |
|
1152 | ||
1153 |
response = app.get('/api/users/') |
|
1154 |
assert len(response.json['results']) == 0 |
|
1155 | ||
1156 |
role_random.members.add(simple_user) |
|
1157 |
response = app.get('/api/users/') |
|
1158 |
assert len(response.json['results']) == 1 |
|
1159 | ||
1160 |
AuthorizedRole.objects.all().delete() |
|
1161 | ||
1162 |
response = app.get('/api/users/') |
|
1163 |
assert len(response.json['results']) == count |
|
1136 |
- |