Projet

Général

Profil

0004-auth_saml-add-roles-using-model-and-remove-useless-c.patch

Valentin Deniaud, 16 août 2022 14:12

Télécharger (11,3 ko)

Voir les différences:

Subject: [PATCH 4/7] auth_saml: add roles using model and remove useless code
 (#67025)

 src/authentic2_auth_saml/adapters.py | 133 +++------------------------
 tests/test_auth_saml.py              |  30 +++---
 2 files changed, 26 insertions(+), 137 deletions(-)
src/authentic2_auth_saml/adapters.py
19 19
from contextlib import contextmanager
20 20

  
21 21
from django.contrib import messages
22
from django.core.exceptions import MultipleObjectsReturned
23 22
from django.db.transaction import atomic
24 23
from django.utils.translation import ugettext as _
25 24
from mellon.adapters import DefaultAdapter, UserCreationError
26
from mellon.utils import get_setting
27 25

  
28
from authentic2.a2_rbac.models import OrganizationalUnit as OU
29
from authentic2.a2_rbac.models import Role
30 26
from authentic2.a2_rbac.utils import get_default_ou
31 27
from authentic2.backends import get_user_queryset
32 28
from authentic2.models import Lock
......
126 122

  
127 123
    @atomic
128 124
    def provision_a2_attributes(self, user, idp, saml_attributes):
129
        """Copy incoming SAML attributes to user attributes, A2_ATTRIBUTE_MAPPING must be a list of
130
        dictionaries like:
131

  
132
           {
133
               'attribute': 'email',
134
               'saml_attribute': 'email',
135
               # optional:
136
               'mandatory': False,
137
           }
138

  
139
         If an attribute is not mandatory, any error is just logged; if the attribute is
140
         mandatory, login will fail.
141
        """
142 125
        saml_attributes = saml_attributes.copy()
143
        attribute_mapping = get_setting(idp, 'A2_ATTRIBUTE_MAPPING', [])
144 126

  
145
        if not isinstance(attribute_mapping, list):
146
            raise MappingError(_('invalid A2_ATTRIBUTE_MAPPING'))
147

  
148
        self.apply_attribute_mapping(user, idp, saml_attributes, attribute_mapping)
149

  
150
    def apply_attribute_mapping(self, user, idp, saml_attributes, attribute_mapping):
151 127
        self.rename_attributes(idp, saml_attributes)
152 128
        self.set_attributes(user, idp, saml_attributes)
153

  
154
        for mapping in attribute_mapping:
155
            if not isinstance(mapping, dict):
156
                raise MappingError(_('invalid mapping action "%(mapping)s"'), mapping=mapping)
157
            action = mapping.get('action', 'set-attribute')
158
            mandatory = mapping.get('mandatory', False) is True
159
            method = None
160
            if isinstance(action, str):
161
                try:
162
                    method = getattr(self, 'action_' + action.replace('-', '_'))
163
                except AttributeError:
164
                    pass
165
            try:
166
                if not method:
167
                    raise MappingError(_('invalid action'))
168
                logger.debug('auth_saml: applying provisionning mapping %s', mapping)
169
                method(user, idp, saml_attributes, mapping)
170
            except MappingError as e:
171
                if mandatory:
172
                    # it's mandatory, provisioning should fail completely
173
                    raise e
174
                logger.warning('auth_saml: mapping action failed: %s', e)
129
        self.action_add_role(user, idp, saml_attributes)
175 130

  
176 131
    def rename_attributes(self, idp, saml_attributes):
177 132
        for action in idp['authenticator'].rename_attribute_actions.all():
......
215 170
                return True
216 171
        return False
217 172

  
218
    def get_ou(self, role_desc):
219
        ou_desc = role_desc.get('ou')
220
        if ou_desc is None:
221
            return None
222
        if not isinstance(ou_desc, dict):
223
            raise MappingError(_('invalid ou description'))
224
        slug = ou_desc.get('slug')
225
        name = ou_desc.get('name')
226
        if slug:
227
            if not isinstance(slug, str):
228
                raise MappingError(_('invalid ou.slug in ou description'))
229
            try:
230
                return OU.objects.get(slug=slug)
231
            except OU.DoesNotExist:
232
                raise MappingError(_('unknown ou: "%(slug)s"'), slug=slug)
233
        elif name:
234
            if not isinstance(name, str):
235
                raise MappingError(_('invalid ou.slug in ou description'))
236
            try:
237
                return OU.objects.get(name=name)
238
            except OU.DoesNotExist:
239
                raise MappingError(_('unknown ou: "%(name)s"'), name=name)
240
        else:
241
            raise MappingError(_('invalid ou description'))
242

  
243
    def get_role(self, mapping):
244
        role_desc = mapping.get('role')
245
        if not role_desc or not isinstance(role_desc, dict):
246
            raise MappingError(_('missing or invalid role description'))
247
        slug = role_desc.get('slug')
248
        name = role_desc.get('name')
249
        ou = self.get_ou(role_desc)
250

  
251
        kwargs = {}
252
        if ou:
253
            kwargs['ou'] = ou
254

  
255
        if slug:
256
            if not isinstance(slug, str):
257
                raise MappingError(_('invalid role slug: "%(slug)s"'), slug=slug)
258
            kwargs['slug'] = slug
259
        elif name:
260
            if not isinstance(name, str):
261
                raise MappingError(_('invalid role name: "%(name)s"'), name=name)
262
            kwargs['name'] = name
263
        else:
264
            raise MappingError(_('invalid role description'))
265

  
266
        try:
267
            return Role.objects.get(**kwargs)
268
        except Role.DoesNotExist:
269
            raise MappingError(_('unknown role: %(kwargs)s'), kwargs=kwargs)
270
        except MultipleObjectsReturned:
271
            raise MappingError(_('ambiugous role description: %(kwargs)s'), kwargs=kwargs)
272

  
273
    def evaluate_condition(self, user, saml_attributes, mapping):
274
        condition = mapping.get('condition')
275
        if condition is None:
173
    def evaluate_condition(self, user, saml_attributes, condition):
174
        if not condition:
276 175
            return True
277
        if not isinstance(condition, str):
278
            raise MappingError(_('invalid condition'))
279 176
        try:
280 177
            # use a proxy to simplify condition expressions as subscript is forbidden
281 178
            # you can write "email == 'a@b.com'" but also "'a@b.com' in email__list"
......
285 182
        except Exception as e:
286 183
            raise MappingError(_('condition evaluation failed: %(message)s'), message=e)
287 184

  
288
    def action_add_role(self, user, idp, saml_attributes, mapping):
289
        role = self.get_role(mapping)
290
        if self.evaluate_condition(user, saml_attributes, mapping):
291
            if role not in user.roles.all():
292
                logger.info('auth_saml: adding role "%s"', role, extra={'user': user})
293
                user.roles.add(role)
294
        else:
295
            if role in user.roles.all():
296
                logger.info('auth_saml: removing role "%s"', role, extra={'user': user})
297
                user.roles.remove(role)
298

  
299
    def action_toggle_role(self, *args, **kwargs):
300
        return self.action_add_role(*args, **kwargs)
185
    def action_add_role(self, user, idp, saml_attributes):
186
        for action in idp['authenticator'].add_role_actions.all():
187
            with wrap_mapping_errors(action):
188
                if self.evaluate_condition(user, saml_attributes, action.condition):
189
                    if action.role not in user.roles.all():
190
                        logger.info('auth_saml: adding role "%s"', action.role, extra={'user': user})
191
                        user.roles.add(action.role)
192
                else:
193
                    if action.role in user.roles.all():
194
                        logger.info('auth_saml: removing role "%s"', action.role, extra={'user': user})
195
                        user.roles.remove(action.role)
301 196

  
302 197
    def auth_login(self, request, user):
303 198
        utils_misc.login(request, user, 'saml')
tests/test_auth_saml.py
28 28
from authentic2.custom_user.models import DeletedUser
29 29
from authentic2.models import Attribute
30 30
from authentic2_auth_saml.adapters import AuthenticAdapter, MappingError
31
from authentic2_auth_saml.models import RenameAttributeAction, SAMLAuthenticator, SetAttributeAction
31
from authentic2_auth_saml.models import (
32
    AddRoleAction,
33
    RenameAttributeAction,
34
    SAMLAuthenticator,
35
    SetAttributeAction,
36
)
32 37

  
33 38
from .utils import login
34 39

  
......
150 155
):
151 156
    caplog.set_level('WARNING')
152 157
    saml_attributes['http://nice/attribute/givenName'] = []
153
    adapter.apply_attribute_mapping(user, idp, saml_attributes, idp['A2_ATTRIBUTE_MAPPING'])
158
    adapter.provision_a2_attributes(user, idp, saml_attributes)
154 159
    assert re.match('.*no value.*first_name', caplog.records[-1].message)
155 160

  
156 161

  
......
160 165
    saml_attributes['http://nice/attribute/givenName'] = []
161 166
    SetAttributeAction.objects.filter(attribute='first_name').update(mandatory=True)
162 167
    with pytest.raises(MappingError, match='no value'):
163
        adapter.apply_attribute_mapping(user, idp, saml_attributes, idp['A2_ATTRIBUTE_MAPPING'])
168
        adapter.provision_a2_attributes(user, idp, saml_attributes)
164 169

  
165 170
    request = rf.get('/')
166 171
    request._messages = mock.Mock()
......
180 185
            enabled=True,
181 186
            metadata='meta1.xml',
182 187
            slug='idp1',
183
            a2_attribute_mapping=[
184
                {
185
                    'action': action_name,
186
                    'role': {
187
                        'name': simple_role.name,
188
                        'ou': {
189
                            'name': simple_role.ou.name,
190
                        },
191
                    },
192
                    'condition': "roles == 'A'",
193
                }
194
            ],
195 188
        )
189
        AddRoleAction.objects.create(authenticator=authenticator, role=simple_role, condition="roles == 'A'")
196 190
        return authenticator.settings
197 191

  
198 192
    @pytest.fixture
......
214 208

  
215 209
    def test_lookup_user_mandatory_condition(self, adapter, simple_role, idp, saml_attributes):
216 210
        # if a toggle-role is mandatory, failure to evaluate the condition blocks user creation
217
        idp['A2_ATTRIBUTE_MAPPING'][0]['mandatory'] = True
211
        AddRoleAction.objects.update(mandatory=True)
218 212
        assert adapter.lookup_user(idp, saml_attributes) is None
219 213

  
220 214
    def test_lookup_user_mandatory(self, adapter, simple_role, idp, saml_attributes):
221
        idp['A2_ATTRIBUTE_MAPPING'][0]['mandatory'] = True
215
        AddRoleAction.objects.update(mandatory=True)
222 216
        saml_attributes['roles'] = ['A']
223 217
        user = adapter.lookup_user(idp, saml_attributes)
224 218
        assert simple_role in user.roles.all()
225 219

  
226 220
    def test_lookup_user_use_list(self, adapter, simple_role, idp, saml_attributes):
227
        idp['A2_ATTRIBUTE_MAPPING'][0]['condition'] = "'A' in roles__list"
221
        AddRoleAction.objects.update(condition="'A' in roles__list")
228 222
        saml_attributes['roles'] = ['A']
229 223
        user = adapter.lookup_user(idp, saml_attributes)
230 224
        assert simple_role in user.roles.all()
231
-