Projet

Général

Profil

0003-add-user-lookup-by-attributes-33739.patch

Benjamin Dauvergne, 06 juin 2019 15:20

Télécharger (18,4 ko)

Voir les différences:

Subject: [PATCH 3/3] add user lookup by attributes (#33739)

 README                        |  24 +++++
 mellon/adapters.py            | 102 ++++++++++++++++---
 mellon/app_settings.py        |   1 +
 tests/test_default_adapter.py | 181 +++++++++++++++++++++++++++-------
 4 files changed, 261 insertions(+), 47 deletions(-)
README
261 261
Should be post or artifact. Default is post. You can refer to the SAML 2.0
262 262
specification to learn the difference.
263 263

  
264
MELLON_LOOKUP_BY_ATTRIBUTES
265
---------------------------
266

  
267
Allow looking for user with some SAML attributes if the received NameID is
268
still unknown. It must be a list of dictionnaries with two mandatory keys
269
`user_field` and `saml_attribute`.
270

  
271
Each dictionnary is a rule for linking, applying all the rules should only
272
return one user, the boolean operator OR is applied between the rules.
273

  
274
So for example if you received a SAML attribute named `email` and you want to
275
link user with the same email you would configured it like that:
276

  
277
   MELLON_LOOKUP_BY_ATTRIBUTES = [
278
       {
279
           'saml_attribute': 'email',
280
           'user_field': 'email',
281
       }
282
   ]
283

  
284
The targeted user(s) field(s) should be as much as possible unique
285
individually, if not django-mellon will refuse to link multiple users matching
286
the rules.
287

  
264 288
Tests
265 289
=====
266 290

  
mellon/adapters.py
6 6
import requests
7 7
import requests.exceptions
8 8

  
9
from django.core.exceptions import PermissionDenied
9
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
10 10
from django.contrib import auth
11 11
from django.contrib.auth.models import Group
12 12
from django.utils import six
......
21 21
    pass
22 22

  
23 23

  
24
def display_truncated_list(l, max_length=10):
25
    s = '[' + ', '.join(map(six.text_type, l))
26
    if len(l) > max_length:
27
        s += '..truncated more than %d items (%d)]' % (max_length, len(l))
28
    else:
29
        s += ']'
30
    return s
31

  
32

  
24 33
class DefaultAdapter(object):
25 34
    def __init__(self, *args, **kwargs):
26 35
        self.logger = logging.getLogger(__name__)
......
128 137
            name_id = saml_attributes['name_id_content']
129 138
        issuer = saml_attributes['issuer']
130 139
        try:
131
            return User.objects.get(saml_identifiers__name_id=name_id,
140
            user = User.objects.get(saml_identifiers__name_id=name_id,
132 141
                                    saml_identifiers__issuer=issuer)
142
            self.logger.info('looked up user %s with name_id %s from issuer %s',
143
                             user, name_id, issuer)
144
            return user
133 145
        except User.DoesNotExist:
134 146
            pass
135 147

  
136
        if not utils.get_setting(idp, 'PROVISION'):
137
            self.logger.warning('provisionning disabled, login refused')
138
            return None
148
        user = None
149
        lookup_by_attributes = utils.get_setting(idp, 'LOOKUP_BY_ATTRIBUTES')
150
        if lookup_by_attributes:
151
            user = self._lookup_by_attributes(idp, saml_attributes, lookup_by_attributes)
152

  
153
        provisionning = False
154
        if not user:
155
            if not utils.get_setting(idp, 'PROVISION'):
156
                self.logger.debug('provisionning disabled, login refused')
157
                return None
158
            provisionning = True
159
            user = self.create_user(User)
139 160

  
140
        user = self.create_user(User)
141 161
        real_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
142 162
        if user != real_user:
143 163
            self.logger.info('looked up user %s with name_id %s from issuer %s',
144 164
                             user, name_id, issuer)
145
            user.delete()
146
        else:
147
            try:
148
                self.finish_create_user(idp, saml_attributes, user)
149
            except UserCreationError:
165
            if provisionning:
150 166
                user.delete()
151
                return None
152
            self.logger.info('created new user %s with name_id %s from issuer %s',
153
                             user, name_id, issuer)
167
        else:
168
            if provisionning:
169
                try:
170
                    self.finish_create_user(idp, saml_attributes, user)
171
                except UserCreationError:
172
                    user.delete()
173
                    return None
174
                self.logger.info('created new user %s with name_id %s from issuer %s',
175
                                 user, name_id, issuer)
154 176
        return real_user
155 177

  
178
    def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
179
        if not isinstance(lookup_by_attributes, list):
180
            self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
181
            return None
182

  
183
        users = set()
184
        for line in lookup_by_attributes:
185
            if not isinstance(line, dict):
186
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
187
                continue
188
            user_field = line.get('user_field')
189
            if not hasattr(user_field, 'isalpha'):
190
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
191
                continue
192
            try:
193
                User._meta.get_field(user_field)
194
            except FieldDoesNotExist:
195
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
196
                                  line, user_field)
197
                continue
198
            saml_attribute = line.get('saml_attribute')
199
            if not hasattr(saml_attribute, 'isalpha'):
200
                self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
201
                continue
202
            values = saml_attributes.get(saml_attribute)
203
            if not values:
204
                self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
205
                                  saml_attribute, user_field)
206
                continue
207
            ignore_case = line.get('ignore-case', False)
208
            for value in values:
209
                key = user_field
210
                if ignore_case:
211
                    key += '__iexact'
212
                users_found = User.objects.filter(**{key: value})
213
                if not users_found:
214
                    self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
215
                                      saml_attribute, user_field, value)
216
                    continue
217
                self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
218
                                 saml_attribute, user_field, value, display_truncated_list(users_found))
219
                users.update(users_found)
220
        if len(users) == 1:
221
            user = list(users)[0]
222
            self.logger.info(u'looking for user by attributes %r: found user %s',
223
                             lookup_by_attributes, user)
224
            return user
225
        elif len(users) > 1:
226
            self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
227
                                lookup_by_attributes, len(users))
228
        return None
229

  
156 230
    def _link_user(self, idp, saml_attributes, issuer, name_id, user):
157 231
        saml_id, created = models.UserSAMLIdentifier.objects.get_or_create(
158 232
            name_id=name_id, issuer=issuer, defaults={'user': user})
mellon/app_settings.py
40 40
        'ARTIFACT_RESOLVE_TIMEOUT': 10.0,
41 41
        'LOGIN_HINTS': [],
42 42
        'SIGNATURE_METHOD': 'RSA-SHA256',
43
        'LOOKUP_BY_ATTRIBUTES': [],
43 44
    }
44 45

  
45 46
    @property
tests/test_default_adapter.py
11 11

  
12 12
pytestmark = pytest.mark.django_db
13 13

  
14
idp = {
15
    'METADATA': open('tests/metadata.xml').read(),
16
}
17
saml_attributes = {
18
    'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
19
    'name_id_content': 'x' * 32,
20
    'issuer': 'http://idp5/metadata',
21
    'username': ['foobar'],
22
    'email': ['test@example.net'],
23
    'first_name': ['Foo'],
24
    'last_name': ['Bar'],
25
    'is_superuser': ['true'],
26
    'group': ['GroupA', 'GroupB', 'GroupC'],
27
}
28

  
29

  
30
def test_format_username(settings):
14
User = auth.get_user_model()
15

  
16

  
17
@pytest.fixture
18
def idp():
19
    return {
20
        'METADATA': open('tests/metadata.xml').read(),
21
    }
22

  
23

  
24
@pytest.fixture
25
def saml_attributes():
26
    return {
27
        'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
28
        'name_id_content': 'x' * 32,
29
        'issuer': 'http://idp5/metadata',
30
        'username': ['foobar'],
31
        'email': ['test@example.net'],
32
        'first_name': ['Foo'],
33
        'last_name': ['Bar'],
34
        'is_superuser': ['true'],
35
        'group': ['GroupA', 'GroupB', 'GroupC'],
36
    }
37

  
38

  
39
@pytest.fixture
40
def john(db):
41
    return User.objects.create(username='john.doe', email='john.doe@example.com')
42

  
43

  
44
@pytest.fixture
45
def jane(db):
46
    return User.objects.create(username='jane.doe', email='john.doe@example.com')
47

  
48

  
49
def test_format_username(settings, idp, saml_attributes):
31 50
    adapter = DefaultAdapter()
32 51
    assert adapter.format_username(idp, {}) is None
33 52
    assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30]
......
37 56
    assert adapter.format_username(idp, saml_attributes) == 'foobar'
38 57

  
39 58

  
40
def test_lookup_user(settings):
41
    User = auth.get_user_model()
59
def test_lookup_user(settings, idp, saml_attributes):
42 60
    adapter = DefaultAdapter()
43 61
    user = adapter.lookup_user(idp, saml_attributes)
44 62
    assert user is not None
......
55 73
    assert User.objects.count() == 0
56 74

  
57 75

  
58
def test_lookup_user_transaction(transactional_db, concurrency):
76
def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes):
59 77
    adapter = DefaultAdapter()
60 78
    p = ThreadPool(concurrency)
61 79

  
......
81 99
    assert len(set(user.pk for user in users)) == 1
82 100

  
83 101

  
84
def test_provision_user_attributes(settings, django_user_model, caplog):
102
def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
85 103
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
86 104
    settings.MELLON_ATTRIBUTE_MAPPING = {
87 105
        'email': u'{attributes[email][0]}',
......
102 120
    assert 'set field email' in caplog.text
103 121

  
104 122

  
105
def test_provision_user_groups(settings, django_user_model, caplog):
123
def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
106 124
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
107 125
    settings.MELLON_GROUP_ATTRIBUTE = 'group'
108 126
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
......
118 136
    user = SAMLBackend().authenticate(saml_attributes=saml_attributes2)
119 137
    assert user.groups.count() == 2
120 138
    assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group'])
121
    assert len(caplog.records) == 5
139
    assert len(caplog.records) == 6
122 140
    assert 'removing group GroupA' in caplog.records[-1].message
123 141

  
124 142

  
125
def test_provision_is_superuser(settings, django_user_model, caplog):
143
def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
126 144
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
127 145
    settings.MELLON_SUPERUSER_MAPPING = {
128 146
        'is_superuser': 'true',
......
137 155
    assert not 'flag is_staff and is_superuser removed' in caplog.text
138 156

  
139 157

  
140
def test_provision_absent_attribute(settings, django_user_model, caplog):
158
def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
141 159
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
142 160
    settings.MELLON_ATTRIBUTE_MAPPING = {
143 161
        'email': '{attributes[email][0]}',
......
155 173
    assert 'set field last_name' in caplog.text
156 174

  
157 175

  
158
def test_provision_long_attribute(settings, django_user_model, caplog):
176
def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
159 177
    settings.MELLON_IDENTITY_PROVIDERS = [idp]
160 178
    settings.MELLON_ATTRIBUTE_MAPPING = {
161 179
        'email': '{attributes[email][0]}',
......
174 192
    assert 'set field email' in caplog.text
175 193

  
176 194

  
177
def test_lookup_user_transient_with_email(private_settings):
195
def test_lookup_user_transient_with_email(private_settings, idp, saml_attributes):
178 196
    private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email'
179
    User = auth.get_user_model()
180 197
    adapter = DefaultAdapter()
181
    saml_attributes2 = saml_attributes.copy()
182
    saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
198
    saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT
183 199
    assert User.objects.count() == 0
184
    user = adapter.lookup_user(idp, saml_attributes2)
200
    user = adapter.lookup_user(idp, saml_attributes)
185 201
    assert user is not None
186 202
    assert user.saml_identifiers.count() == 1
187
    assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
203
    assert user.saml_identifiers.first().name_id == saml_attributes['email'][0]
188 204

  
189
    user2 = adapter.lookup_user(idp, saml_attributes2)
205
    user2 = adapter.lookup_user(idp, saml_attributes)
190 206
    assert user.id == user2.id
191 207

  
192 208
    User.objects.all().delete()
......
196 212
    user = adapter.lookup_user(idp, saml_attributes)
197 213
    assert user is None
198 214
    assert User.objects.count() == 0
215

  
216

  
217
def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog):
218
    settings.MELLON_PROVISION = False
219

  
220
    adapter = DefaultAdapter()
221
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin'
222
    assert adapter.lookup_user(idp, saml_attributes) is None
223
    assert caplog.records[-1].message.endswith('it must be a list')
224

  
225

  
226
def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog):
227
    settings.MELLON_PROVISION = False
228

  
229
    adapter = DefaultAdapter()
230
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin']
231
    assert adapter.lookup_user(idp, saml_attributes) is None
232
    assert caplog.records[-1].message.endswith('it must be a list of dicts')
233

  
234

  
235
def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog):
236
    settings.MELLON_PROVISION = False
237

  
238
    adapter = DefaultAdapter()
239
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}]
240
    assert adapter.lookup_user(idp, saml_attributes) is None
241
    assert caplog.records[-1].message.endswith('user_field is missing')
242

  
243

  
244
def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog):
245
    settings.MELLON_PROVISION = False
246

  
247
    adapter = DefaultAdapter()
248
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}]
249
    assert adapter.lookup_user(idp, saml_attributes) is None
250
    assert caplog.records[-1].message.endswith('saml_attribute is missing')
251

  
252

  
253
def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog):
254
    settings.MELLON_PROVISION = False
255

  
256
    adapter = DefaultAdapter()
257
    caplog.set_level('DEBUG')
258
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}]
259
    saml_attributes['saml_at1'] = ['john.doe']
260
    assert adapter.lookup_user(idp, saml_attributes) is None
261
    assert caplog.records[-2].message.endswith(': not found')
262

  
263

  
264
def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog):
265
    settings.MELLON_PROVISION = False
266

  
267
    adapter = DefaultAdapter()
268
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}]
269
    saml_attributes['saml_at1'] = ['john.doe@example.com']
270
    assert adapter.lookup_user(idp, saml_attributes) is None
271
    assert 'too many users found(2)' in caplog.records[-1].message
272

  
273

  
274
def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog):
275
    settings.MELLON_PROVISION = False
276

  
277
    adapter = DefaultAdapter()
278
    saml_attributes['saml_at1'] = ['john.doe']
279
    saml_attributes['saml_at2'] = ['jane.doe']
280

  
281
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
282
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
283
        {'user_field': 'username', 'saml_attribute': 'saml_at2'},
284
    ]
285
    assert adapter.lookup_user(idp, saml_attributes) is None
286
    assert 'too many users found(2)' in caplog.records[-1].message
287

  
288

  
289
def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog):
290
    settings.MELLON_PROVISION = False
291

  
292
    adapter = DefaultAdapter()
293
    saml_attributes['saml_at1'] = ['john.doe']
294
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
295
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
296
    ]
297
    assert adapter.lookup_user(idp, saml_attributes) == john
298

  
299

  
300
def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog):
301
    settings.MELLON_PROVISION = False
302

  
303
    adapter = DefaultAdapter()
304
    saml_attributes['saml_at1'] = ['Jane.Doe']
305
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
306
        {'user_field': 'username', 'saml_attribute': 'saml_at1'},
307
    ]
308
    assert adapter.lookup_user(idp, saml_attributes) is None
309

  
310
    settings.MELLON_LOOKUP_BY_ATTRIBUTES = [
311
        {'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
312
    ]
313
    assert adapter.lookup_user(idp, saml_attributes) == jane
199
-