Projet

Général

Profil

0001-agent-a2-prevent-useless-thread-launching-34484.patch

Benjamin Dauvergne, 04 juillet 2019 00:04

Télécharger (14,5 ko)

Voir les différences:

Subject: [PATCH] agent/a2: prevent useless thread launching (#34484)

 hobo/agent/authentic2/middleware.py    |   5 +-
 hobo/agent/authentic2/provisionning.py | 170 ++++++++++++++-----------
 2 files changed, 100 insertions(+), 75 deletions(-)
hobo/agent/authentic2/middleware.py
6 6
        provisionning.start()
7 7

  
8 8
    def process_exception(self, request, exception):
9
        provisionning.clean()
9
        provisionning.stop(provision=False)
10 10

  
11 11
    def process_response(self, request, response):
12
        provisionning.provision()
12
        provisionning.provision(provision=True, wait=False)
13 13
        return response
14

  
hobo/agent/authentic2/provisionning.py
15 15
from authentic2.a2_rbac.models import RoleAttribute
16 16
from authentic2.models import AttributeValue
17 17

  
18
User = get_user_model()
19
Role = get_role_model()
20
OU = get_ou_model()
21
RoleParenting = get_role_parenting_model()
18 22

  
19
class Provisionning(object):
20
    local = threading.local()
23
logger = logging.getLogger(__name__)
24

  
25

  
26
class Provisionning(threading.local):
27
    __slots__ = ['threads']
21 28
    threads = set()
22 29

  
23 30
    def __init__(self):
24
        self.User = get_user_model()
25
        self.Role = get_role_model()
26
        self.OU = get_ou_model()
27
        self.RoleParenting = get_role_parenting_model()
28
        self.logger = logging.getLogger(__name__)
31
        self.stack = []
29 32

  
30 33
    def start(self):
31
        self.local.saved = {}
32
        self.local.deleted = {}
34
        self.stack.append({
35
            'saved': {},
36
            'deleted': {},
37
        })
38

  
39
    def stop(self, provision=True, wait=True):
40
        context = self.stack.pop()
33 41

  
34
    def clean(self):
35
        if hasattr(self.local, 'saved'):
36
            del self.local.saved
37
        if hasattr(self.local, 'deleted'):
38
            del self.local.deleted
42
        if provision:
43
            self.provision(**context)
44
            if wait:
45
                self.wait()
39 46

  
40
    def saved(self, *args):
41
        if not hasattr(self.local, 'saved'):
47
    @property
48
    def saved(self):
49
        if self.stack:
50
            return self.stack[-1]['saved']
51
        return None
52

  
53
    @property
54
    def deleted(self):
55
        if self.stack:
56
            return self.stack[-1]['deleted']
57
        return None
58

  
59
    def add_saved(self, *args):
60
        if not self.stack:
42 61
            return
43 62

  
44 63
        for instance in args:
45
            klass = self.User if isinstance(instance, self.User) else self.Role
46
            self.local.saved.setdefault(klass, set()).add(instance)
64
            klass = User if isinstance(instance, User) else Role
65
            self.saved.setdefault(klass, set()).add(instance)
47 66

  
48
    def deleted(self, *args):
49
        if not hasattr(self.local, 'saved'):
67
    def add_deleted(self, *args):
68
        if not self.stack:
50 69
            return
51 70

  
52 71
        for instance in args:
53
            klass = self.User if isinstance(instance, self.User) else self.Role
54
            self.local.deleted.setdefault(klass, set()).add(instance)
55
            self.local.saved.get(klass, set()).discard(instance)
72
            klass = User if isinstance(instance, User) else Role
73
            self.deleted.setdefault(klass, set()).add(instance)
74
            self.saved.get(klass, set()).discard(instance)
56 75

  
57 76
    def resolve_ou(self, instances, ous):
58 77
        for instance in instances:
......
61 80

  
62 81
    def notify_users(self, ous, users, mode='provision'):
63 82
        if mode == 'provision':
64
            users = (self.User.objects.filter(id__in=[u.id for u in users])
83
            users = (User.objects.filter(id__in=[u.id for u in users])
65 84
                     .select_related('ou').prefetch_related('attribute_values__attribute'))
66 85
        else:
67 86
            self.resolve_ou(users, ous)
......
105 124
            # Find roles giving a superuser attribute
106 125
            # If there is any role of this kind, we do one provisionning message for each user and
107 126
            # each service.
108
            roles_with_attributes = (self.Role.objects.filter(members__in=users)
127
            roles_with_attributes = (Role.objects.filter(members__in=users)
109 128
                                     .parents(include_self=True)
110 129
                                     .filter(attributes__name='is_superuser')
111 130
                                     .exists())
112 131

  
113
            all_roles = (self.Role.objects.filter(members__in=users).parents()
132
            all_roles = (Role.objects.filter(members__in=users).parents()
114 133
                         .prefetch_related('attributes').distinct())
115 134
            roles = dict((r.id, r) for r in all_roles)
116 135
            user_roles = {}
117 136
            parents = {}
118
            for rp in self.RoleParenting.objects.filter(child__in=all_roles):
137
            for rp in RoleParenting.objects.filter(child__in=all_roles):
119 138
                parents.setdefault(rp.child.id, []).append(rp.parent.id)
120
            Through = self.Role.members.through
139
            Through = Role.members.through
121 140
            for u_id, r_id in Through.objects.filter(role__members__in=users).values_list('user_id',
122 141
                                                                                      'role_id'):
123 142
                user_roles.setdefault(u_id, set()).add(roles[r_id])
......
133 152
                for ou, users in ous.iteritems():
134 153
                    for service, audience in self.get_audience(ou):
135 154
                        for user in users:
136
                            self.logger.info(u'provisionning user %s to %s', user, audience)
155
                            logger.info(u'provisionning user %s to %s', user, audience)
137 156
                            notify_agents({
138 157
                                '@type': 'provision',
139 158
                                'issuer': issuer,
......
149 168
                    audience = [a for service, a in self.get_audience(ou)]
150 169
                    if not audience:
151 170
                        continue
152
                    self.logger.info(u'provisionning users %s to %s',
171
                    logger.info(u'provisionning users %s to %s',
153 172
                                     u', '.join(map(unicode, users)), u', '.join(audience))
154 173
                    notify_agents({
155 174
                        '@type': 'provision',
......
162 181
                        }
163 182
                    })
164 183
        elif users:
165
            audience = [audience for ou in self.OU.objects.all()
184
            audience = [audience for ou in OU.objects.all()
166 185
                        for s, audience in self.get_audience(ou)]
167
            self.logger.info(u'deprovisionning users %s from %s', u', '.join(map(unicode, users)),
186
            logger.info(u'deprovisionning users %s from %s', u', '.join(map(unicode, users)),
168 187
                             u', '.join(audience))
169 188
            notify_agents({
170 189
                '@type': 'deprovision',
......
213 232
                ]
214 233

  
215 234
            audience = [entity_id for service, entity_id in self.get_audience(ou)]
216
            self.logger.info(u'%sning roles %s to %s', mode, roles, audience)
235
            logger.info(u'%sning roles %s to %s', mode, roles, audience)
217 236
            notify_agents({
218 237
                '@type': mode,
219 238
                'audience': audience,
......
229 248
            sent_roles = set(ou_roles) | global_roles
230 249
            helper(ou, sent_roles)
231 250

  
232
    def provision(self):
251
    def provision(self, saved, deleted):
252
        # Returns if:
253
        # - we are not in a tenant
254
        # - provisionning is disabled
255
        # - there is nothing to do
233 256
        if (not hasattr(connection, 'tenant') or not connection.tenant or not
234 257
                hasattr(connection.tenant, 'domain_url')):
235 258
            return
236 259
        if not getattr(settings, 'HOBO_ROLE_EXPORT', True):
237 260
            return
238
        # exit early if not started
239
        if not hasattr(self.local, 'saved') or not hasattr(self.local, 'deleted'):
261
        if not (saved or deleted):
240 262
            return
241 263

  
242
        t = threading.Thread(target=self.do_provision, kwargs={
243
            'saved': getattr(self.local, 'saved', {}),
244
            'deleted': getattr(self.local, 'deleted', {}),
245
        })
264
        t = threading.Thread(
265
            target=self.do_provision,
266
            kwargs={'saved': saved, 'deleted': deleted})
246 267
        t.start()
247 268
        self.threads.add(t)
248 269

  
249 270
    def do_provision(self, saved, deleted, thread=None):
250 271
        try:
251
            ous = {ou.id: ou for ou in self.OU.objects.all()}
252
            self.notify_roles(ous, saved.get(self.Role, []))
253
            self.notify_roles(ous, deleted.get(self.Role, []), mode='deprovision')
254
            self.notify_users(ous, saved.get(self.User, []))
255
            self.notify_users(ous, deleted.get(self.User, []), mode='deprovision')
272
            ous = {ou.id: ou for ou in OU.objects.all()}
273
            self.notify_roles(ous, saved.get(Role, []))
274
            self.notify_roles(ous, deleted.get(Role, []), mode='deprovision')
275
            self.notify_users(ous, saved.get(User, []))
276
            self.notify_users(ous, deleted.get(User, []), mode='deprovision')
256 277
        except Exception:
257 278
            # last step, clear everything
258
            self.logger.exception(u'error in provisionning thread')
279
            logger.exception(u'error in provisionning thread')
259 280
        finally:
260 281
            self.threads.discard(threading.current_thread())
261 282

  
......
267 288
        self.start()
268 289

  
269 290
    def __exit__(self, exc_type, exc_value, exc_tb):
270
        if exc_type is None:
271
            self.provision()
272
            self.clean()
273
            self.wait()
274
        else:
275
            self.clean()
291
        if not self.stack:
292
            return
293
        self.stop(provision=exc_type is None)
276 294

  
277 295
    def get_audience(self, ou):
278 296
        if ou:
......
298 316
        return urljoin(base_url, reverse('a2-idp-saml-metadata'))
299 317

  
300 318
    def pre_save(self, sender, instance, raw, using, update_fields, **kwargs):
319
        if not self.stack:
320
            return
301 321
        # we skip new instances
302 322
        if not instance.pk:
303 323
            return
304
        if not isinstance(instance, (self.User, self.Role, RoleAttribute, AttributeValue)):
324
        if not isinstance(instance, (User, Role, RoleAttribute, AttributeValue)):
305 325
            return
306 326
        # ignore last_login update on login
307
        if isinstance(instance, self.User) and update_fields == ['last_login']:
327
        if isinstance(instance, User) and update_fields == ['last_login']:
308 328
            return
309 329
        if isinstance(instance, RoleAttribute):
310 330
            instance = instance.role
311 331
        elif isinstance(instance, AttributeValue):
312
            if not isinstance(instance.owner, self.User):
332
            if not isinstance(instance.owner, User):
313 333
                return
314 334
            instance = instance.owner
315
        self.saved(instance)
335
        self.add_saved(instance)
316 336

  
317 337
    def post_save(self, sender, instance, created, raw, using, update_fields, **kwargs):
338
        if not self.stack:
339
            return
318 340
        # during post_save we only handle new instances
319
        if isinstance(instance, self.RoleParenting):
320
            self.saved(*list(instance.child.all_members()))
341
        if isinstance(instance, RoleParenting):
342
            self.add_saved(*list(instance.child.all_members()))
321 343
            return
322 344
        if not created:
323 345
            return
324
        if not isinstance(instance, (self.User, self.Role, RoleAttribute, AttributeValue)):
346
        if not isinstance(instance, (User, Role, RoleAttribute, AttributeValue)):
325 347
            return
326 348
        if isinstance(instance, RoleAttribute):
327 349
            instance = instance.role
328 350
        elif isinstance(instance, AttributeValue):
329
            if not isinstance(instance.owner, self.User):
351
            if not isinstance(instance.owner, User):
330 352
                return
331 353
            instance = instance.owner
332
        self.saved(instance)
354
        self.add_saved(instance)
333 355

  
334 356
    def pre_delete(self, sender, instance, using, **kwargs):
335
        if isinstance(instance, (self.User, self.Role)):
336
            self.deleted(copy.copy(instance))
357
        if not self.stack:
358
            return
359
        if isinstance(instance, (User, Role)):
360
            self.add_deleted(copy.copy(instance))
337 361
        elif isinstance(instance, RoleAttribute):
338 362
            instance = instance.role
339
            self.saved(instance)
363
            self.add_saved(instance)
340 364
        elif isinstance(instance, AttributeValue):
341
            if not isinstance(instance.owner, self.User):
365
            if not isinstance(instance.owner, User):
342 366
                return
343 367
            instance = instance.owner
344
            self.saved(instance)
345
        elif isinstance(instance, self.RoleParenting):
346
            self.saved(*list(instance.child.all_members()))
368
            self.add_saved(instance)
369
        elif isinstance(instance, RoleParenting):
370
            self.add_saved(*list(instance.child.all_members()))
347 371

  
348 372
    def m2m_changed(self, sender, instance, action, reverse, model, pk_set, using, **kwargs):
373
        if not self.stack:
374
            return
349 375
        if action != 'pre_clear' and action.startswith('pre_'):
350 376
            return
351
        if sender is self.Role.members.through:
352
            self.saved(instance)
377
        if sender is Role.members.through:
378
            self.add_saved(instance)
353 379
            # on a clear, pk_set is None
354 380
            for other_instance in model.objects.filter(pk__in=pk_set or []):
355
                self.saved(other_instance)
381
                self.add_saved(other_instance)
356 382
            if action == 'pre_clear':
357 383
                # when the action is pre_clear we need to lookup the current value of the members
358 384
                # relation, to re-provision all previously enrolled users.
359 385
                if not reverse:
360 386
                    for other_instance in instance.members.all():
361
                        self.saved(other_instance)
387
                        self.add_saved(other_instance)
362
-