Projet

Général

Profil

0001-idp_oidc-restrict-client-s-returned-qs-to-authorized.patch

Paul Marillonnet, 02 juin 2022 18:06

Télécharger (4,34 ko)

Voir les différences:

Subject: [PATCH] idp_oidc: restrict client's returned qs to authorized users
 (#65942)

 src/authentic2_idp_oidc/apps.py | 22 ++++++++++++++-----
 tests/idp_oidc/test_api.py      | 38 ++++++++++++++++++++++++++++++++-
 2 files changed, 54 insertions(+), 6 deletions(-)
src/authentic2_idp_oidc/apps.py
179 179

  
180 180
    def a2_hook_api_modify_queryset(self, view, qs):
181 181
        from django.contrib.auth import get_user_model
182
        from django.contrib.contenttypes.models import ContentType
183
        from django.db.models import Q
184
        from django.utils.timezone import now
185

  
186
        from .models import OIDCAuthorization
182 187

  
183 188
        client = self.get_oidc_client(view)
189
        User = get_user_model()
190

  
191
        # reduce qs to users having previously granted an authorization to the client
192
        if view.request.method == 'GET' and issubclass(qs.model, User):
193
            authorized_users = OIDCAuthorization.objects.filter(
194
                Q(expired__isnull=True) | Q(expired__gt=now()),
195
                client_id=client.id,
196
                client_ct=ContentType.objects.get_for_model(client),
197
            ).values_list('user__id', flat=True)
198
            qs = qs.filter(id__in=authorized_users)
199

  
184 200
        # fast path
185
        if (
186
            not issubclass(qs.model, get_user_model())
187
            or client is None
188
            or not client.authorized_roles.exists()
189
        ):
201
        if not issubclass(qs.model, User) or client is None or not client.authorized_roles.exists():
190 202
            return qs
191 203

  
192 204
        qs = qs.filter(roles__in=client.authorized_roles.children())
tests/idp_oidc/test_api.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import random
18
from datetime import timedelta
18 19

  
20
from django.contrib.contenttypes.models import ContentType
19 21
from django.utils.timezone import now
20 22

  
21 23
from authentic2.custom_user.models import User
22
from authentic2_idp_oidc.models import OIDCClaim, OIDCClient
24
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClaim, OIDCClient
23 25
from authentic2_idp_oidc.utils import make_sub
24 26

  
25 27

  
......
72 74
    )
73 75

  
74 76
    users = [User.objects.create(username=f'user-{i}', last_name=f'Name-{i}') for i in range(10)]
77
    expired = now() + timedelta(hours=1)
78
    for user in users:
79
        OIDCAuthorization.objects.create(
80
            client_id=oidc_client.id,
81
            client_ct=ContentType.objects.get_for_model(OIDCClient),
82
            user=user,
83
            expired=expired,
84
        )
75 85
    pre_modification = now().strftime('%Y-%m-%dT%H:%M:%S')
76 86
    for count, user in enumerate(users):
77 87
        user.first_name = f'User {count}'
......
103 113
        assert user_dict['email']
104 114
        assert user_dict['email'].startswith(user_dict['last_name'])
105 115
        assert user_dict['email'].endswith('@templated.nowhere.null')
116

  
117

  
118
def test_api_users_list_queryset_reduction(app, oidc_client):
119
    oidc_client.has_api_access = True
120
    oidc_client.identifier_policy = OIDCClient.POLICY_PAIRWISE_REVERSIBLE
121
    oidc_client.save()
122

  
123
    pre_modification = now().strftime('%Y-%m-%dT%H:%M:%S')
124

  
125
    users = [User.objects.create(username=f'user-{i}', last_name=f'Name-{i}') for i in range(10)]
126
    expired = now() + timedelta(hours=1)
127
    for user in random.sample(users, k=5):
128
        OIDCAuthorization.objects.create(
129
            client_id=oidc_client.id,
130
            client_ct=ContentType.objects.get_for_model(OIDCClient),
131
            user=user,
132
            expired=expired,
133
        )
134

  
135
    app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
136
    response = app.get(
137
        f'/api/users/?modified__gt={pre_modification}&claim_resolution',
138
        status=200,
139
    )
140

  
141
    assert len(response.json['results']) == 5
106
-