0001-idp_oidc-restrict-client-s-returned-qs-to-authorized.patch
src/authentic2_idp_oidc/apps.py | ||
---|---|---|
179 | 179 | |
180 | 180 |
def a2_hook_api_modify_queryset(self, view, qs): |
181 | 181 |
from django.contrib.auth import get_user_model |
182 |
from django.db.models import Q |
|
183 |
from django.utils.timezone import now |
|
184 | ||
185 |
from .models import OIDCAuthorization, OIDCClient |
|
182 | 186 | |
183 | 187 |
client = self.get_oidc_client(view) |
184 |
# fast path |
|
188 |
User = get_user_model() |
|
189 | ||
190 |
# reduce qs to users having previously granted an authorization to the client |
|
185 | 191 |
if ( |
186 |
not issubclass(qs.model, get_user_model()) |
|
187 |
or client is None |
|
188 |
or not client.authorized_roles.exists() |
|
192 |
client |
|
193 |
and client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE |
|
194 |
and view.request.method == 'GET' |
|
195 |
and issubclass(qs.model, User) |
|
189 | 196 |
): |
197 |
authorizations = OIDCAuthorization.objects.none() |
|
198 |
if client.authorization_mode == OIDCClient.AUTHORIZATION_MODE_BY_SERVICE: |
|
199 |
authorizations = client.authorizations |
|
200 |
elif client.ou and hasattr(client.ou, 'oidc_authorizations'): |
|
201 |
authorizations = client.ou.oidc_authorizations |
|
202 |
qs = qs.filter( |
|
203 |
id__in=authorizations.filter(Q(expired__isnull=True) | Q(expired__gt=now())).values_list( |
|
204 |
'user__id', flat=True |
|
205 |
) |
|
206 |
) |
|
207 | ||
208 |
# fast path |
|
209 |
if not issubclass(qs.model, User) or client is None or not client.authorized_roles.exists(): |
|
190 | 210 |
return qs |
191 | 211 | |
192 | 212 |
qs = qs.filter(roles__in=client.authorized_roles.children()) |
tests/api/test_all.py | ||
---|---|---|
30 | 30 |
from django.urls import reverse |
31 | 31 |
from django.utils.encoding import force_text |
32 | 32 |
from django.utils.text import slugify |
33 |
from django.utils.timezone import now |
|
33 | 34 |
from requests.models import Response |
34 | 35 | |
35 | 36 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU |
... | ... | |
40 | 41 |
from authentic2.models import Attribute, AttributeValue, AuthorizedRole, PasswordReset, Service |
41 | 42 |
from authentic2.utils.misc import good_next_url |
42 | 43 |
from django_rbac.models import SEARCH_OP |
44 |
from django_rbac.utils import get_ou_model |
|
43 | 45 | |
44 | 46 |
from ..utils import assert_event, basic_authorization_header, get_link_from_mail, login |
45 | 47 | |
... | ... | |
461 | 463 | |
462 | 464 | |
463 | 465 |
def test_api_users_create(settings, app, api_user): |
466 |
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient |
|
467 | ||
464 | 468 |
at = Attribute.objects.create(kind='title', name='title', label='title') |
465 | 469 |
app.authorization = ('Basic', (api_user.username, api_user.username)) |
466 | 470 |
# test missing first_name |
... | ... | |
528 | 532 |
assert AttributeValue.objects.with_owner(new_user).filter(verified=True).count() == 0 |
529 | 533 |
assert AttributeValue.objects.with_owner(new_user).filter(attribute=at).exists() |
530 | 534 |
assert AttributeValue.objects.with_owner(new_user).get(attribute=at).content == payload['title'] |
535 |
if ( |
|
536 |
hasattr(api_user, 'oidc_client') |
|
537 |
and api_user.oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE |
|
538 |
): |
|
539 |
if api_user.oidc_client.authorization_mode == OIDCClient.AUTHORIZATION_MODE_BY_SERVICE: |
|
540 |
client_id = api_user.oidc_client.id |
|
541 |
client_ct = ContentType.objects.get_for_model(OIDCClient) |
|
542 |
else: |
|
543 |
client_id = api_user.oidc_client.ou.id |
|
544 |
client_ct = ContentType.objects.get_for_model(get_ou_model()) |
|
545 |
OIDCAuthorization.objects.create( |
|
546 |
user=new_user, |
|
547 |
client_id=client_id, |
|
548 |
client_ct=client_ct, |
|
549 |
expired=now() + datetime.timedelta(hours=1), |
|
550 |
) |
|
531 | 551 |
resp2 = app.get('/api/users/%s/' % resp.json['uuid']) |
532 | 552 |
assert resp.json == resp2.json |
533 | 553 |
payload.update({'uuid': '1234567890', 'email': 'foo@example.com', 'username': 'foobar'}) |
... | ... | |
536 | 556 |
assert 'title' in resp.json |
537 | 557 |
at.disabled = True |
538 | 558 |
at.save() |
559 |
if ( |
|
560 |
hasattr(api_user, 'oidc_client') |
|
561 |
and api_user.oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE |
|
562 |
): |
|
563 |
authz = OIDCAuthorization.objects.get(user=new_user) |
|
564 |
authz.user = User.objects.get(uuid='1234567890') |
|
565 |
authz.save() |
|
539 | 566 |
resp = app.get('/api/users/1234567890/') |
540 | 567 |
assert 'title' not in resp.json |
541 | 568 | |
... | ... | |
1185 | 1212 | |
1186 | 1213 | |
1187 | 1214 |
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client): |
1215 |
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient |
|
1216 | ||
1188 | 1217 |
url = '/api/users/%s/' % user_ou1.uuid |
1189 | 1218 |
# test invalid client |
1190 | 1219 |
app.authorization = ('Basic', ('foo', 'bar')) |
... | ... | |
1198 | 1227 |
resp = app.get(url, status=401) |
1199 | 1228 |
assert resp.json['result'] == 0 |
1200 | 1229 |
assert resp.json['errors'] == "User inactive or deleted." |
1201 |
# test oidc client |
|
1230 |
# test oidc client unauthorized for user_ou1
|
|
1202 | 1231 |
app.authorization = ('Basic', (oidc_client.username, oidc_client.username)) |
1232 |
app.get(url, status=404) |
|
1233 |
OIDCAuthorization.objects.create( |
|
1234 |
client_id=oidc_client.id, |
|
1235 |
client_ct=ContentType.objects.get_for_model(OIDCClient), |
|
1236 |
user=user_ou1, |
|
1237 |
expired=now() + datetime.timedelta(hours=1), |
|
1238 |
) |
|
1239 |
# test oidc client |
|
1203 | 1240 |
app.get(url, status=200) |
1204 | 1241 |
# test oidc client without API access |
1205 | 1242 |
oidc_client.oidc_client.has_api_access = False |
tests/conftest.py | ||
---|---|---|
301 | 301 |
def username(self): |
302 | 302 |
return self.oidc_client.client_id |
303 | 303 | |
304 |
@property |
|
305 |
def id(self): |
|
306 |
return self.oidc_client.id |
|
307 | ||
304 | 308 |
@property |
305 | 309 |
def is_superuser(self): |
306 | 310 |
return False |
tests/idp_oidc/test_api.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import random |
18 |
from datetime import timedelta |
|
18 | 19 | |
20 |
from django.contrib.contenttypes.models import ContentType |
|
19 | 21 |
from django.utils.timezone import now |
20 | 22 | |
23 |
from authentic2.a2_rbac.utils import get_default_ou |
|
21 | 24 |
from authentic2.custom_user.models import User |
22 |
from authentic2_idp_oidc.models import OIDCClaim, OIDCClient |
|
25 |
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClaim, OIDCClient
|
|
23 | 26 |
from authentic2_idp_oidc.utils import make_sub |
27 |
from django_rbac.utils import get_ou_model |
|
24 | 28 | |
25 | 29 | |
26 | 30 |
def test_api_synchronization(app, oidc_client): |
... | ... | |
72 | 76 |
) |
73 | 77 | |
74 | 78 |
users = [User.objects.create(username=f'user-{i}', last_name=f'Name-{i}') for i in range(10)] |
79 |
expired = now() + timedelta(hours=1) |
|
80 |
for user in users: |
|
81 |
OIDCAuthorization.objects.create( |
|
82 |
client_id=oidc_client.id, |
|
83 |
client_ct=ContentType.objects.get_for_model(OIDCClient), |
|
84 |
user=user, |
|
85 |
expired=expired, |
|
86 |
) |
|
75 | 87 |
pre_modification = now().strftime('%Y-%m-%dT%H:%M:%S') |
76 | 88 |
for count, user in enumerate(users): |
77 | 89 |
user.first_name = f'User {count}' |
... | ... | |
103 | 115 |
assert user_dict['email'] |
104 | 116 |
assert user_dict['email'].startswith(user_dict['last_name']) |
105 | 117 |
assert user_dict['email'].endswith('@templated.nowhere.null') |
118 | ||
119 | ||
120 |
def test_api_users_list_queryset_reduction(app, oidc_client): |
|
121 |
oidc_client.has_api_access = True |
|
122 |
oidc_client.identifier_policy = OIDCClient.POLICY_PAIRWISE_REVERSIBLE |
|
123 |
oidc_client.save() |
|
124 | ||
125 |
pre_modification = now().strftime('%Y-%m-%dT%H:%M:%S') |
|
126 | ||
127 |
users = [User.objects.create(username=f'user-{i}', last_name=f'Name-{i}') for i in range(20)] |
|
128 |
expired = now() + timedelta(hours=1) |
|
129 |
for user in random.sample(users, k=5): |
|
130 |
OIDCAuthorization.objects.create( |
|
131 |
client_id=oidc_client.id, |
|
132 |
client_ct=ContentType.objects.get_for_model(OIDCClient), |
|
133 |
user=user, |
|
134 |
expired=expired, |
|
135 |
) |
|
136 | ||
137 |
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret)) |
|
138 |
url = f'/api/users/?modified__gt={pre_modification}&claim_resolution' |
|
139 | ||
140 |
response = app.get(url, status=200) |
|
141 | ||
142 |
assert len(response.json['results']) == 5 |
|
143 | ||
144 |
oidc_client.authorization_mode = OIDCClient.AUTHORIZATION_MODE_NONE |
|
145 |
oidc_client.save() |
|
146 |
response = app.get(url, status=200) |
|
147 | ||
148 |
assert len(response.json['results']) == User.objects.count() |
|
149 | ||
150 |
oidc_client.authorization_mode = OIDCClient.AUTHORIZATION_MODE_BY_OU |
|
151 |
oidc_client.ou = get_default_ou() |
|
152 |
oidc_client.save() |
|
153 |
response = app.get(url, status=200) |
|
154 | ||
155 |
assert not response.json['results'] |
|
156 | ||
157 |
for user in random.sample(users, k=8): |
|
158 |
OIDCAuthorization.objects.create( |
|
159 |
client_id=get_default_ou().id, |
|
160 |
client_ct=ContentType.objects.get_for_model(get_ou_model()), |
|
161 |
user=user, |
|
162 |
expired=expired, |
|
163 |
) |
|
164 |
response = app.get(url, status=200) |
|
165 | ||
166 |
assert len(response.json['results']) == 8 |
|
106 |
- |