Projet

Général

Profil

0001-auth_oidc-add-a-STRATEGY_FIND_EMAIL-user-matching-pr.patch

Paul Marillonnet, 28 avril 2022 12:08

Télécharger (15,1 ko)

Voir les différences:

Subject: [PATCH] auth_oidc: add a STRATEGY_FIND_EMAIL user-matching provider
 option (#63729)

 src/authentic2_auth_oidc/backends.py          | 177 ++++++++++++------
 .../migrations/0004_auto_20171017_1522.py     |   4 +
 src/authentic2_auth_oidc/models.py            |   5 +
 tests/test_auth_oidc.py                       |  53 ++++++
 4 files changed, 179 insertions(+), 60 deletions(-)
src/authentic2_auth_oidc/backends.py
26 26

  
27 27
from authentic2 import app_settings, hooks
28 28
from authentic2.a2_rbac.models import OrganizationalUnit
29
from authentic2.a2_rbac.utils import get_default_ou
29 30
from authentic2.utils.crypto import base64url_encode
30 31
from authentic2.utils.template import Template
31 32

  
......
132 133
            logger.warning('auth_oidc: id_token nonce %r != expected nonce %r', id_token_nonce, nonce)
133 134
            return None
134 135

  
135
        User = get_user_model()
136
        user = None
137
        if provider.strategy == models.OIDCProvider.STRATEGY_FIND_UUID:
138
            # use the OP sub to find a user by UUID
139
            # it means OP and RP share the same account base and OP is passing its UUID as sub
140
            try:
141
                user = User.objects.get(uuid=id_token.sub, is_active=True)
142
            except User.DoesNotExist:
143
                pass
144
            else:
145
                logger.info('auth_oidc: found user using UUID (=sub) "%s": %s', id_token.sub, user)
146
        elif provider.strategy == models.OIDCProvider.STRATEGY_FIND_USERNAME:
147
            users = User.objects.filter(username=id_token.sub, is_active=True).order_by('pk')
148
            if not users:
149
                logger.warning('auth_oidc: user with username (=sub) "%s" not found', id_token.sub)
150
            else:
151
                user = users[0]
152
                logger.info('auth_oidc: found user using username (=sub) "%s": %s', id_token.sub, user)
153
        else:
154
            try:
155
                user = User.objects.get(
156
                    oidc_account__provider=provider, oidc_account__sub=id_token.sub, is_active=True
157
                )
158
            except User.DoesNotExist:
159
                pass
160
            else:
161
                logger.info('auth_oidc: found user using with sub "%s": %s', id_token.sub, user)
136
        # map claims to attributes or user fields
137
        # mapping is done before eventual creation of user as EMAIL_IS_UNIQUE needs to know if the
138
        # mapping will provide some mail to us
139
        ou_map = {ou.slug: ou for ou in OrganizationalUnit.cached()}
140
        user_ou = provider.ou
141
        user_info = None
142
        save_user = False
143
        mappings = []
144
        context = id_token_content.copy()
162 145
        need_user_info = False
163 146
        for claim_mapping in provider.claim_mappings.all():
164 147
            need_user_info = need_user_info or not claim_mapping.idtoken_claim
165 148

  
166
        user_info = None
167 149
        if need_user_info:
168 150
            if not access_token:
169 151
                logger.warning('auth_oidc: need user info for some claims, but no access token was returned')
......
185 167
                    logger.warning('auth_oidc: bad JSON in user info response, %s (%r)', e, response.content)
186 168
                else:
187 169
                    logger.debug('auth_oidc: user_info content %s', user_info)
188

  
189
        # check for required claims
190
        for claim_mapping in provider.claim_mappings.all():
191
            claim = claim_mapping.claim
192
            if claim_mapping.required:
193
                if '{{' in claim or '{%' in claim:
194
                    logger.warning('claim \'%r\' is templated, it cannot be set as required')
195
                elif claim_mapping.idtoken_claim and claim not in id_token:
196
                    logger.warning(
197
                        'auth_oidc: cannot create user missing required claim %r in id_token (%r)',
198
                        claim,
199
                        id_token,
200
                    )
201
                    return None
202
                elif not user_info or claim not in user_info:
203
                    logger.warning(
204
                        'auth_oidc: cannot create user missing required claim %r in user_info (%r)',
205
                        claim,
206
                        user_info,
207
                    )
208
                    return None
209

  
210
        # map claims to attributes or user fields
211
        # mapping is done before eventual creation of user as EMAIL_IS_UNIQUE needs to know if the
212
        # mapping will provide some mail to us
213
        ou_map = {ou.slug: ou for ou in OrganizationalUnit.cached()}
214
        user_ou = provider.ou
215
        save_user = False
216
        mappings = []
217
        context = id_token_content.copy()
218
        if need_user_info:
219
            context.update(user_info or {})
170
                    context.update(user_info or {})
220 171

  
221 172
        for claim_mapping in provider.claim_mappings.all():
222 173
            claim = claim_mapping.claim
......
243 194
                verified = True
244 195
            mappings.append((attribute, value, verified))
245 196

  
197
        # check for required claims
198
        for claim_mapping in provider.claim_mappings.all():
199
            claim = claim_mapping.claim
200
            if claim_mapping.required:
201
                if '{{' in claim or '{%' in claim:
202
                    logger.warning('claim \'%r\' is templated, it cannot be set as required')
203
                elif claim_mapping.idtoken_claim and claim not in id_token:
204
                    logger.warning(
205
                        'auth_oidc: cannot create user missing required claim %r in id_token (%r)',
206
                        claim,
207
                        id_token,
208
                    )
209
                    return None
210
                elif not user_info or claim not in user_info:
211
                    logger.warning(
212
                        'auth_oidc: cannot create user missing required claim %r in user_info (%r)',
213
                        claim,
214
                        user_info,
215
                    )
216
                    return None
217

  
246 218
        # find an email in mappings
247 219
        email = None
248 220
        for attribute, value, verified in mappings:
249 221
            if attribute == 'email':
250 222
                email = value
251 223

  
224
        User = get_user_model()
225
        user = None
226
        if provider.strategy == models.OIDCProvider.STRATEGY_FIND_UUID:
227
            # use the OP sub to find a user by UUID
228
            # it means OP and RP share the same account base and OP is passing its UUID as sub
229
            try:
230
                user = User.objects.get(uuid=id_token.sub, is_active=True)
231
            except User.DoesNotExist:
232
                pass
233
            else:
234
                logger.info('auth_oidc: found user using UUID (=sub) "%s": %s', id_token.sub, user)
235
        elif provider.strategy == models.OIDCProvider.STRATEGY_FIND_USERNAME:
236
            users = User.objects.filter(username=id_token.sub, is_active=True).order_by('pk')
237
            if not users:
238
                logger.warning('auth_oidc: user with username (=sub) "%s" not found', id_token.sub)
239
            else:
240
                user = users[0]
241
                logger.info('auth_oidc: found user using username (=sub) "%s": %s', id_token.sub, user)
242
        elif provider.strategy == models.OIDCProvider.STRATEGY_FIND_EMAIL:
243
            if email and id_token.sub != email:
244
                logger.warning(
245
                    'auth_oidc: email claim (%s) does not match subject identifier (%s) yet STRATEGY_FIND_EMAIL, fallback on email claim.',
246
                    email,
247
                    id_token.sub,
248
                )
249
            elif not email:
250
                logger.warning(
251
                    'auth_oidc: email claim absent yet STRATEGY_FIND_EMAIL is set, using subject identifier (%s) instead',
252
                    id_token.sub,
253
                )
254
                email = id_token.sub
255

  
256
            user = None
257
            try:
258
                user = User.objects.get(email__iexact=email, is_active=True)
259
            except User.DoesNotExist:
260
                logger.warning('auth_oidc: user with email "%s" not found', email)
261
            except User.MultipleObjectsReturned:
262
                default_ou = get_default_ou()
263
                if provider.ou and provider.ou != default_ou:
264
                    logger.info(
265
                        'auth_oidc: searching user with email "%s" within provider %s\'s ou %s',
266
                        email,
267
                        provider,
268
                        provider.ou,
269
                    )
270
                    try:
271
                        user = User.objects.get(email__iexact=email, ou=provider.ou, is_active=True)
272
                    except User.DoesNotExist:
273
                        logger.warning(
274
                            'auth_oidc: user with email "%s" not found with provider %s\'s ou %s',
275
                            email,
276
                            provider,
277
                            provider.ou,
278
                        )
279
                    else:
280
                        logger.info(
281
                            'auth_oidc: found user using email "%s": %s, within provider %s\'s ou %s',
282
                            email,
283
                            user,
284
                            provider,
285
                            provider.ou,
286
                        )
287
                else:
288
                    logger.info('auth_oidc: searching user with email "%s" within default ou', email)
289
                    try:
290
                        user = User.objects.get(email__iexact=email, ou=default_ou, is_active=True)
291
                    except User.DoesNotExist:
292
                        logger.warning('auth_oidc: user with email "%s" not found with default ou', email)
293
                    else:
294
                        logger.info(
295
                            'auth_oidc: found user using email "%s": %s, within default ou', email, user
296
                        )
297
            else:
298
                logger.info('auth_oidc: found user using email "%s": %s', email, user)
299
        else:
300
            try:
301
                user = User.objects.get(
302
                    oidc_account__provider=provider, oidc_account__sub=id_token.sub, is_active=True
303
                )
304
            except User.DoesNotExist:
305
                pass
306
            else:
307
                logger.info('auth_oidc: found user using with sub "%s": %s', id_token.sub, user)
308

  
252 309
        # eventually create a new user or link to an existing one based on email
253 310
        created_user = False
254 311
        linked = False
src/authentic2_auth_oidc/migrations/0004_auto_20171017_1522.py
18 18
                    ('create', 'create if standard account matching failed'),
19 19
                    ('find-uuid', 'use sub to find existing user through UUID'),
20 20
                    ('find-username', 'use sub to find existing user through username'),
21
                    (
22
                        'find-email',
23
                        'use email claim (or sub if claim is absent) to find existing user through email',
24
                    ),
21 25
                    ('none', 'none'),
22 26
                ],
23 27
            ),
src/authentic2_auth_oidc/models.py
42 42
    STRATEGY_CREATE = 'create'
43 43
    STRATEGY_FIND_UUID = 'find-uuid'
44 44
    STRATEGY_FIND_USERNAME = 'find-username'
45
    STRATEGY_FIND_EMAIL = 'find-email'
45 46
    STRATEGY_NONE = 'none'
46 47

  
47 48
    STRATEGIES = [
48 49
        (STRATEGY_CREATE, _('create if standard account matching failed')),
49 50
        (STRATEGY_FIND_UUID, _('use sub to find existing user through UUID')),
50 51
        (STRATEGY_FIND_USERNAME, _('use sub to find existing user through username')),
52
        (
53
            STRATEGY_FIND_EMAIL,
54
            _('use email claim (or sub if claim is absent) to find existing user through email'),
55
        ),
51 56
        (STRATEGY_NONE, _('none')),
52 57
    ]
53 58
    ALGO_NONE = 0
tests/test_auth_oidc.py
678 678
    assert response.location.startswith('https://server.example.com/logout?')
679 679

  
680 680

  
681
def test_strategy_find_email(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user):
682
    OIDCClaimMapping.objects.all().delete()
683
    OIDCClaimMapping.objects.create(
684
        provider=oidc_provider,
685
        claim='email',
686
        attribute='email',
687
        idtoken_claim=False,  # served by user_info endpoint
688
    )
689
    oidc_provider.strategy = oidc_provider.STRATEGY_FIND_EMAIL
690
    oidc_provider.save()
691

  
692
    assert User.objects.count() == 1
693

  
694
    response = app.get('/').maybe_follow()
695
    assert oidc_provider.name in response.text
696
    response = response.click(oidc_provider.name)
697
    location = urllib.parse.urlparse(response.location)
698
    query = QueryDict(location.query)
699
    state = query['state']
700
    nonce = query['nonce']
701

  
702
    with utils.check_log(caplog, 'cannot create user'):
703
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
704
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
705

  
706
    simple_user.email = 'sub@example.com'
707
    simple_user.save()
708

  
709
    with utils.check_log(caplog, 'cannot create user'):
710
        with oidc_provider_mock(
711
            oidc_provider, oidc_provider_jwkset, code, sub='sub@example.com', nonce=nonce
712
        ):
713
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
714

  
715
    simple_user.email = 'john.doe@example.com'
716
    simple_user.save()
717

  
718
    with utils.check_log(caplog, 'found user using email'):
719
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
720
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
721

  
722
    assert urllib.parse.urlparse(response['Location']).path == '/'
723
    assert User.objects.count() == 1
724
    user = User.objects.get()
725
    # verify user was not modified
726
    assert user.username == 'user'
727
    assert user.first_name == 'Jôhn'
728
    assert user.last_name == 'Dôe'
729
    assert user.email == 'john.doe@example.com'
730
    assert user.attributes.first_name == 'Jôhn'
731
    assert user.attributes.last_name == 'Dôe'
732

  
733

  
681 734
def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset):
682 735
    oidc_provider.ou.email_is_unique = True
683 736
    oidc_provider.ou.save()
684
-