Projet

Général

Profil

0003-add-user-lookup-by-attributes-33739.patch

Benjamin Dauvergne, 11 juin 2019 16:36

Télécharger (18,5 ko)

Voir les différences:

Subject: [PATCH 3/4] add user lookup by attributes (#33739)

 README                        |  26 +++++
 mellon/adapters.py            | 102 ++++++++++++++++---
 mellon/app_settings.py        |   1 +
 tests/test_default_adapter.py | 181 +++++++++++++++++++++++++++-------
 4 files changed, 263 insertions(+), 47 deletions(-)
README
261 261
Should be post or artifact. Default is post. You can refer to the SAML 2.0
262 262
specification to learn the difference.
263 263

  
264
MELLON_LOOKUP_BY_ATTRIBUTES
265
---------------------------
266

  
267
Allow looking for user with some SAML attributes if the received NameID is
268
still unknown. It must be a list of dictionnaries with two mandatory keys
269
`user_field` and `saml_attribute`. The optionnal key `ignore-case` should be a
270
boolean indicating if the match is case-insensitive (default is to respect the
271
case).
272

  
273
Each dictionnary is a rule for linking, applying all the rules should only
274
return one user, the boolean operator OR is applied between the rules.
275

  
276
So for example if you received a SAML attribute named `email` and you want to
277
link user with the same email you would configured it like that:
278

  
279
   MELLON_LOOKUP_BY_ATTRIBUTES = [
280
       {
281
           'saml_attribute': 'email',
282
           'user_field': 'email',
283
       }
284
   ]
285

  
286
The targeted user(s) field(s) should be as much as possible unique
287
individually, if not django-mellon will refuse to link multiple users matching
288
the rules.
289

  
264 290
Tests
265 291
=====
266 292

  
mellon/adapters.py
6 6
import requests
7 7
import requests.exceptions
8 8

  
9
from django.core.exceptions import PermissionDenied
9
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
10 10
from django.contrib import auth
11 11
from django.contrib.auth.models import Group
12 12
from django.utils import six
......
21 21
    pass
22 22

  
23 23

  
24
def display_truncated_list(l, max_length=10):
25
    s = '[' + ', '.join(map(six.text_type, l))
26
    if len(l) > max_length:
27
        s += '..truncated more than %d items (%d)]' % (max_length, len(l))
28
    else:
29
        s += ']'
30
    return s
31

  
32

  
24 33
class DefaultAdapter(object):
25 34
    def __init__(self, *args, **kwargs):
26 35
        self.logger = logging.getLogger(__name__)
......
128 137
            name_id = saml_attributes['name_id_content']
129 138
        issuer = saml_attributes['issuer']
130 139
        try:
131
            return User.objects.get(saml_identifiers__name_id=name_id,
140
            user = User.objects.get(saml_identifiers__name_id=name_id,
132 141
                                    saml_identifiers__issuer=issuer)
142
            self.logger.info('looked up user %s with name_id %s from issuer %s',
143
                             user, name_id, issuer)
144
            return user
133 145
        except User.DoesNotExist:
134 146
            pass
135 147

  
136
        if not utils.get_setting(idp, 'PROVISION'):
137
            self.logger.warning('provisionning disabled, login refused')
138
            return None
148
        user = None
149
        lookup_by_attributes = utils.get_setting(idp, 'LOOKUP_BY_ATTRIBUTES')
150
        if lookup_by_attributes:
151
            user = self._lookup_by_attributes(idp, saml_attributes, lookup_by_attributes)
152

  
153
        created = False
154
        if not user:
155
            if not utils.get_setting(idp, 'PROVISION'):
156
                self.logger.debug('provisionning disabled, login refused')
157
                return None
158
            created = True
159
            user = self.create_user(User)
139 160

  
140
        user = self.create_user(User)
141 161
        nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
142 162
        if user != nameid_user:
143 163
            self.logger.info('looked up user %s with name_id %s from issuer %s',
144 164
                             nameid_user, name_id, issuer)
145
            user.delete()
165
            if created:
166
                user.delete()
146 167
            return nameid_user
147 168

  
148
        try:
149
            self.finish_create_user(idp, saml_attributes, nameid_user)
150
        except UserCreationError:
151
            nameid_user.delete()
152
            return None
153
        self.logger.info('created new user %s with name_id %s from issuer %s',
154
                         nameid_user, name_id, issuer)
169
        if created:
170
            try:
171
                self.finish_create_user(idp, saml_attributes, nameid_user)
172
            except UserCreationError:
173
                user.delete()
174
                return None
175
            self.logger.info('created new user %s with name_id %s from issuer %s',
176
                             nameid_user, name_id, issuer)
155 177
        return nameid_user
156 178

  
179
    def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
180
        if not isinstance(lookup_by_attributes, list):
181
            self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
182
            return None
183

  
184
        users = set()
185
        for line in lookup_by_attributes:
186
            if not isinstance(line, dict):
187
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
188
                continue
189
            user_field = line.get('user_field')
190
            if not hasattr(user_field, 'isalpha'):
191
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
192
                continue
193
            try:
194
                User._meta.get_field(user_field)
195
            except FieldDoesNotExist:
196
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
197
                                  line, user_field)
198
                continue
199
            saml_attribute = line.get('saml_attribute')
200
            if not hasattr(saml_attribute, 'isalpha'):
201
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
202
                continue
203
            values = saml_attributes.get(saml_attribute)
204
            if not values:
205
                self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
206
                                  saml_attribute, user_field)
207
                continue
208
            ignore_case = line.get('ignore-case', False)
209
            for value in values:
210
                key = user_field
211
                if ignore_case:
212
                    key += '__iexact'
213
                users_found = User.objects.filter(saml_identifiers__isnull=True, **{key: value})
214
                if not users_found:
215
                    self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
216
                                      saml_attribute, user_field, value)
217
                    continue
218
                self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
219
                                 saml_attribute, user_field, value, display_truncated_list(users_found))
220
                users.update(users_found)
221
        if len(users) == 1:
222
            user = list(users)[0]
223
            self.logger.info(u'looking for user by attributes %r: found user %s',
224
                             lookup_by_attributes, user)
225
            return user
226
        elif len(users) > 1:
227
            self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
228
                                lookup_by_attributes, len(users))
229
        return None
230

  
157 231
    def _link_user(self, idp, saml_attributes, issuer, name_id, user):
158 232
        saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
159 233
            name_id=name_id, issuer=issuer, defaults={'user': user})
mellon/app_settings.py
40 40
        'ARTIFACT_RESOLVE_TIMEOUT': 10.0,
41 41
        'LOGIN_HINTS': [],
42 42
        'SIGNATURE_METHOD': 'RSA-SHA256',
43
        'LOOKUP_BY_ATTRIBUTES': [],
43 44
    }
44 45

  
45 46
    @property
tests/test_default_adapter.py
11 11

  
12 12
pytestmark = pytest.mark.django_db
13 13

  
14
idp = {
15
    'METADATA': open('tests/metadata.xml').read(),
16
}
17
saml_attributes = {
18
    'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
19
    'name_id_content': 'x' * 32,
20
    'issuer': 'http://idp5/metadata',
21
    'username': ['foobar'],
22
    'email': ['test@example.net'],
23
    'first_name': ['Foo'],
24
    'last_name': ['Bar'],
25
    'is_superuser': ['true'],
26
    'group': ['GroupA', 'GroupB', 'GroupC'],
27
}
28

  
29

  
30
def test_format_username(settings):
14
User = auth.get_user_model()
15

  
16

  
17
@pytest.fixture
18
def idp():
19
    return {
20
        'METADATA': open('tests/metadata.xml').read(),
21
    }
22

  
23

  
24
@pytest.fixture
25
def saml_attributes():
26
    return {
27
        'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
28
        'name_id_content': 'x' * 32,
29
        'issuer': 'http://idp5/metadata',
30
        'username': ['foobar'],
31
        'email': ['test@example.net'],
32
        'first_name': ['Foo'],
33
        'last_name': ['Bar'],
34
        'is_superuser': ['true'],
35
        'group': ['GroupA', 'GroupB', 'GroupC'],
36
    }
37

  
38

  
39
@pytest.fixture
40
def john(db):
41
    return User.objects.create(username='john.doe', email='john.doe@example.com')
42

  
43

  
44
@pytest.fixture
45
def jane(db):
46
    return User.objects.create(username='jane.doe', email='john.doe@example.com')
47

  
48

  
49
def test_format_username(settings, idp, saml_attributes):
31 50
    adapter = DefaultAdapter()
32 51
    assert adapter.format_username(idp, {}) is None
33 52
    assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30]
......
37 56
    assert adapter.format_username(idp, saml_attributes) == 'foobar'
38 57

  
39 58

  
40
def test_lookup_user(settings):
41
    User = auth.get_user_model()
59
def test_lookup_user(settings, idp, saml_attributes):
42 60
    adapter = DefaultAdapter()
43 61
    user = adapter.lookup_user(idp, saml_attributes)
44 62
    assert user is not None
......
55 73
    assert User.objects.count() == 0
56 74

  
57 75

  
58
def test_lookup_user_transaction(transactional_db, concurrency):
76
def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes):
59 77
    adapter = DefaultAdapter()
60 78
    p = ThreadPool(concurrency)
61 79

  
......
81 99
    assert len(set(user.pk for user in users)) == 1
82 100

  
83 101

  
84
def test_provision_user_attributes(settings, django_user_model, caplog):
102
def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
85 103
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
86 104
    settings.MELLON_ATTRIBUTE_MAPPING = {
87 105
        'email': u'{attributes[email][0]}',
......
102 120
    assert 'set field email' in caplog.text
103 121

  
104 122

  
105
def test_provision_user_groups(settings, django_user_model, caplog):
123
def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
106 124
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
107 125
    settings.MELLON_GROUP_ATTRIBUTE = 'group'
108 126
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
......
118 136
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
119 137
    assert user.groups.count() == 2
120 138
    assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
121
    assert len(caplog.records) == 5
139
    assert len(caplog.records) == 6
122 140
    assert 'removing group GroupA' in caplog.records[-1].message
123 141

  
124 142

  
125
def test_provision_is_superuser(settings, django_user_model, caplog):
143
def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
126 144
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
127 145
    settings.MELLON_SUPERUSER_MAPPING = {
128 146
        'is_superuser': 'true',
......
137 155
    assert not 'flag is_staff and is_superuser removed' in caplog.text
138 156

  
139 157

  
140
def test_provision_absent_attribute(settings, django_user_model, caplog):
158
def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
141 159
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
142 160
    settings.MELLON_ATTRIBUTE_MAPPING = {
143 161
        'email': '{attributes[email][0]}',
......
155 173
    assert 'set field last_name' in caplog.text
156 174

  
157 175

  
158
def test_provision_long_attribute(settings, django_user_model, caplog):
176
def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
159 177
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
160 178
    settings.MELLON_ATTRIBUTE_MAPPING = {
161 179
        'email': '{attributes[email][0]}',
......
174 192
    assert 'set field email' in caplog.text
175 193

  
176 194

  
177
def test_lookup_user_transient_with_email(private_settings):
195
def test_lookup_user_transient_with_email(private_settings, idp, saml_attributes):
178 196
    private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
179
    User = auth.get_user_model()
180 197
    adapter = DefaultAdapter()
181
    saml_attributes2 = saml_attributes.copy()
182
    saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
198
    saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
183 199
    assert User.objects.count() == 0
184
    user = adapter.lookup_user(idp, saml_attributes2)
200
    user = adapter.lookup_user(idp, saml_attributes)
185 201
    assert user is not None
186 202
    assert user.saml_identifiers.count() == 1
187
    assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
203
    assert user.saml_identifiers.first().name_id == saml_attributes['email'][0]
188 204

  
189
    user2 = adapter.lookup_user(idp, saml_attributes2)
205
    user2 = adapter.lookup_user(idp, saml_attributes)
190 206
    assert user.id == user2.id
191 207

  
192 208
    User.objects.all().delete()
......
196 212
    user = adapter.lookup_user(idp, saml_attributes)
197 213
    assert user is None
198 214
    assert User.objects.count() == 0
215

  
216

  
217
def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog):
218
    settings.MELLON_PROVISION = False
219

  
220
    adapter = DefaultAdapter()
221
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin'
222
    assert adapter.lookup_user(idp, saml_attributes) is None
223
    assert caplog.records[-1].message.endswith('it must be a list')
224

  
225

  
226
def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog):
227
    settings.MELLON_PROVISION = False
228

  
229
    adapter = DefaultAdapter()
230
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin']
231
    assert adapter.lookup_user(idp, saml_attributes) is None
232
    assert caplog.records[-1].message.endswith('it must be a list of dicts')
233

  
234

  
235
def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog):
236
    settings.MELLON_PROVISION = False
237

  
238
    adapter = DefaultAdapter()
239
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}]
240
    assert adapter.lookup_user(idp, saml_attributes) is None
241
    assert caplog.records[-1].message.endswith('user_field is missing')
242

  
243

  
244
def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog):
245
    settings.MELLON_PROVISION = False
246

  
247
    adapter = DefaultAdapter()
248
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}]
249
    assert adapter.lookup_user(idp, saml_attributes) is None
250
    assert caplog.records[-1].message.endswith('saml_attribute is missing')
251

  
252

  
253
def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog):
254
    settings.MELLON_PROVISION = False
255

  
256
    adapter = DefaultAdapter()
257
    caplog.set_level('DEBUG')
258
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}]
259
    saml_attributes['saml_at1'] = ['john.doe']
260
    assert adapter.lookup_user(idp, saml_attributes) is None
261
    assert caplog.records[-2].message.endswith(': not found')
262

  
263

  
264
def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog):
265
    settings.MELLON_PROVISION = False
266

  
267
    adapter = DefaultAdapter()
268
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}]
269
    saml_attributes['saml_at1'] = ['john.doe@example.com']
270
    assert adapter.lookup_user(idp, saml_attributes) is None
271
    assert 'too many users found(2)' in caplog.records[-1].message
272

  
273

  
274
def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog):
275
    settings.MELLON_PROVISION = False
276

  
277
    adapter = DefaultAdapter()
278
    saml_attributes['saml_at1'] = ['john.doe']
279
    saml_attributes['saml_at2'] = ['jane.doe']
280

  
281
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
282
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
283
        {'user_field': 'username', 'saml_attribute': 'saml_at2'},
284
    ]
285
    assert adapter.lookup_user(idp, saml_attributes) is None
286
    assert 'too many users found(2)' in caplog.records[-1].message
287

  
288

  
289
def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog):
290
    settings.MELLON_PROVISION = False
291

  
292
    adapter = DefaultAdapter()
293
    saml_attributes['saml_at1'] = ['john.doe']
294
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
295
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
296
    ]
297
    assert adapter.lookup_user(idp, saml_attributes) == john
298

  
299

  
300
def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog):
301
    settings.MELLON_PROVISION = False
302

  
303
    adapter = DefaultAdapter()
304
    saml_attributes['saml_at1'] = ['Jane.Doe']
305
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
306
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
307
    ]
308
    assert adapter.lookup_user(idp, saml_attributes) is None
309

  
310
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
311
        {'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
312
    ]
313
    assert adapter.lookup_user(idp, saml_attributes) == jane
199
-