Projet

Général

Profil

0006-misc-make-logout-work-with-transient-NameID-69740.patch

Benjamin Dauvergne, 04 octobre 2022 11:47

Télécharger (8,91 ko)

Voir les différences:

Subject: [PATCH 6/8] misc: make logout work with transient NameID (#69740)

Implementation of transient NameID is special: the transient NameID is
ignored and an attribute value is used as the federation key. But in
order to produce a proper NameID for the logout request we need the
transient NameID value. To work around this problem we add a
transient_name_id attribute to the SessionIndex model representing the
current SSO session, and we modify the session dump template to use this
value instead of UserSAMLIdentifier.name_id if transient_name_id is not
None.
 mellon/adapters.py                            |  6 +++-
 .../0007_sessionindex_transient_name_id.py    | 18 ++++++++++
 mellon/models.py                              |  1 +
 mellon/templates/mellon/session_dump.xml      |  2 +-
 mellon/views.py                               |  4 +++
 tests/test_sso_slo.py                         | 35 +++++++++++++++++--
 6 files changed, 62 insertions(+), 4 deletions(-)
 create mode 100644 mellon/migrations/0007_sessionindex_transient_name_id.py
mellon/adapters.py
316 316
                            transient_federation_attribute,
317 317
                        )
318 318
                        return None
319
                saml_attributes['transient_name_id_content'] = name_id
319 320
            else:
320 321
                if self.request:
321 322
                    messages.warning(
......
459 460
        return None
460 461

  
461 462
    def _link_user(self, idp, saml_attributes, user):
463
        name_id_content = saml_attributes['name_id_content']
464
        if saml_attributes['name_id_format'] == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT:
465
            name_id_content = saml_attributes['transient_name_id_content']
462 466
        saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
463
            name_id=saml_attributes['name_id_content'],
467
            name_id=name_id_content,
464 468
            issuer=models_utils.get_issuer(saml_attributes['issuer']),
465 469
            defaults={
466 470
                'user': user,
mellon/migrations/0007_sessionindex_transient_name_id.py
1
# Generated by Django 2.2.26 on 2022-10-03 15:23
2

  
3
from django.db import migrations, models
4

  
5

  
6
class Migration(migrations.Migration):
7

  
8
    dependencies = [
9
        ('mellon', '0006_nameid_attributes'),
10
    ]
11

  
12
    operations = [
13
        migrations.AddField(
14
            model_name='sessionindex',
15
            name='transient_name_id',
16
            field=models.TextField(null=True, verbose_name='Transient NameID'),
17
        ),
18
    ]
mellon/models.py
46 46
class SessionIndex(models.Model):
47 47
    session_index = models.TextField(_('SAML SessionIndex'))
48 48
    session_key = models.CharField(_('Django session key'), max_length=40)
49
    transient_name_id = models.TextField(verbose_name=_('Transient NameID'), null=True)
49 50
    saml_identifier = models.ForeignKey(
50 51
        verbose_name=_('SAML identifier'), to=UserSAMLIdentifier, on_delete=models.CASCADE
51 52
    )
mellon/templates/mellon/session_dump.xml
1 1
<ns0:Session xmlns:ns0="http://www.entrouvert.org/namespaces/lasso/0.0" xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion" Version="2">{% for session_index in session_indexes %}{% with nameid=session_index.saml_identifier %}
2 2
<ns0:NidAndSessionIndex AssertionID="" ProviderID="{{ nameid.issuer.entity_id }}" SessionIndex="{{ session_index.session_index }}">
3
<ns1:NameID Format="{{ nameid.nid_format }}"{% if nameid.nid_name_qualifier %} NameQualifier="{{ nameid.nid_name_qualifier }}"{% endif %}{% if nameid.nid_sp_name_qualifier %} SPNameQualifier="{{ nameid.nid_sp_name_qualifier }}"{% endif %}{% if nameid.nid_sp_provided_id %} SPProvidedId="{{ nameid.nid_sp_provided_id}}"{% endif %}>{{ nameid.name_id }}</ns1:NameID>
3
<ns1:NameID Format="{{ nameid.nid_format }}"{% if nameid.nid_name_qualifier %} NameQualifier="{{ nameid.nid_name_qualifier }}"{% endif %}{% if nameid.nid_sp_name_qualifier %} SPNameQualifier="{{ nameid.nid_sp_name_qualifier }}"{% endif %}{% if nameid.nid_sp_provided_id %} SPProvidedId="{{ nameid.nid_sp_provided_id}}"{% endif %}>{% firstof session_index.transient_name_id nameid.name_id %}</ns1:NameID>
4 4
</ns0:NidAndSessionIndex>{% endwith %}{% endfor %}
5 5
</ns0:Session>
mellon/views.py
355 355
                saml_identifier=user.saml_identifier,
356 356
                session_key=self.request.session.session_key,
357 357
                session_index=session_index,
358
                # keep transient nameid to be able to produce logout requests
359
                transient_name_id=attributes['name_id_content']
360
                if attributes['name_id_format'] == lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
361
                else None,
358 362
            )
359 363
        self.log.info('user %s (NameID is %r) logged in using SAML', user, attributes['name_id_content'])
360 364
        self.request.session['mellon_session'] = utils.flatten_datetime(attributes)
tests/test_sso_slo.py
16 16

  
17 17
import base64
18 18
import datetime
19
import logging
19 20
import re
20 21
import urllib.parse as urlparse
21 22
import xml.etree.ElementTree as ET
......
33 34
from httmock import response as mock_response
34 35
from pytest import fixture
35 36

  
37
from mellon import models
36 38
from mellon.utils import create_metadata
37 39
from mellon.views import lasso_decode
38 40

  
......
82 84
    session_dump = None
83 85
    identity_dump = None
84 86

  
85
    def __init__(self, idp_metadata, private_key, sp_metadata):
87
    def __init__(self, idp_metadata, private_key, sp_metadata, name_id=None):
86 88
        self.server = server = lasso.Server.newFromBuffers(idp_metadata, private_key)
87 89
        self.server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256
88 90
        server.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP, sp_metadata)
......
90 92
    def reset_session_dump(self):
91 93
        self.session_dump = None
92 94

  
93
    def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None):
95
    def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None, name_id=None):
94 96
        login = self.login = lasso.Login(self.server)
95 97
        if self.identity_dump:
96 98
            login.setIdentityFromDump(self.identity_dump)
......
121 123
                datetime.datetime.now().isoformat(),
122 124
                datetime.datetime.now().isoformat(),
123 125
            )
126
            for key in name_id or {}:
127
                setattr(login.assertion.subject.nameID, key, name_id[key])
124 128

  
125 129
            def add_attribute(name, *values, **kwargs):
126 130
                fmt = kwargs.get('fmt', lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC)
......
833 837

  
834 838
    response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
835 839
    assert app.session['mellon_session']['force_authn']
840

  
841

  
842
def test_sso_slo_transient_name_identifier(db, app, idp, caplog, sp_settings):
843
    caplog.set_level(logging.WARNING)
844
    sp_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
845
    response = app.get('/login/')
846
    url, body, relay_state = idp.process_authn_request_redirect(
847
        response['Location'],
848
        name_id={
849
            'format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT,
850
            'content': '1234',
851
        },
852
    )
853
    response = app.post('/login/', params={'SAMLResponse': body, 'RelayState': relay_state})
854

  
855
    usi = models.UserSAMLIdentifier.objects.get()
856
    assert usi.name_id == 'john.doe@gmail.com'
857
    session_index = models.SessionIndex.objects.get(saml_identifier=usi)
858
    assert session_index.transient_name_id == '1234'
859

  
860
    response = app.get('/logout/')
861
    assert urlparse.urlparse(response['Location']).path == '/singleLogout'
862
    url = idp.process_logout_request_redirect(response.location)
863
    caplog.clear()
864
    response = app.get(url)
865
    assert len(caplog.records) == 0, 'logout failed'
866
    assert response.location == '/'
836
-