From 7a8d7bc63de31d42c1ae7ae955c803d0e0d0ce01 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 13 Sep 2016 17:13:31 +0200 Subject: [PATCH] send provisionning messages after request treatment in a thread (fixes #9396) All objects to provision are collected into the Provisionning singleton object in thread local dictionnaries. When request processing is finished the ProvisionningMiddleware launch a thread which will send provisionning messages. --- debian/debian_config_common.py | 4 + hobo/agent/authentic2/apps.py | 208 +--------------------- hobo/agent/authentic2/middleware.py | 14 ++ hobo/agent/authentic2/provisionning.py | 303 +++++++++++++++++++++++++++++++++ hobo/multitenant/apps.py | 1 + hobo/multitenant/models.py | 4 +- tests_authentic/conftest.py | 7 +- tests_authentic/test_hobo_deploy.py | 2 +- tests_authentic/test_provisionning.py | 222 ++++++++++++++++++++---- tox.ini | 2 +- 10 files changed, 528 insertions(+), 239 deletions(-) create mode 100644 hobo/agent/authentic2/middleware.py create mode 100644 hobo/agent/authentic2/provisionning.py diff --git a/debian/debian_config_common.py b/debian/debian_config_common.py index 1486d26..7ade363 100644 --- a/debian/debian_config_common.py +++ b/debian/debian_config_common.py @@ -209,6 +209,10 @@ if 'authentic2' not in INSTALLED_APPS: MIDDLEWARE_CLASSES = MIDDLEWARE_CLASSES + ( 'mellon.middleware.PassiveAuthenticationMiddleware', ) +else: + MIDDLEWARE_CLASSES = MIDDLEWARE_CLASSES + ( + 'hobo.agent.authentic2.middleware.ProvisionningMiddleware', + ) TENANT_SETTINGS_LOADERS = ( 'hobo.multitenant.settings_loaders.TemplateVars', diff --git a/hobo/agent/authentic2/apps.py b/hobo/agent/authentic2/apps.py index b2f437d..deb0e71 100644 --- a/hobo/agent/authentic2/apps.py +++ b/hobo/agent/authentic2/apps.py @@ -14,191 +14,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import json -from urlparse import urljoin - from django.apps import AppConfig -from django.db.models.signals import post_save, post_delete, pre_delete, m2m_changed -from django.conf import settings -from django.contrib.auth import get_user_model -from django.db import connection -from django.core.urlresolvers import reverse - -from django_rbac.utils import get_role_model - -from hobo.agent.common import notify_agents -from authentic2.models import AttributeValue -from authentic2.saml.models import LibertyProvider -from authentic2.a2_rbac.models import OrganizationalUnit - - -def get_ou(role_or_through): - if hasattr(role_or_through, 'ou_id'): - return role_or_through.ou - else: - return role_or_through.role.ou - - -def get_audience(role_or_through): - ou = get_ou(role_or_through) - if ou: - qs = LibertyProvider.objects.filter(ou=ou) - else: - qs = LibertyProvider.objects.filter(ou__isnull=True) - return [(service, service.entity_id) for service in qs] - - -def get_related_roles(role_or_through): - ou = get_ou(role_or_through) - Role = get_role_model() - qs = Role.objects.filter(admin_scope_id__isnull=True) \ - .prefetch_related('attributes') - if ou: - qs = qs.filter(ou=ou) - else: - qs = qs.filter(ou__isnull=True) - for role in qs: - role.emails = [] - role.emails_to_members = False - role.details = u'' - for attribute in role.attributes.all(): - if (attribute.name in ('emails', 'emails_to_members', 'details') - and attribute.kind == 'json'): - setattr(role, attribute.name, json.loads(attribute.value)) - return qs - - -def notify_roles(sender, instance, **kwargs): - if not getattr(settings, 'HOBO_ROLE_EXPORT', True): - return - if instance.slug.startswith('_'): - return - try: - notify_agents({ - '@type': 'provision', - 'audience': [audience for service, audience in get_audience(instance)], - 'full': True, - 'objects': { - '@type': 'role', - 'data': [ - { - 'uuid': role.uuid, - 'name': role.name, - 'slug': role.slug, - 'description': role.description, - 'details': role.details, - 'emails': role.emails, - 'emails_to_members': role.emails_to_members, - } for role in get_related_roles(instance) - ], - } - }) - except OrganizationalUnit.DoesNotExist: - pass - - -def get_entity_id(): - tenant = getattr(connection, 'tenant', None) - assert tenant - base_url = tenant.get_base_url() - return urljoin(base_url, reverse('a2-idp-saml-metadata')) - - -def provision_user(sender, instance, **kwargs): - User = get_user_model() - if not isinstance(instance, User): - return - if not getattr(settings, 'HOBO_ROLE_EXPORT', True): - return - data = {} - for av in AttributeValue.objects.with_owner(instance): - data[str(av.attribute.name)] = av.to_python() - - roles = instance.roles_and_parents() \ - .prefetch_related('attributes') - is_superuser = instance.is_superuser - data.update({ - 'uuid': instance.uuid, - 'username': instance.username, - 'first_name': instance.first_name, - 'last_name': instance.last_name, - 'email': instance.email, - 'roles': [ - { - 'uuid': role.uuid, - 'name': role.name, - 'slug': role.slug, - } for role in roles], - }) - - for service, audience in get_audience(instance): - role_is_superuser = False - for role in roles: - if role.service_id != service.pk: - continue - for attribute in role.attributes.all(): - if attribute.name == 'is_superuser' and attribute.value == 'true': - role_is_superuser = True - data['is_superuser'] = is_superuser or role_is_superuser - notify_agents({ - '@type': 'provision', - 'issuer': unicode(get_entity_id()), - 'audience': [audience], - 'full': False, - 'objects': { - '@type': 'user', - 'data': [data], - } - }) - - -def deprovision_user(sender, instance, **kwargs): - User = get_user_model() - if not isinstance(instance, User): - return - if not getattr(settings, 'HOBO_ROLE_EXPORT', True): - return - notify_agents({ - '@type': 'deprovision', - 'issuer': unicode(get_entity_id()), - 'audience': [audience for service, audience in get_audience(instance)], - 'full': False, - 'objects': { - '@type': 'user', - 'data': [ - { - 'uuid': instance.uuid, - } - ], - } - }) - - -def provision_user_on_role_change(sender, action, instance, model, pk_set, - reverse, **kwargs): - if not action.startswith('post'): - return - if action.endswith('_clear'): - return - if reverse: - provision_user(sender, instance, **kwargs) - else: - for user in model.objects.filter(pk__in=pk_set): - provision_user(sender, user, **kwargs) - - -def provision_user_on_attribute_value_save(sender, instance, **kwargs): - User = get_user_model() - if not isinstance(instance.owner, User): - return - provision_user(User, instance.owner) - - -def provision_user_on_attribute_value_delete(sender, instance, **kwargs): - User = get_user_model() - if not isinstance(instance.owner, User): - return - provision_user(User, instance.owner) +from django.db.models.signals import pre_save, pre_delete, m2m_changed, post_save class Authentic2AgentConfig(AppConfig): @@ -207,18 +24,11 @@ class Authentic2AgentConfig(AppConfig): verbose_name = 'Authentic2 Agent' def ready(self): - Role = get_role_model() - post_save.connect(notify_roles, sender=Role) - post_delete.connect(notify_roles, sender=Role) - post_save.connect(notify_roles, Role) - post_delete.connect(notify_roles, Role) - post_save.connect(notify_roles, Role.members.through) - post_delete.connect(notify_roles, Role.members.through) - post_save.connect(provision_user) - pre_delete.connect(deprovision_user) - post_save.connect(provision_user_on_attribute_value_save, sender=AttributeValue) - post_delete.connect(provision_user_on_attribute_value_delete, sender=AttributeValue) - m2m_changed.connect(provision_user_on_role_change, - sender=Role.members.through) - settings.A2_MANAGER_ROLE_FORM_CLASS = \ - 'hobo.agent.authentic2.role_forms.RoleForm' + from . import provisionning + + engine = provisionning.Provisionning() + pre_save.connect(engine.pre_save) + post_save.connect(engine.post_save) + pre_delete.connect(engine.pre_delete) + m2m_changed.connect(engine.m2m_changed) + provisionning.provisionning = engine diff --git a/hobo/agent/authentic2/middleware.py b/hobo/agent/authentic2/middleware.py new file mode 100644 index 0000000..c8701f3 --- /dev/null +++ b/hobo/agent/authentic2/middleware.py @@ -0,0 +1,14 @@ +from .provisionning import provisionning + + +class ProvisionningMiddleware(object): + def process_request(self, request): + provisionning.start() + + def process_exception(self, request, exception): + provisionning.clean() + + def process_response(self, request, response): + provisionning.provision() + return response + diff --git a/hobo/agent/authentic2/provisionning.py b/hobo/agent/authentic2/provisionning.py new file mode 100644 index 0000000..fa588c3 --- /dev/null +++ b/hobo/agent/authentic2/provisionning.py @@ -0,0 +1,303 @@ +import json +from urlparse import urljoin +import threading +import copy +import logging + +from django.contrib.auth import get_user_model +from django.db import connection +from django.core.urlresolvers import reverse +from django.conf import settings + +from django_rbac.utils import get_role_model, get_ou_model, get_role_parenting_model +from hobo.agent.common import notify_agents +from authentic2.saml.models import LibertyProvider +from authentic2.a2_rbac.models import RoleAttribute +from authentic2.models import AttributeValue + + +class Provisionning(object): + local = threading.local() + threads = set() + + def __init__(self): + self.User = get_user_model() + self.Role = get_role_model() + self.OU = get_ou_model() + self.RoleParenting = get_role_parenting_model() + self.logger = logging.getLogger(__name__) + + def start(self): + self.local.saved = {} + self.local.deleted = {} + + def clean(self): + if hasattr(self.local, 'saved'): + del self.local.saved + if hasattr(self.local, 'deleted'): + del self.local.deleted + + def saved(self, *args): + if not hasattr(self.local, 'saved'): + return + + for instance in args: + klass = self.User if isinstance(instance, self.User) else self.Role + self.local.saved.setdefault(klass, set()).add(instance) + + def deleted(self, *args): + if not hasattr(self.local, 'saved'): + return + + for instance in args: + klass = self.User if isinstance(instance, self.User) else self.Role + self.local.deleted.setdefault(klass, set()).add(instance) + self.local.saved.get(klass, set()).discard(instance) + + def resolve_ou(self, instances, ous): + for instance in instances: + if instance.ou_id in ous: + instance.ou = ous[instance.ou_id] + + def notify_users(self, ous, users, mode='provision'): + self.resolve_ou(users, ous) + + ous = {} + for user in users: + ous.setdefault(user.ou, set()).add(user) + + def user_to_json(service, user): + data = {} + roles = user.roles_and_parents().prefetch_related('attributes') + data.update({ + 'uuid': user.uuid, + 'username': user.username, + 'first_name': user.first_name, + 'last_name': user.last_name, + 'email': user.email, + 'roles': [ + { + 'uuid': role.uuid, + 'name': role.name, + 'slug': role.slug, + } for role in roles], + }) + for av in AttributeValue.objects.with_owner(user): + data[str(av.attribute.name)] = av.to_python() + roles = user.roles_and_parents().prefetch_related('attributes') + # check if user is superuser through a role + role_is_superuser = False + for role in roles: + if role.service_id != service.pk: + continue + for attribute in role.attributes.all(): + if attribute.name == 'is_superuser' and attribute.value == 'true': + role_is_superuser = True + data['is_superuser'] = user.is_superuser or role_is_superuser + return data + + issuer = unicode(self.get_entity_id()) + if mode == 'provision': + for ou, users in ous.iteritems(): + for service, audience in self.get_audience(ou): + self.logger.info(u'provisionning user %s to %s', user, audience) + notify_agents({ + '@type': 'provision', + 'issuer': issuer, + 'audience': [audience], + 'full': False, + 'objects': { + '@type': 'user', + 'data': [user_to_json(service, user)], + } + }) + else: + for ou, users in ous.iteritems(): + audience = [audience for s, audience in self.get_audience(ou)] + self.logger.info(u'deprovisionning users %s from %s', users, audience) + notify_agents({ + '@type': 'deprovision', + 'issuer': issuer, + 'audience': audience, + 'full': False, + 'objects': { + '@type': 'user', + 'data': [{ + 'uuid': user.uuid, + } for user in users] + } + }) + + def notify_roles(self, ous, roles, mode='provision', full=False): + roles = set([role for role in roles if not role.slug.startswith('_')]) + if mode == 'provision': + self.complete_roles(roles) + + if not roles: + return + + self.resolve_ou(roles, ous) + ous = {} + for role in roles: + ous.setdefault(role.ou, []).append(role) + + def helper(ou, roles): + if mode == 'provision': + data = [ + { + 'uuid': role.uuid, + 'name': role.name, + 'slug': role.slug, + 'description': role.description, + 'details': role.details, + 'emails': role.emails, + 'emails_to_members': role.emails_to_members, + } for role in roles + ] + else: + data = [ + { + 'uuid': role.uuid, + } for role in roles + ] + + audience = [entity_id for service, entity_id in self.get_audience(ou)] + self.logger.info(u'%sning roles %s to %s', mode, role, audience) + notify_agents({ + '@type': mode, + 'audience': audience, + 'full': full, + 'objects': { + '@type': 'role', + 'data': data, + } + }) + + global_roles = set(ous.get(None, [])) + for ou, ou_roles in ous.iteritems(): + sent_roles = set(ou_roles) | global_roles + helper(ou, sent_roles) + + def provision(self): + if not getattr(settings, 'HOBO_ROLE_EXPORT', True): + return + # exit early if not started + if not hasattr(self.local, 'saved') or not hasattr(self.local, 'deleted'): + return + + t = threading.Thread(target=self.do_provision, kwargs={ + 'saved': getattr(self.local, 'saved', {}), + 'deleted': getattr(self.local, 'deleted', {}), + }) + t.start() + self.threads.add(t) + self.clean() + + def do_provision(self, saved, deleted, thread=None): + try: + ous = {ou.id: ou for ou in self.OU.objects.all()} + self.notify_roles(ous, saved.get(self.Role, [])) + self.notify_roles(ous, deleted.get(self.Role, []), mode='deprovision') + self.notify_users(ous, saved.get(self.User, [])) + self.notify_users(ous, deleted.get(self.User, []), mode='deprovision') + except Exception: + # last step, clear everything + self.logger.exception(u'error in provisionning thread') + finally: + self.threads.discard(threading.current_thread()) + + def wait(self): + for thread in list(self.threads): + thread.join() + + def __enter__(self): + self.start() + + def __exit__(self, exc_type, exc_value, exc_tb): + if exc_type is None: + self.provision() + self.wait() + else: + self.clean() + + def get_audience(self, ou): + if ou: + qs = LibertyProvider.objects.filter(ou=ou) + else: + qs = LibertyProvider.objects.filter(ou__isnull=True) + return [(service, service.entity_id) for service in qs] + + def complete_roles(self, roles): + for role in roles: + role.emails = [] + role.emails_to_members = False + role.details = u'' + for attribute in role.attributes.all(): + if (attribute.name in ('emails', 'emails_to_members', 'details') + and attribute.kind == 'json'): + setattr(role, attribute.name, json.loads(attribute.value)) + + def get_entity_id(self): + tenant = getattr(connection, 'tenant', None) + assert tenant + base_url = tenant.get_base_url() + return urljoin(base_url, reverse('a2-idp-saml-metadata')) + + def pre_save(self, sender, instance, raw, using, update_fields, **kwargs): + # we skip new instances + if not instance.pk: + return + if not isinstance(instance, (self.User, self.Role, RoleAttribute, AttributeValue)): + return + # ignore last_login update on login + if isinstance(instance, self.User) and update_fields == ['last_login']: + return + if isinstance(instance, RoleAttribute): + instance = instance.role + elif isinstance(instance, AttributeValue): + if not isinstance(instance.owner, self.User): + return + instance = instance.owner + self.saved(instance) + + def post_save(self, sender, instance, created, raw, using, update_fields, **kwargs): + # during post_save we only handle new instances + if isinstance(instance, self.RoleParenting): + self.saved(*list(instance.child.all_members())) + return + if not created: + return + if not isinstance(instance, (self.User, self.Role, RoleAttribute, AttributeValue)): + return + if isinstance(instance, RoleAttribute): + instance = instance.role + elif isinstance(instance, AttributeValue): + if not isinstance(instance.owner, self.User): + return + instance = instance.owner + self.saved(instance) + + def pre_delete(self, sender, instance, using, **kwargs): + if isinstance(instance, (self.User, self.Role)): + self.deleted(copy.copy(instance)) + elif isinstance(instance, RoleAttribute): + instance = instance.role + self.saved(instance) + elif isinstance(instance, AttributeValue): + if not isinstance(instance.owner, self.User): + return + instance = instance.owner + self.saved(instance) + elif isinstance(instance, self.RoleParenting): + self.saved(*list(instance.child.all_members())) + + def m2m_changed(self, sender, instance, action, reverse, model, pk_set, using, **kwargs): + if not action.startswith('post'): + return + if action.endswith('_clear'): + return + if reverse: + self.saved(instance) + else: + for user in model.objects.filter(pk__in=pk_set): + self.saved(user) diff --git a/hobo/multitenant/apps.py b/hobo/multitenant/apps.py index 7bc418f..51e2ff3 100644 --- a/hobo/multitenant/apps.py +++ b/hobo/multitenant/apps.py @@ -19,6 +19,7 @@ class TenantAwareThread(threading.Thread): super(TenantAwareThread, self).run() finally: connection.set_tenant(old_tenant) + connection.close() class _Timer(TenantAwareThread): diff --git a/hobo/multitenant/models.py b/hobo/multitenant/models.py index fc450ba..2fa97c8 100644 --- a/hobo/multitenant/models.py +++ b/hobo/multitenant/models.py @@ -65,8 +65,8 @@ class Tenant(TenantMixin): "the public schema. Current schema is %s." % connection.schema_name) - os.rename(self.get_directory(), self.get_directory()+'.invalid') + os.rename(self.get_directory(), self.get_directory() + '.invalid') - if schema_exists(self.schema_name) and (self.auto_drop_schema or force_drop) and not django_is_in_test_mode(): + if schema_exists(self.schema_name) and (self.auto_drop_schema or force_drop): cursor = connection.cursor() cursor.execute('DROP SCHEMA %s CASCADE' % self.schema_name) diff --git a/tests_authentic/conftest.py b/tests_authentic/conftest.py index a37ba62..2f82b9b 100644 --- a/tests_authentic/conftest.py +++ b/tests_authentic/conftest.py @@ -5,19 +5,23 @@ import json import pytest + @pytest.fixture def tenant_base(request, settings): base = tempfile.mkdtemp('combo-tenant-base') settings.TENANT_BASE = base + def fin(): shutil.rmtree(base) request.addfinalizer(fin) return base + @pytest.fixture(scope='function') -def tenant(db, request, settings, tenant_base): +def tenant(transactional_db, request, settings, tenant_base): from hobo.multitenant.models import Tenant base = tenant_base + @pytest.mark.django_db def make_tenant(name): tenant_dir = os.path.join(base, name) @@ -54,6 +58,7 @@ def tenant(db, request, settings, tenant_base): t.create_schema() return t tenants = [make_tenant('authentic.example.net')] + def fin(): from django.db import connection connection.set_schema_to_public() diff --git a/tests_authentic/test_hobo_deploy.py b/tests_authentic/test_hobo_deploy.py index 9ced6e9..22b9580 100644 --- a/tests_authentic/test_hobo_deploy.py +++ b/tests_authentic/test_hobo_deploy.py @@ -47,7 +47,7 @@ def test_hobo_deploy(tenant_base, settings, mocker, skeleton_dir): # As a user is created, notify_agents is called, as celery is not running # we just block it - mocker.patch('hobo.agent.authentic2.apps.notify_agents') + mocker.patch('hobo.agent.authentic2.provisionning.notify_agents') requests_get = mocker.patch('requests.get') meta1 = '''