Projet

Général

Profil

0001-users-add-cronjob-to-delete-users-24430.patch

Benjamin Dauvergne, 24 septembre 2021 10:15

Télécharger (4,59 ko)

Voir les différences:

Subject: [PATCH] users: add cronjob to delete users (#24430)

 tests/test_users.py | 49 ++++++++++++++++++++++++++++++++++++++++++++-
 wcs/publisher.py    |  7 +++++++
 wcs/sql.py          | 19 ++++++++++++++++++
 wcs/users.py        | 19 ++++++++++++++++++
 4 files changed, 93 insertions(+), 1 deletion(-)
tests/test_users.py
1
import datetime
1 2
import shutil
2 3

  
3 4
import pytest
4
from quixote import cleanup
5
from quixote import cleanup, get_publisher
5 6

  
6 7
from wcs import fields
7 8
from wcs.variables import LazyUser
......
92 93
    with pytest.raises(AttributeError):
93 94
        # noqa pylint: disable=pointless-statement
94 95
        user.xxx
96

  
97

  
98
def test_clean_deleted_users():
    """Check that clean_deleted_users() only removes unreferenced deleted users.

    Two users are marked as deleted; one of them is still referenced by a
    form data and must survive the first cleanup.  Once the form data are
    wiped, a second cleanup removes the remaining user.
    """
    from wcs.formdef import FormDef
    from wcs.qommon import storage as st

    User = pub.user_class

    # start from empty storages
    User.wipe()
    FormDef.wipe()

    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.url_name = 'foobar'
    formdef.fields = []
    formdef.store()
    data_class = formdef.data_class()

    # both users carry a deletion timestamp
    users_by_name = {}
    for user_name in ('Pierre', 'Jean'):
        user = User()
        user.name = user_name
        user.deleted_timestamp = datetime.datetime.now()
        user.store()
        users_by_name[user_name] = user

    # only Pierre is referenced by a form data
    formdata = data_class()
    formdata.user_id = users_by_name['Pierre'].id
    formdata.store()

    assert User.count() == 2

    get_publisher().clean_deleted_users()

    # Jean is gone, Pierre is kept because of the form data
    assert User.count() == 1
    pierre_matches = User.select([st.Equal('name', 'Pierre')])
    jean_matches = User.select([st.Equal('name', 'Jean')])
    assert len(pierre_matches) == 1
    assert len(jean_matches) == 0

    data_class.wipe()

    get_publisher().clean_deleted_users()

    # nothing references Pierre anymore
    assert User.count() == 0
wcs/publisher.py
117 117
        cls.register_cronjob(
118 118
            CronJob(cls.apply_global_action_timeouts, name='evaluate_global_action_timeouts', minutes=[0])
119 119
        )
120
        cls.register_cronjob(CronJob(
121
            cls.clean_deleted_users,
122
            name='clean_deleted_users', minutes=[0]))
120 123
        data_sources.register_cronjob()
121 124
        formdef.register_cronjobs()
122 125

  
......
470 473
            return value_.get_value()
471 474
        return value_
472 475

  
476
    def clean_deleted_users(self):
        """Remove every user that the user class reports as deletable.

        The list of ids comes from ``self.user_class.get_to_delete_ids()``;
        each one is passed to ``self.user_class.remove_object()``.
        """
        for deletable_id in self.user_class.get_to_delete_ids():
            self.user_class.remove_object(deletable_id)
479

  
473 480

  
474 481
set_publisher_class(WcsPublisher)
475 482
WcsPublisher.register_extra_dir(os.path.join(os.path.dirname(__file__), 'extra'))
wcs/sql.py
2600 2600

  
2601 2601
        return objects
2602 2602

  
2603
    @classmethod
    def get_to_delete_ids(cls):
        """Return the ids of deleted users that no form data references.

        Users are candidates when their ``deleted_timestamp`` is not null.
        A candidate is kept out of the result as long as at least one form
        data (looked up through ``AnyFormData``) has it as ``user_id``.

        Returns:
            A set of user ids, as found on the user objects.
        """
        from .formdef import FormDef

        deleted_ids = {user.id for user in cls.select([NotNull('deleted_timestamp')])}
        if not deleted_ids:
            # nothing is marked as deleted: do not build a Contains()
            # criterion on an empty collection, and skip the query.
            return deleted_ids

        if not FormDef.count():
            # without any FormDef global views are not created.
            return deleted_ids

        # NOTE(review): ids are compared on their string form so that a
        # type difference between user ids and form data user_id values
        # (int vs str) cannot make a referenced user look unreferenced;
        # confirm the actual column types.
        referenced_ids = {
            str(formdata.user_id)
            for formdata in AnyFormData.select([Contains('user_id', deleted_ids)], iterator=True)
            if formdata.user_id
        }
        return {user_id for user_id in deleted_ids if str(user_id) not in referenced_ids}
2621

  
2603 2622

  
2604 2623
class Role(SqlMixin, wcs.roles.Role):
2605 2624
    _table_name = 'roles'
wcs/users.py
317 317
        self.deleted_timestamp = datetime.datetime.now()
318 318
        self.store()
319 319

  
320
    @classmethod
    def get_to_delete_ids(cls):
        """Return the ids of deleted users that no form data references.

        Users are candidates when their ``deleted_timestamp`` is not null.
        The form data of every form definition are then scanned, and any
        candidate found as a ``user_id`` is kept out of the result.

        Returns:
            A set of user ids, as found on the user objects.
        """
        from .formdef import FormDef

        deleted_ids = {user.id for user in cls.select([st.NotNull('deleted_timestamp')])}
        if not deleted_ids:
            # nothing is marked as deleted: avoid walking through all the
            # form data of all the form definitions for no result.
            return deleted_ids

        # NOTE(review): ids are compared on their string form so that a
        # type difference between user ids and form data user_id values
        # (int vs str) cannot make a referenced user look unreferenced;
        # confirm against the storage backends.
        referenced_ids = set()
        for formdef in FormDef.select():
            data_class = formdef.data_class()
            referenced_ids.update(
                str(formdata.user_id)
                for formdata in data_class.select(iterator=True)
                if formdata.user_id
            )

        return {user_id for user_id in deleted_ids if str(user_id) not in referenced_ids}
320 339

  
321 340
Substitutions.register(
322 341
    'session_user_display_name',
323
-