0003-add-user-lookup-by-attributes-33739.patch
README | ||
---|---|---|
261 | 261 |
Should be post or artifact. Default is post. You can refer to the SAML 2.0 |
262 | 262 |
specification to learn the difference. |
263 | 263 | |
264 |
MELLON_LOOKUP_BY_ATTRIBUTES |
|
265 |
--------------------------- |
|
266 | ||
267 |
Allow looking for user with some SAML attributes if the received NameID is |
|
268 |
still unknown. It must be a list of dictionnaries with two mandatory keys |
|
269 |
`user_field` and `saml_attribute`. The optionnal key `ignore-case` should be a |
|
270 |
boolean indicating if the match is case-insensitive (default is to respect the |
|
271 |
case). |
|
272 | ||
273 |
Each dictionnary is a rule for linking, applying all the rules should only |
|
274 |
return one user, the boolean operator OR is applied between the rules. |
|
275 | ||
276 |
So for example if you received a SAML attribute named `email` and you want to |
|
277 |
link user with the same email you would configured it like that: |
|
278 | ||
279 |
MELLON_LOOKUP_BY_ATTRIBUTES = [ |
|
280 |
{ |
|
281 |
'saml_attribute': 'email', |
|
282 |
'user_field': 'email', |
|
283 |
} |
|
284 |
] |
|
285 | ||
286 |
The targeted user(s) field(s) should be as much as possible unique |
|
287 |
individually, if not django-mellon will refuse to link multiple users matching |
|
288 |
the rules. |
|
289 | ||
264 | 290 |
Tests |
265 | 291 |
===== |
266 | 292 |
mellon/adapters.py | ||
---|---|---|
6 | 6 |
import requests |
7 | 7 |
import requests.exceptions |
8 | 8 | |
9 |
from django.core.exceptions import PermissionDenied |
|
9 |
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
|
|
10 | 10 |
from django.contrib import auth |
11 | 11 |
from django.contrib.auth.models import Group |
12 | 12 |
from django.utils import six |
... | ... | |
21 | 21 |
pass |
22 | 22 | |
23 | 23 | |
24 |
def display_truncated_list(l, max_length=10): |
|
25 |
s = '[' + ', '.join(map(six.text_type, l)) |
|
26 |
if len(l) > max_length: |
|
27 |
s += '..truncated more than %d items (%d)]' % (max_length, len(l)) |
|
28 |
else: |
|
29 |
s += ']' |
|
30 |
return s |
|
31 | ||
32 | ||
24 | 33 |
class DefaultAdapter(object): |
25 | 34 |
def __init__(self, *args, **kwargs): |
26 | 35 |
self.logger = logging.getLogger(__name__) |
... | ... | |
128 | 137 |
name_id = saml_attributes['name_id_content'] |
129 | 138 |
issuer = saml_attributes['issuer'] |
130 | 139 |
try: |
131 |
return User.objects.get(saml_identifiers__name_id=name_id,
|
|
140 |
user = User.objects.get(saml_identifiers__name_id=name_id,
|
|
132 | 141 |
saml_identifiers__issuer=issuer) |
142 |
self.logger.info('looked up user %s with name_id %s from issuer %s', |
|
143 |
user, name_id, issuer) |
|
144 |
return user |
|
133 | 145 |
except User.DoesNotExist: |
134 | 146 |
pass |
135 | 147 | |
136 |
if not utils.get_setting(idp, 'PROVISION'): |
|
137 |
self.logger.warning('provisionning disabled, login refused') |
|
138 |
return None |
|
148 |
user = None |
|
149 |
lookup_by_attributes = utils.get_setting(idp, 'LOOKUP_BY_ATTRIBUTES') |
|
150 |
if lookup_by_attributes: |
|
151 |
user = self._lookup_by_attributes(idp, saml_attributes, lookup_by_attributes) |
|
152 | ||
153 |
created = False |
|
154 |
if not user: |
|
155 |
if not utils.get_setting(idp, 'PROVISION'): |
|
156 |
self.logger.debug('provisionning disabled, login refused') |
|
157 |
return None |
|
158 |
created = True |
|
159 |
user = self.create_user(User) |
|
139 | 160 | |
140 |
user = self.create_user(User) |
|
141 | 161 |
nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user) |
142 | 162 |
if user != nameid_user: |
143 | 163 |
self.logger.info('looked up user %s with name_id %s from issuer %s', |
144 | 164 |
nameid_user, name_id, issuer) |
145 |
user.delete() |
|
165 |
if created: |
|
166 |
user.delete() |
|
146 | 167 |
return nameid_user |
147 | 168 | |
148 |
try: |
|
149 |
self.finish_create_user(idp, saml_attributes, nameid_user) |
|
150 |
except UserCreationError: |
|
151 |
nameid_user.delete() |
|
152 |
return None |
|
153 |
self.logger.info('created new user %s with name_id %s from issuer %s', |
|
154 |
nameid_user, name_id, issuer) |
|
169 |
if created: |
|
170 |
try: |
|
171 |
self.finish_create_user(idp, saml_attributes, nameid_user) |
|
172 |
except UserCreationError: |
|
173 |
user.delete() |
|
174 |
return None |
|
175 |
self.logger.info('created new user %s with name_id %s from issuer %s', |
|
176 |
nameid_user, name_id, issuer) |
|
155 | 177 |
return nameid_user |
156 | 178 | |
179 |
def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes): |
|
180 |
if not isinstance(lookup_by_attributes, list): |
|
181 |
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes) |
|
182 |
return None |
|
183 | ||
184 |
users = set() |
|
185 |
for line in lookup_by_attributes: |
|
186 |
if not isinstance(line, dict): |
|
187 |
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line) |
|
188 |
continue |
|
189 |
user_field = line.get('user_field') |
|
190 |
if not hasattr(user_field, 'isalpha'): |
|
191 |
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line) |
|
192 |
continue |
|
193 |
try: |
|
194 |
User._meta.get_field(user_field) |
|
195 |
except FieldDoesNotExist: |
|
196 |
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist', |
|
197 |
line, user_field) |
|
198 |
continue |
|
199 |
saml_attribute = line.get('saml_attribute') |
|
200 |
if not hasattr(saml_attribute, 'isalpha'): |
|
201 |
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line) |
|
202 |
continue |
|
203 |
values = saml_attributes.get(saml_attribute) |
|
204 |
if not values: |
|
205 |
self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty', |
|
206 |
saml_attribute, user_field) |
|
207 |
continue |
|
208 |
ignore_case = line.get('ignore-case', False) |
|
209 |
for value in values: |
|
210 |
key = user_field |
|
211 |
if ignore_case: |
|
212 |
key += '__iexact' |
|
213 |
users_found = User.objects.filter(saml_identifiers__isnull=True, **{key: value}) |
|
214 |
if not users_found: |
|
215 |
self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found', |
|
216 |
saml_attribute, user_field, value) |
|
217 |
continue |
|
218 |
self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s', |
|
219 |
saml_attribute, user_field, value, display_truncated_list(users_found)) |
|
220 |
users.update(users_found) |
|
221 |
if len(users) == 1: |
|
222 |
user = list(users)[0] |
|
223 |
self.logger.info(u'looking for user by attributes %r: found user %s', |
|
224 |
lookup_by_attributes, user) |
|
225 |
return user |
|
226 |
elif len(users) > 1: |
|
227 |
self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing', |
|
228 |
lookup_by_attributes, len(users)) |
|
229 |
return None |
|
230 | ||
157 | 231 |
def _link_user(self, idp, saml_attributes, issuer, name_id, user): |
158 | 232 |
saml_id, created = models.UserSAMLIdentifier.objects.get_or_create( |
159 | 233 |
name_id=name_id, issuer=issuer, defaults={'user': user}) |
mellon/app_settings.py | ||
---|---|---|
40 | 40 |
'ARTIFACT_RESOLVE_TIMEOUT': 10.0, |
41 | 41 |
'LOGIN_HINTS': [], |
42 | 42 |
'SIGNATURE_METHOD': 'RSA-SHA256', |
43 |
'LOOKUP_BY_ATTRIBUTES': [], |
|
43 | 44 |
} |
44 | 45 | |
45 | 46 |
@property |
tests/test_default_adapter.py | ||
---|---|---|
11 | 11 | |
12 | 12 |
pytestmark = pytest.mark.django_db |
13 | 13 | |
14 |
idp = { |
|
15 |
'METADATA': open('tests/metadata.xml').read(), |
|
16 |
} |
|
17 |
saml_attributes = { |
|
18 |
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT, |
|
19 |
'name_id_content': 'x' * 32, |
|
20 |
'issuer': 'http://idp5/metadata', |
|
21 |
'username': ['foobar'], |
|
22 |
'email': ['test@example.net'], |
|
23 |
'first_name': ['Foo'], |
|
24 |
'last_name': ['Bar'], |
|
25 |
'is_superuser': ['true'], |
|
26 |
'group': ['GroupA', 'GroupB', 'GroupC'], |
|
27 |
} |
|
28 | ||
29 | ||
30 |
def test_format_username(settings): |
|
14 |
User = auth.get_user_model() |
|
15 | ||
16 | ||
17 |
@pytest.fixture |
|
18 |
def idp(): |
|
19 |
return { |
|
20 |
'METADATA': open('tests/metadata.xml').read(), |
|
21 |
} |
|
22 | ||
23 | ||
24 |
@pytest.fixture |
|
25 |
def saml_attributes(): |
|
26 |
return { |
|
27 |
'name_id_format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT, |
|
28 |
'name_id_content': 'x' * 32, |
|
29 |
'issuer': 'http://idp5/metadata', |
|
30 |
'username': ['foobar'], |
|
31 |
'email': ['test@example.net'], |
|
32 |
'first_name': ['Foo'], |
|
33 |
'last_name': ['Bar'], |
|
34 |
'is_superuser': ['true'], |
|
35 |
'group': ['GroupA', 'GroupB', 'GroupC'], |
|
36 |
} |
|
37 | ||
38 | ||
39 |
@pytest.fixture |
|
40 |
def john(db): |
|
41 |
return User.objects.create(username='john.doe', email='john.doe@example.com') |
|
42 | ||
43 | ||
44 |
@pytest.fixture |
|
45 |
def jane(db): |
|
46 |
return User.objects.create(username='jane.doe', email='john.doe@example.com') |
|
47 | ||
48 | ||
49 |
def test_format_username(settings, idp, saml_attributes): |
|
31 | 50 |
adapter = DefaultAdapter() |
32 | 51 |
assert adapter.format_username(idp, {}) is None |
33 | 52 |
assert adapter.format_username(idp, saml_attributes) == ('x' * 32 + '@saml')[:30] |
... | ... | |
37 | 56 |
assert adapter.format_username(idp, saml_attributes) == 'foobar' |
38 | 57 | |
39 | 58 | |
40 |
def test_lookup_user(settings): |
|
41 |
User = auth.get_user_model() |
|
59 |
def test_lookup_user(settings, idp, saml_attributes): |
|
42 | 60 |
adapter = DefaultAdapter() |
43 | 61 |
user = adapter.lookup_user(idp, saml_attributes) |
44 | 62 |
assert user is not None |
... | ... | |
55 | 73 |
assert User.objects.count() == 0 |
56 | 74 | |
57 | 75 | |
58 |
def test_lookup_user_transaction(transactional_db, concurrency): |
|
76 |
def test_lookup_user_transaction(transactional_db, concurrency, idp, saml_attributes):
|
|
59 | 77 |
adapter = DefaultAdapter() |
60 | 78 |
p = ThreadPool(concurrency) |
61 | 79 | |
... | ... | |
81 | 99 |
assert len(set(user.pk for user in users)) == 1 |
82 | 100 | |
83 | 101 | |
84 |
def test_provision_user_attributes(settings, django_user_model, caplog): |
|
102 |
def test_provision_user_attributes(settings, django_user_model, idp, saml_attributes, caplog):
|
|
85 | 103 |
settings.MELLON_IDENTITY_PROVIDERS = [idp] |
86 | 104 |
settings.MELLON_ATTRIBUTE_MAPPING = { |
87 | 105 |
'email': u'{attributes[email][0]}', |
... | ... | |
102 | 120 |
assert 'set field email' in caplog.text |
103 | 121 | |
104 | 122 | |
105 |
def test_provision_user_groups(settings, django_user_model, caplog): |
|
123 |
def test_provision_user_groups(settings, django_user_model, idp, saml_attributes, caplog):
|
|
106 | 124 |
settings.MELLON_IDENTITY_PROVIDERS = [idp] |
107 | 125 |
settings.MELLON_GROUP_ATTRIBUTE = 'group' |
108 | 126 |
user = SAMLBackend().authenticate(saml_attributes=saml_attributes) |
... | ... | |
118 | 136 |
user = SAMLBackend().authenticate(saml_attributes=saml_attributes2) |
119 | 137 |
assert user.groups.count() == 2 |
120 | 138 |
assert set(user.groups.values_list('name', flat=True)) == set(saml_attributes2['group']) |
121 |
assert len(caplog.records) == 5
|
|
139 |
assert len(caplog.records) == 6
|
|
122 | 140 |
assert 'removing group GroupA' in caplog.records[-1].message |
123 | 141 | |
124 | 142 | |
125 |
def test_provision_is_superuser(settings, django_user_model, caplog): |
|
143 |
def test_provision_is_superuser(settings, django_user_model, idp, saml_attributes, caplog):
|
|
126 | 144 |
settings.MELLON_IDENTITY_PROVIDERS = [idp] |
127 | 145 |
settings.MELLON_SUPERUSER_MAPPING = { |
128 | 146 |
'is_superuser': 'true', |
... | ... | |
137 | 155 |
assert not 'flag is_staff and is_superuser removed' in caplog.text |
138 | 156 | |
139 | 157 | |
140 |
def test_provision_absent_attribute(settings, django_user_model, caplog): |
|
158 |
def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
|
|
141 | 159 |
settings.MELLON_IDENTITY_PROVIDERS = [idp] |
142 | 160 |
settings.MELLON_ATTRIBUTE_MAPPING = { |
143 | 161 |
'email': '{attributes[email][0]}', |
... | ... | |
155 | 173 |
assert 'set field last_name' in caplog.text |
156 | 174 | |
157 | 175 | |
158 |
def test_provision_long_attribute(settings, django_user_model, caplog): |
|
176 |
def test_provision_long_attribute(settings, django_user_model, idp, saml_attributes, caplog):
|
|
159 | 177 |
settings.MELLON_IDENTITY_PROVIDERS = [idp] |
160 | 178 |
settings.MELLON_ATTRIBUTE_MAPPING = { |
161 | 179 |
'email': '{attributes[email][0]}', |
... | ... | |
174 | 192 |
assert 'set field email' in caplog.text |
175 | 193 | |
176 | 194 | |
177 |
def test_lookup_user_transient_with_email(private_settings): |
|
195 |
def test_lookup_user_transient_with_email(private_settings, idp, saml_attributes):
|
|
178 | 196 |
private_settings.MELLON_TRANSIENT_FEDERATION_ATTRIBUTE = 'email' |
179 |
User = auth.get_user_model() |
|
180 | 197 |
adapter = DefaultAdapter() |
181 |
saml_attributes2 = saml_attributes.copy() |
|
182 |
saml_attributes2['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT |
|
198 |
saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_TRANSIENT |
|
183 | 199 |
assert User.objects.count() == 0 |
184 |
user = adapter.lookup_user(idp, saml_attributes2)
|
|
200 |
user = adapter.lookup_user(idp, saml_attributes) |
|
185 | 201 |
assert user is not None |
186 | 202 |
assert user.saml_identifiers.count() == 1 |
187 |
assert user.saml_identifiers.first().name_id == saml_attributes2['email'][0]
|
|
203 |
assert user.saml_identifiers.first().name_id == saml_attributes['email'][0] |
|
188 | 204 | |
189 |
user2 = adapter.lookup_user(idp, saml_attributes2)
|
|
205 |
user2 = adapter.lookup_user(idp, saml_attributes) |
|
190 | 206 |
assert user.id == user2.id |
191 | 207 | |
192 | 208 |
User.objects.all().delete() |
... | ... | |
196 | 212 |
user = adapter.lookup_user(idp, saml_attributes) |
197 | 213 |
assert user is None |
198 | 214 |
assert User.objects.count() == 0 |
215 | ||
216 | ||
217 |
def test_lookup_user_by_attributes_bad_setting1(settings, idp, saml_attributes, caplog): |
|
218 |
settings.MELLON_PROVISION = False |
|
219 | ||
220 |
adapter = DefaultAdapter() |
|
221 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = 'coin' |
|
222 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
223 |
assert caplog.records[-1].message.endswith('it must be a list') |
|
224 | ||
225 | ||
226 |
def test_lookup_user_by_attributes_bad_setting2(settings, idp, saml_attributes, caplog): |
|
227 |
settings.MELLON_PROVISION = False |
|
228 | ||
229 |
adapter = DefaultAdapter() |
|
230 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = ['coin'] |
|
231 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
232 |
assert caplog.records[-1].message.endswith('it must be a list of dicts') |
|
233 | ||
234 | ||
235 |
def test_lookup_user_by_attributes_bad_setting3(settings, idp, saml_attributes, caplog): |
|
236 |
settings.MELLON_PROVISION = False |
|
237 | ||
238 |
adapter = DefaultAdapter() |
|
239 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{}] |
|
240 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
241 |
assert caplog.records[-1].message.endswith('user_field is missing') |
|
242 | ||
243 | ||
244 |
def test_lookup_user_by_attributes_bad_setting4(settings, idp, saml_attributes, caplog): |
|
245 |
settings.MELLON_PROVISION = False |
|
246 | ||
247 |
adapter = DefaultAdapter() |
|
248 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username'}] |
|
249 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
250 |
assert caplog.records[-1].message.endswith('saml_attribute is missing') |
|
251 | ||
252 | ||
253 |
def test_lookup_user_by_attributes_not_found(settings, idp, saml_attributes, caplog): |
|
254 |
settings.MELLON_PROVISION = False |
|
255 | ||
256 |
adapter = DefaultAdapter() |
|
257 |
caplog.set_level('DEBUG') |
|
258 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'username', 'saml_attribute': 'saml_at1'}] |
|
259 |
saml_attributes['saml_at1'] = ['john.doe'] |
|
260 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
261 |
assert caplog.records[-2].message.endswith(': not found') |
|
262 | ||
263 | ||
264 |
def test_lookup_user_by_attributes_too_many1(settings, idp, saml_attributes, john, jane, caplog): |
|
265 |
settings.MELLON_PROVISION = False |
|
266 | ||
267 |
adapter = DefaultAdapter() |
|
268 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [{'user_field': 'email', 'saml_attribute': 'saml_at1'}] |
|
269 |
saml_attributes['saml_at1'] = ['john.doe@example.com'] |
|
270 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
271 |
assert 'too many users found(2)' in caplog.records[-1].message |
|
272 | ||
273 | ||
274 |
def test_lookup_user_by_attributes_too_manyi2(settings, idp, saml_attributes, john, jane, caplog): |
|
275 |
settings.MELLON_PROVISION = False |
|
276 | ||
277 |
adapter = DefaultAdapter() |
|
278 |
saml_attributes['saml_at1'] = ['john.doe'] |
|
279 |
saml_attributes['saml_at2'] = ['jane.doe'] |
|
280 | ||
281 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [ |
|
282 |
{'user_field': 'username', 'saml_attribute': 'saml_at1'}, |
|
283 |
{'user_field': 'username', 'saml_attribute': 'saml_at2'}, |
|
284 |
] |
|
285 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
286 |
assert 'too many users found(2)' in caplog.records[-1].message |
|
287 | ||
288 | ||
289 |
def test_lookup_user_by_attributes_found(settings, idp, saml_attributes, john, jane, caplog): |
|
290 |
settings.MELLON_PROVISION = False |
|
291 | ||
292 |
adapter = DefaultAdapter() |
|
293 |
saml_attributes['saml_at1'] = ['john.doe'] |
|
294 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [ |
|
295 |
{'user_field': 'username', 'saml_attribute': 'saml_at1'}, |
|
296 |
] |
|
297 |
assert adapter.lookup_user(idp, saml_attributes) == john |
|
298 | ||
299 | ||
300 |
def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, john, jane, caplog): |
|
301 |
settings.MELLON_PROVISION = False |
|
302 | ||
303 |
adapter = DefaultAdapter() |
|
304 |
saml_attributes['saml_at1'] = ['Jane.Doe'] |
|
305 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [ |
|
306 |
{'user_field': 'username', 'saml_attribute': 'saml_at1'}, |
|
307 |
] |
|
308 |
assert adapter.lookup_user(idp, saml_attributes) is None |
|
309 | ||
310 |
settings.MELLON_LOOKUP_BY_ATTRIBUTES = [ |
|
311 |
{'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True}, |
|
312 |
] |
|
313 |
assert adapter.lookup_user(idp, saml_attributes) == jane |
|
199 |
- |