From aa0c5c985b8f602a9ea39a22b5ef6d176394a45e Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Wed, 27 Jul 2022 16:21:40 +0200 Subject: [PATCH 1/7] auth_saml: migrate JSON fields to models (#67025) --- ..._samlattributelookup_setattributeaction.py | 136 +++++++++++++++++ .../migrations/0005_auto_20220727_1704.py | 140 ++++++++++++++++++ src/authentic2_auth_saml/models.py | 44 ++++++ tests/test_auth_saml.py | 93 ++++++++++++ 4 files changed, 413 insertions(+) create mode 100644 src/authentic2_auth_saml/migrations/0004_addroleaction_renameattributeaction_samlattributelookup_setattributeaction.py create mode 100644 src/authentic2_auth_saml/migrations/0005_auto_20220727_1704.py diff --git a/src/authentic2_auth_saml/migrations/0004_addroleaction_renameattributeaction_samlattributelookup_setattributeaction.py b/src/authentic2_auth_saml/migrations/0004_addroleaction_renameattributeaction_samlattributelookup_setattributeaction.py new file mode 100644 index 00000000..1914de9d --- /dev/null +++ b/src/authentic2_auth_saml/migrations/0004_addroleaction_renameattributeaction_samlattributelookup_setattributeaction.py @@ -0,0 +1,136 @@ +# Generated by Django 2.2.26 on 2022-08-11 13:16 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + +import authentic2.utils.evaluate + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.RBAC_ROLE_MODEL), + ('authentic2_auth_saml', '0003_auto_20220726_1713'), + ] + + operations = [ + migrations.CreateModel( + name='SetAttributeAction', + fields=[ + ( + 'id', + models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'), + ), + ('attribute', models.CharField(max_length=32, verbose_name='User attribute name')), + ('saml_attribute', models.CharField(max_length=128, verbose_name='SAML attribute name')), + ( + 'mandatory', + models.BooleanField( + default=False, help_text='Deny login if action fails.', verbose_name='Mandatory' + ), + ), + ( + 'authenticator', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name='set_attribute_actions', + to='authentic2_auth_saml.SAMLAuthenticator', + ), + ), + ], + options={ + 'verbose_name': 'Set an attribute', + 'default_related_name': 'set_attribute_actions', + }, + ), + migrations.CreateModel( + name='SAMLAttributeLookup', + fields=[ + ( + 'id', + models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'), + ), + ('user_field', models.CharField(max_length=32, verbose_name='User field')), + ('saml_attribute', models.CharField(max_length=128, verbose_name='SAML attribute')), + ('ignore_case', models.BooleanField(default=False, verbose_name='Ignore case')), + ( + 'authenticator', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name='attribute_lookups', + to='authentic2_auth_saml.SAMLAuthenticator', + ), + ), + ], + options={'verbose_name': 'Attribute lookup', 'default_related_name': 'attribute_lookups'}, + ), + migrations.CreateModel( + name='RenameAttributeAction', + fields=[ + ( + 'id', + models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'), + ), + ('from_name', models.CharField(max_length=128, verbose_name='From')), + ('to_name', models.CharField(max_length=32, verbose_name='To')), + ( + 'authenticator', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name='rename_attribute_actions', + to='authentic2_auth_saml.SAMLAuthenticator', + ), + ), + ], + options={ + 'verbose_name': 'Rename an attribute', + 'default_related_name': 'rename_attribute_actions', + }, + ), + migrations.CreateModel( + name='AddRoleAction', + fields=[ + ( + 'id', + models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'), + ), + ( + 'condition', + models.CharField( + blank=True, + max_length=256, + validators=[authentic2.utils.evaluate.condition_validator], + verbose_name='Condition', + ), + ), + ( + 'mandatory', + models.BooleanField( + default=False, help_text='Deny login if action fails.', verbose_name='Mandatory' + ), + ), + ( + 'authenticator', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name='add_role_actions', + to='authentic2_auth_saml.SAMLAuthenticator', + ), + ), + ( + 'role', + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name='add_role_actions', + to=settings.RBAC_ROLE_MODEL, + verbose_name='Role', + ), + ), + ], + options={ + 'verbose_name': 'Add a role', + 'default_related_name': 'add_role_actions', + }, + ), + ] diff --git a/src/authentic2_auth_saml/migrations/0005_auto_20220727_1704.py b/src/authentic2_auth_saml/migrations/0005_auto_20220727_1704.py new file mode 100644 index 00000000..d1d4397f --- /dev/null +++ b/src/authentic2_auth_saml/migrations/0005_auto_20220727_1704.py @@ -0,0 +1,140 @@ +# Generated by Django 2.2.26 on 2022-07-27 15:04 + +from django.core.exceptions import MultipleObjectsReturned +from django.db import migrations + + +def get_key(obj, name, max_length=None, default=''): + setting = obj.get(name) + + expected_type = type(default) + if not isinstance(setting, expected_type): + setting = None + + if setting is None: + setting = default + + return setting[:max_length] if max_length else setting + + +def get_ou(role_desc, ou_model): + ou_desc = role_desc.get('ou') + if ou_desc is None: + return None + if not isinstance(ou_desc, dict): + return + slug = ou_desc.get('slug') + name = ou_desc.get('name') + if slug: + if not isinstance(slug, str): + return + try: + return ou_model.objects.get(slug=slug) + except ou_model.DoesNotExist: + return + elif name: + if not isinstance(name, str): + return + try: + return ou_model.objects.get(name=name) + except ou_model.DoesNotExist: + pass + + +def get_role(mapping, role_model, ou_model): + role_desc = mapping.get('role') + if not role_desc or not isinstance(role_desc, dict): + return + slug = role_desc.get('slug') + name = role_desc.get('name') + ou = get_ou(role_desc, ou_model) + + kwargs = {} + if ou: + kwargs['ou'] = ou + + if slug: + if not isinstance(slug, str): + return + kwargs['slug'] = slug + elif name: + if not isinstance(name, str): + return + kwargs['name'] = name + else: + return + + try: + return role_model.objects.get(**kwargs) + except role_model.DoesNotExist: + pass + except MultipleObjectsReturned: + pass + + +def migrate_jsonfields(apps, schema_editor): + SAMLAuthenticator = apps.get_model('authentic2_auth_saml', 'SAMLAuthenticator') + SAMLAttributeLookup = apps.get_model('authentic2_auth_saml', 'SAMLAttributeLookup') + SetAttributeAction = apps.get_model('authentic2_auth_saml', 'SetAttributeAction') + AddRoleAction = apps.get_model('authentic2_auth_saml', 'AddRoleAction') + RenameAttributeAction = apps.get_model('authentic2_auth_saml', 'RenameAttributeAction') + Role = apps.get_model('a2_rbac', 'Role') + OU = apps.get_model('a2_rbac', 'OrganizationalUnit') + + for authenticator in SAMLAuthenticator.objects.all(): + for obj in authenticator.lookup_by_attributes: + saml_attribute = get_key(obj, 'saml_attribute', 128) + user_field = get_key(obj, 'user_field', 32) + if saml_attribute and user_field: + SAMLAttributeLookup.objects.create( + authenticator=authenticator, + saml_attribute=saml_attribute, + user_field=user_field, + ignore_case=get_key(obj, 'ignore-case', default=False), + ) + for obj in authenticator.a2_attribute_mapping: + action = obj.get('action') or '' + action = action.replace('_', '-') + if not action or action == 'set-attribute': + attribute = get_key(obj, 'attribute', 32) + saml_attribute = get_key(obj, 'saml_attribute', 128) + if attribute and saml_attribute: + SetAttributeAction.objects.create( + authenticator=authenticator, + attribute=attribute, + saml_attribute=saml_attribute, + mandatory=get_key(obj, 'mandatory', default=False), + ) + elif action == 'rename': + from_name = get_key(obj, 'from', 128) + to_name = get_key(obj, 'to', 32) + if from_name and to_name: + RenameAttributeAction.objects.create( + authenticator=authenticator, + from_name=from_name, + to_name=to_name, + ) + elif action in ('toggle-role', 'add-role'): + role = get_role(obj, Role, OU) + if role: + AddRoleAction.objects.create( + authenticator=authenticator, + role=role, + condition=get_key(obj, 'condition', 256), + mandatory=get_key(obj, 'mandatory', default=False), + ) + + +class Migration(migrations.Migration): + + dependencies = [ + ( + 'authentic2_auth_saml', + '0004_addroleaction_renameattributeaction_samlattributelookup_setattributeaction', + ), + ('a2_rbac', '0029_use_unique_constraints'), + ] + + operations = [ + migrations.RunPython(migrate_jsonfields, reverse_code=migrations.RunPython.noop), + ] diff --git a/src/authentic2_auth_saml/models.py b/src/authentic2_auth_saml/models.py index 9b95643f..85f1bf77 100644 --- a/src/authentic2_auth_saml/models.py +++ b/src/authentic2_auth_saml/models.py @@ -19,6 +19,7 @@ from django.core.exceptions import ValidationError from django.db import models from django.utils.translation import gettext_lazy as _ +from authentic2.a2_rbac.models import Role from authentic2.apps.authenticators.models import BaseAuthenticator from authentic2.utils.misc import redirect_to_login @@ -216,3 +217,46 @@ class SAMLAuthenticator(BaseAuthenticator): def profile(self, request, *args, **kwargs): return views.profile(request, *args, **kwargs) + + +class SAMLAttributeLookup(models.Model): + authenticator = models.ForeignKey( + SAMLAuthenticator, on_delete=models.CASCADE, related_name='attribute_lookups' + ) + user_field = models.CharField(_('User field'), max_length=32) + saml_attribute = models.CharField(_('SAML attribute'), max_length=128) + ignore_case = models.BooleanField(_('Ignore case'), default=False) + + class Meta: + verbose_name = _('Attribute lookup') + +class RenameAttributeAction(models.Model): + authenticator = models.ForeignKey(SAMLAuthenticator, on_delete=models.CASCADE) + from_name = models.CharField(_('From'), max_length=128) + to_name = models.CharField(_('To'), max_length=32) + + class Meta: + default_related_name = 'rename_attribute_actions' + verbose_name = _('Rename an attribute') + + +class SetAttributeAction(models.Model): + authenticator = models.ForeignKey(SAMLAuthenticator, on_delete=models.CASCADE) + attribute = models.CharField(_('User attribute name'), max_length=32) + saml_attribute = models.CharField(_('SAML attribute name'), max_length=128) + mandatory = models.BooleanField(_('Mandatory'), default=False, help_text=_('Deny login if action fails.')) + + class Meta: + default_related_name = 'set_attribute_actions' + verbose_name = _('Set an attribute') + + +class AddRoleAction(models.Model): + authenticator = models.ForeignKey(SAMLAuthenticator, on_delete=models.CASCADE) + role = models.ForeignKey(Role, verbose_name=_('Role'), on_delete=models.CASCADE) + condition = models.CharField(_('Condition'), max_length=256, blank=True) + mandatory = models.BooleanField(_('Mandatory'), default=False, help_text=_('Deny login if action fails.')) + + class Meta: + default_related_name = 'add_role_actions' + verbose_name = _('Add a role') diff --git a/tests/test_auth_saml.py b/tests/test_auth_saml.py index c4e65c20..9af317fb 100644 --- a/tests/test_auth_saml.py +++ b/tests/test_auth_saml.py @@ -576,3 +576,96 @@ def test_saml_authenticator_data_migration_bad_settings(migration, settings): assert authenticator.error_redirect_after_timeout == 120 assert authenticator.authn_classref == '' assert authenticator.superuser_mapping == {} + + +def test_saml_authenticator_data_migration_json_fields(migration, settings): + migrate_from = [ + ( + 'authentic2_auth_saml', + '0004_addroleaction_renameattributeaction_samlattributelookup_setattributeaction', + ), + ('a2_rbac', '0029_use_unique_constraints'), + ] + migrate_to = [ + ('authentic2_auth_saml', '0005_auto_20220727_1704'), + ('a2_rbac', '0029_use_unique_constraints'), + ] + + old_apps = migration.before(migrate_from) + SAMLAuthenticator = old_apps.get_model('authentic2_auth_saml', 'SAMLAuthenticator') + Role = old_apps.get_model('a2_rbac', 'Role') + OU = old_apps.get_model('a2_rbac', 'OrganizationalUnit') + + ou = OU.objects.create(name='Test OU', slug='test-ou') + role = Role.objects.create(name='Test role', slug='test-role', ou=ou) + + SAMLAuthenticator.objects.create( + metadata='meta1.xml', + slug='idp1', + lookup_by_attributes=[ + {'saml_attribute': 'email', 'user_field': 'email'}, + {'saml_attribute': 'saml_name', 'user_field': 'first_name', 'ignore-case': True}, + ], + a2_attribute_mapping=[ + { + 'attribute': 'email', + 'saml_attribute': 'mail', + 'mandatory': True, + }, + {'action': 'rename', 'from': 'a' * 129, 'to': 'first_name'}, + { + 'attribute': 'first_name', + 'saml_attribute': 'first_name', + }, + { + 'attribute': 'invalid', + 'saml_attribute': '', + }, + { + 'attribute': 'invalid', + 'saml_attribute': None, + }, + { + 'attribute': 'invalid', + }, + { + 'action': 'add-role', + 'role': { + 'name': role.name, + 'ou': { + 'name': role.ou.name, + }, + }, + 'condition': "roles == 'A'", + }, + ], + ) + + new_apps = migration.apply(migrate_to) + SAMLAuthenticator = new_apps.get_model('authentic2_auth_saml', 'SAMLAuthenticator') + authenticator = SAMLAuthenticator.objects.get() + + attribute_lookup1, attribute_lookup2 = authenticator.attribute_lookups.all().order_by('pk') + assert attribute_lookup1.saml_attribute == 'email' + assert attribute_lookup1.user_field == 'email' + assert attribute_lookup1.ignore_case is False + assert attribute_lookup2.saml_attribute == 'saml_name' + assert attribute_lookup2.user_field == 'first_name' + assert attribute_lookup2.ignore_case is True + + set_attribute1, set_attribute2 = authenticator.set_attribute_actions.all().order_by('pk') + assert set_attribute1.attribute == 'email' + assert set_attribute1.saml_attribute == 'mail' + assert set_attribute1.mandatory is True + assert set_attribute2.attribute == 'first_name' + assert set_attribute2.saml_attribute == 'first_name' + assert set_attribute2.mandatory is False + + rename_attribute = authenticator.rename_attribute_actions.get() + assert rename_attribute.from_name == 'a' * 128 + assert rename_attribute.to_name == 'first_name' + + add_role = authenticator.add_role_actions.get() + assert add_role.role.pk == role.pk + assert add_role.condition == "roles == 'A'" + assert add_role.mandatory is False -- 2.30.2