From e1a538889f69bbb1c7cd363b7f29fbe7b8ba04b6 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Sun, 8 May 2022 21:21:59 +0200 Subject: [PATCH 5/9] utils: add NaturalKeyRelatedField class (#62013) --- src/authentic2/utils/api.py | 79 +++++++++++++++++++++ tests/test_utils_api.py | 134 ++++++++++++++++++++++++++++++++++++ 2 files changed, 213 insertions(+) create mode 100644 src/authentic2/utils/api.py create mode 100644 tests/test_utils_api.py diff --git a/src/authentic2/utils/api.py b/src/authentic2/utils/api.py new file mode 100644 index 00000000..9506613f --- /dev/null +++ b/src/authentic2/utils/api.py @@ -0,0 +1,79 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2022 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +from django.db import models +from rest_framework import exceptions, serializers + + +class NaturalKeyRelatedField(serializers.RelatedField): + def to_representation(self, value): + if value is None: + return None + return self._instance_to_natural_key(value) + + def to_internal_value(self, data): + if data is None: + return None + return self._natural_key_to_instance(self.get_queryset(), data) + + def _instance_to_natural_key(self, instance): + model = type(instance) + fields = set() + for natural_key_description in model._meta.natural_key: + for name in natural_key_description: + name = name.split('__')[0] + fields.add(name) + raw = {name: getattr(instance, name) for name in fields} + return { + name: self._instance_to_natural_key(value) if isinstance(value, models.Model) else value + for name, value in raw.items() + } + + def _natural_key_to_instance(self, queryset, data): + if data is None: + return data + + model = queryset.model + natural_keys = {} + for name, value in data.items(): + field = model._meta.get_field(name) + if field.related_model: + qs = field.related_model._base_manager + natural_keys[name] = self._natural_key_to_instance(qs, value) + else: + natural_keys[name] = value + for natural_key_description in model._meta.natural_key: + lookups = {} + for name in natural_key_description: + real_name = name.split('__')[0] + if real_name not in natural_keys: + break + value = natural_keys[real_name] + if name.endswith('__isnull'): + if value is not None: + break + lookups[name] = True + else: + lookups[name] = value + else: + try: + return queryset.get(**lookups) + except model.DoesNotExist: + pass + except model.MultipleObjectsReturned: + raise exceptions.ValidationError('multiple objects returned') + raise exceptions.ValidationError('object not found') diff --git a/tests/test_utils_api.py b/tests/test_utils_api.py new file mode 100644 index 00000000..70c259e3 --- /dev/null +++ b/tests/test_utils_api.py @@ -0,0 +1,134 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2022 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import pytest +from rest_framework.exceptions import ValidationError + +from authentic2.a2_rbac.models import OrganizationalUnit as OU +from authentic2.a2_rbac.models import Role +from authentic2.models import Service +from authentic2.utils.api import NaturalKeyRelatedField +from tests.utils import scoped_db_fixture + + +class TestNaturalKeyRelatedField: + @scoped_db_fixture(scope='class', autouse=True) + def fixture(self): + class Namespace: + ou = OU.objects.create(name='ou', uuid='1' * 32) + service = Service.objects.create(name='service', slug='service', ou=ou) + role = Role.objects.create(name='role', ou=ou, uuid='2' * 32) + ou2 = OU.objects.create(name='ou2', uuid='3' * 32) + + yield Namespace + + def test_to_representation(self, db, fixture): + assert NaturalKeyRelatedField(read_only=True).to_representation(fixture.role) == { + 'name': 'role', + 'ou': {'name': 'ou', 'slug': 'ou', 'uuid': '1' * 32}, + 'service': None, + 'slug': 'role', + 'uuid': '2' * 32, + } + + def test_to_representation_service(self, db, fixture): + fixture.role.service = fixture.service + fixture.role.save() + assert NaturalKeyRelatedField(read_only=True).to_representation(fixture.role) == { + 'name': 'role', + 'ou': {'name': 'ou', 'slug': 'ou', 'uuid': '1' * 32}, + 'service': { + 'slug': 'service', + 'ou': {'name': 'ou', 'slug': 'ou', 'uuid': '1' * 32}, + }, + 'slug': 'role', + 'uuid': '2' * 32, + } + + @pytest.mark.parametrize( + 'value', + [ + {'uuid': '2' * 32}, + {'name': 'role'}, + {'slug': 'role'}, + {'name': 'role', 'ou': {'name': 'ou'}}, + {'slug': 'role', 'ou': {'slug': 'ou'}}, + {'slug': 'role', 'ou': {'uuid': '1' * 32}}, + ], + ids=[ + 'by uuid', + 'by name', + 'by slug', + 'by name and ou by name', + 'by name and ou by slug', + 'by name and ou by uuid', + ], + ) + def test_to_internal_value_role(self, value, db, fixture): + assert NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role + + @pytest.mark.parametrize( + 'value', + [ + {'name': 'role'}, + {'slug': 'role'}, + ], + ids=['by name', 'by slug'], + ) + def test_to_internal_value_role_ambiguous(self, value, db, fixture): + Role.objects.create(slug='role', name='role', ou=fixture.ou2) + with pytest.raises(ValidationError, match='multiple'): + assert ( + NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role + ) + + @pytest.mark.parametrize( + 'value', + [ + {'name': 'role', 'ou': {'slug': 'ou'}}, + {'slug': 'role', 'ou': {'slug': 'ou'}}, + ], + ids=['by name and ou', 'by slug and ou'], + ) + def test_to_internal_value_role_unique(self, value, db, fixture): + Role.objects.create(slug='role', name='role', ou=fixture.ou2) + assert NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role + + @pytest.mark.parametrize( + 'value', + [ + {'uuid': '2' * 32}, + {'name': 'role'}, + {'slug': 'role'}, + {'name': 'role', 'ou': {'name': 'ou'}}, + {'slug': 'role', 'ou': {'slug': 'ou'}}, + {'slug': 'role', 'ou': {'uuid': '1' * 32}}, + ], + ids=[ + 'by uuid', + 'by name', + 'by slug', + 'by name and ou by name', + 'by name and ou by slug', + 'by name and ou by uuid', + ], + ) + def test_to_internal_value_role_not_found(self, value, db, fixture): + Role.objects.all().delete() + with pytest.raises(ValidationError, match='not found'): + assert ( + NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role + ) -- 2.35.1