Projet

Général

Profil

0004-auth_saml-add-roles-using-model-and-remove-useless-c.patch

Valentin Deniaud, 17 août 2022 11:43

Télécharger (12,1 ko)

Voir les différences:

Subject: [PATCH 4/8] auth_saml: add roles using model and remove useless code
 (#67025)

 src/authentic2_auth_saml/adapters.py | 136 ++-------------------------
 tests/test_auth_saml.py              |  59 ++----------
 2 files changed, 16 insertions(+), 179 deletions(-)
src/authentic2_auth_saml/adapters.py
18 18
import logging
19 19

  
20 20
from django.contrib import messages
21
from django.core.exceptions import MultipleObjectsReturned
22 21
from django.db.transaction import atomic
23 22
from django.utils.translation import ugettext as _
24 23
from mellon.adapters import DefaultAdapter, UserCreationError
25
from mellon.utils import get_setting
26 24

  
27
from authentic2.a2_rbac.models import OrganizationalUnit as OU
28
from authentic2.a2_rbac.models import Role
29 25
from authentic2.a2_rbac.utils import get_default_ou
30 26
from authentic2.backends import get_user_queryset
31 27
from authentic2.models import Lock
32 28
from authentic2.utils import misc as utils_misc
33
from authentic2.utils.evaluate import evaluate_condition
34 29

  
35 30
from .models import SAMLAuthenticator
36 31

  
......
114 109

  
115 110
    @atomic
116 111
    def provision_a2_attributes(self, user, idp, saml_attributes):
117
        """Copy incoming SAML attributes to user attributes, A2_ATTRIBUTE_MAPPING must be a list of
118
        dictionaries like:
119

  
120
           {
121
               'attribute': 'email',
122
               'saml_attribute': 'email',
123
               # optional:
124
               'mandatory': False,
125
           }
126

  
127
         If an attribute is not mandatory, any error is just logged; if the attribute is
128
         mandatory, login will fail.
129
        """
130 112
        saml_attributes = saml_attributes.copy()
131
        attribute_mapping = get_setting(idp, 'A2_ATTRIBUTE_MAPPING', [])
132

  
133
        if not isinstance(attribute_mapping, list):
134
            raise MappingError(_('invalid A2_ATTRIBUTE_MAPPING'))
135 113

  
136
        self.apply_attribute_mapping(user, idp, saml_attributes, attribute_mapping)
137

  
138
    def apply_attribute_mapping(self, user, idp, saml_attributes, attribute_mapping):
139 114
        self.rename_attributes(idp, saml_attributes)
140 115
        self.set_attributes(user, idp, saml_attributes)
141

  
142
        for mapping in attribute_mapping:
143
            if not isinstance(mapping, dict):
144
                raise MappingError(_('invalid mapping action "%(mapping)s"'), mapping=mapping)
145
            action = mapping.get('action', 'set-attribute')
146
            mandatory = mapping.get('mandatory', False) is True
147
            method = None
148
            if isinstance(action, str):
149
                try:
150
                    method = getattr(self, 'action_' + action.replace('-', '_'))
151
                except AttributeError:
152
                    pass
153
            try:
154
                if not method:
155
                    raise MappingError(_('invalid action'))
156
                logger.debug('auth_saml: applying provisionning mapping %s', mapping)
157
                method(user, idp, saml_attributes, mapping)
158
            except MappingError as e:
159
                if mandatory:
160
                    # it's mandatory, provisioning should fail completely
161
                    raise e
162
                logger.warning('auth_saml: mapping action failed: %s', e)
116
        self.action_add_role(user, idp, saml_attributes)
163 117

  
164 118
    def rename_attributes(self, idp, saml_attributes):
165 119
        for action in idp['authenticator'].rename_attribute_actions.all():
......
208 162
                return True
209 163
        return False
210 164

  
211
    def get_ou(self, role_desc):
212
        ou_desc = role_desc.get('ou')
213
        if ou_desc is None:
214
            return None
215
        if not isinstance(ou_desc, dict):
216
            raise MappingError(_('invalid ou description'))
217
        slug = ou_desc.get('slug')
218
        name = ou_desc.get('name')
219
        if slug:
220
            if not isinstance(slug, str):
221
                raise MappingError(_('invalid ou.slug in ou description'))
222
            try:
223
                return OU.objects.get(slug=slug)
224
            except OU.DoesNotExist:
225
                raise MappingError(_('unknown ou: "%(slug)s"'), slug=slug)
226
        elif name:
227
            if not isinstance(name, str):
228
                raise MappingError(_('invalid ou.slug in ou description'))
229
            try:
230
                return OU.objects.get(name=name)
231
            except OU.DoesNotExist:
232
                raise MappingError(_('unknown ou: "%(name)s"'), name=name)
233
        else:
234
            raise MappingError(_('invalid ou description'))
235

  
236
    def get_role(self, mapping):
237
        role_desc = mapping.get('role')
238
        if not role_desc or not isinstance(role_desc, dict):
239
            raise MappingError(_('missing or invalid role description'))
240
        slug = role_desc.get('slug')
241
        name = role_desc.get('name')
242
        ou = self.get_ou(role_desc)
243

  
244
        kwargs = {}
245
        if ou:
246
            kwargs['ou'] = ou
247

  
248
        if slug:
249
            if not isinstance(slug, str):
250
                raise MappingError(_('invalid role slug: "%(slug)s"'), slug=slug)
251
            kwargs['slug'] = slug
252
        elif name:
253
            if not isinstance(name, str):
254
                raise MappingError(_('invalid role name: "%(name)s"'), name=name)
255
            kwargs['name'] = name
256
        else:
257
            raise MappingError(_('invalid role description'))
258

  
259
        try:
260
            return Role.objects.get(**kwargs)
261
        except Role.DoesNotExist:
262
            raise MappingError(_('unknown role: %(kwargs)s'), kwargs=kwargs)
263
        except MultipleObjectsReturned:
264
            raise MappingError(_('ambiugous role description: %(kwargs)s'), kwargs=kwargs)
265

  
266
    def evaluate_condition(self, user, saml_attributes, mapping):
267
        condition = mapping.get('condition')
268
        if condition is None:
269
            return True
270
        if not isinstance(condition, str):
271
            raise MappingError(_('invalid condition'))
272
        try:
273
            # use a proxy to simplify condition expressions as subscript is forbidden
274
            # you can write "email == 'a@b.com'" but also "'a@b.com' in email__list"
275
            value = evaluate_condition(condition, SamlConditionContextProxy(saml_attributes))
276
            logger.debug('auth_saml: condition %r is %s', condition, value, extra={'user': user})
277
            return value
278
        except Exception as e:
279
            raise MappingError(_('condition evaluation failed: %(message)s'), message=e)
280

  
281
    def action_add_role(self, user, idp, saml_attributes, mapping):
282
        role = self.get_role(mapping)
283
        if self.evaluate_condition(user, saml_attributes, mapping):
284
            if role not in user.roles.all():
285
                logger.info('auth_saml: adding role "%s"', role, extra={'user': user})
286
                user.roles.add(role)
287
        else:
288
            if role in user.roles.all():
289
                logger.info('auth_saml: removing role "%s"', role, extra={'user': user})
290
                user.roles.remove(role)
291

  
292
    def action_toggle_role(self, *args, **kwargs):
293
        return self.action_add_role(*args, **kwargs)
165
    def action_add_role(self, user, idp, saml_attributes):
166
        for action in idp['authenticator'].add_role_actions.all():
167
            if action.role not in user.roles.all():
168
                logger.info('auth_saml: adding role "%s"', action.role, extra={'user': user})
169
                user.roles.add(action.role)
294 170

  
295 171
    def auth_login(self, request, user):
296 172
        utils_misc.login(request, user, 'saml')
tests/test_auth_saml.py
28 28
from authentic2.custom_user.models import DeletedUser
29 29
from authentic2.models import Attribute
30 30
from authentic2_auth_saml.adapters import AuthenticAdapter, MappingError
31
from authentic2_auth_saml.models import RenameAttributeAction, SAMLAuthenticator, SetAttributeAction
31
from authentic2_auth_saml.models import (
32
    AddRoleAction,
33
    RenameAttributeAction,
34
    SAMLAuthenticator,
35
    SetAttributeAction,
36
)
32 37

  
33 38
from .utils import login
34 39

  
......
150 155
):
151 156
    caplog.set_level('WARNING')
152 157
    saml_attributes['http://nice/attribute/givenName'] = []
153
    adapter.apply_attribute_mapping(user, idp, saml_attributes, idp['A2_ATTRIBUTE_MAPPING'])
158
    adapter.provision_a2_attributes(user, idp, saml_attributes)
154 159
    assert re.match('.*no value.*first_name', caplog.records[-1].message)
155 160

  
156 161

  
......
160 165
    saml_attributes['http://nice/attribute/givenName'] = []
161 166
    SetAttributeAction.objects.filter(attribute='first_name').update(mandatory=True)
162 167
    with pytest.raises(MappingError, match='no value'):
163
        adapter.apply_attribute_mapping(user, idp, saml_attributes, idp['A2_ATTRIBUTE_MAPPING'])
168
        adapter.provision_a2_attributes(user, idp, saml_attributes)
164 169

  
165 170
    request = rf.get('/')
166 171
    request._messages = mock.Mock()
......
180 185
            enabled=True,
181 186
            metadata='meta1.xml',
182 187
            slug='idp1',
183
            a2_attribute_mapping=[
184
                {
185
                    'action': action_name,
186
                    'role': {
187
                        'name': simple_role.name,
188
                        'ou': {
189
                            'name': simple_role.ou.name,
190
                        },
191
                    },
192
                    'condition': "roles == 'A'",
193
                }
194
            ],
195 188
        )
189
        AddRoleAction.objects.create(authenticator=authenticator, role=simple_role)
196 190
        return authenticator.settings
197 191

  
198 192
    @pytest.fixture
......
203 197
            'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
204 198
        }
205 199

  
206
    def test_lookup_user_condition_fails(self, adapter, simple_role, idp, saml_attributes):
207
        user = adapter.lookup_user(idp, saml_attributes)
208
        assert simple_role not in user.roles.all()
209

  
210
    def test_lookup_user_condition_success(self, adapter, simple_role, idp, saml_attributes):
211
        saml_attributes['roles'] = ['A']
212
        user = adapter.lookup_user(idp, saml_attributes)
213
        assert simple_role in user.roles.all()
214

  
215
    def test_lookup_user_mandatory_condition(self, adapter, simple_role, idp, saml_attributes):
216
        # if a toggle-role is mandatory, failure to evaluate the condition blocks user creation
217
        idp['A2_ATTRIBUTE_MAPPING'][0]['mandatory'] = True
218
        assert adapter.lookup_user(idp, saml_attributes) is None
219

  
220
    def test_lookup_user_mandatory(self, adapter, simple_role, idp, saml_attributes):
221
        idp['A2_ATTRIBUTE_MAPPING'][0]['mandatory'] = True
222
        saml_attributes['roles'] = ['A']
223
        user = adapter.lookup_user(idp, saml_attributes)
224
        assert simple_role in user.roles.all()
225

  
226
    def test_lookup_user_use_list(self, adapter, simple_role, idp, saml_attributes):
227
        idp['A2_ATTRIBUTE_MAPPING'][0]['condition'] = "'A' in roles__list"
228
        saml_attributes['roles'] = ['A']
200
    def test_lookup_user_success(self, adapter, simple_role, idp, saml_attributes):
229 201
        user = adapter.lookup_user(idp, saml_attributes)
230 202
        assert simple_role in user.roles.all()
231 203

  
232
    def test_lookup_user_add_and_remove(self, adapter, simple_role, idp, saml_attributes, caplog):
233
        saml_attributes['roles'] = ['A']
234
        user = adapter.lookup_user(idp, saml_attributes)
235
        assert simple_role in user.roles.all()
236

  
237
        saml_attributes['roles'] = []
238
        adapter.provision(user, idp, saml_attributes)
239
        # condition failed, so role should be removed
240
        user.refresh_from_db()
241
        assert simple_role not in user.roles.all()
242

  
243 204

  
244 205
def test_login_with_conditionnal_authenticators(db, app, settings, caplog):
245 206
    authenticator = SAMLAuthenticator.objects.create(
246
-