Projet

Général

Profil

0003-users-add-cronjob-to-delete-users-24430.patch

Benjamin Dauvergne, 04 février 2020 16:54

Télécharger (5,75 ko)

Voir les différences:

Subject: [PATCH 3/4] users: add cronjob to delete users (#24430)

 tests/test_users.py | 41 +++++++++++++++++++++++++++++++++++++++-
 wcs/publisher.py    |  6 ++++++
 wcs/sql.py          | 12 ++++++++++++
 wcs/users.py        | 46 +++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 104 insertions(+), 1 deletion(-)
tests/test_users.py
7 7

  
8 8
import pytest
9 9

  
10
from quixote import cleanup
10
from quixote import cleanup, get_publisher
11 11

  
12 12
from wcs import publisher
13 13
from wcs import fields
......
78 78

  
79 79
    with pytest.raises(AttributeError):
80 80
        user.xxx
81

  
82

  
83
def test_clean_deleted_users():
84
    from wcs.formdef import FormDef
85
    from wcs.qommon import storage as st
86

  
87
    User = pub.user_class
88

  
89
    User.wipe()
90
    FormDef.wipe()
91

  
92
    formdef = FormDef()
93
    formdef.name = 'foobar'
94
    formdef.url_name = 'foobar'
95
    formdef.fields = []
96
    formdef.store()
97
    data_class = formdef.data_class()
98

  
99
    user1 = User()
100
    user1.name = 'Pierre'
101
    user1.deleted = True
102
    user1.store()
103

  
104
    user2 = User()
105
    user2.name = 'Jean'
106
    user2.deleted = True
107
    user2.store()
108

  
109
    formdata1 = data_class()
110
    formdata1.user_id = user1.id
111
    formdata1.store()
112

  
113
    assert User.count() == 2
114

  
115
    get_publisher().clean_deleted_users()
116

  
117
    assert User.count() == 1
118
    assert len(User.select([st.Equal('name', 'Pierre')])) == 1
119
    assert len(User.select([st.Equal('name', 'Jean')])) == 0
wcs/publisher.py
124 124
        super(WcsPublisher, cls).register_cronjobs()
125 125
        # every hour: check for global action timeouts
126 126
        cls.register_cronjob(CronJob(cls.apply_global_action_timeouts, minutes=[0]))
127
        cls.register_cronjob(CronJob(cls.clean_deleted_users, minutes=[0]))
127 128

  
128 129
    def is_using_postgresql(self):
129 130
        return bool(self.has_site_option('postgresql') and self.cfg.get('postgresql', {}))
......
359 360
            from . import sql
360 361
            sql.cleanup_connection()
361 362

  
363
    def clean_deleted_users(self):
364
        for user_id in self.user_class.get_to_delete_ids():
365
            self.user_class.remove_object(user_id)
366

  
367

  
362 368
set_publisher_class(WcsPublisher)
363 369
WcsPublisher.register_extra_dir(os.path.join(os.path.dirname(__file__), 'extra'))
364 370

  
wcs/sql.py
1960 1960

  
1961 1961
        return objects
1962 1962

  
1963
    @classmethod
1964
    def get_to_delete_ids(cls):
1965
        from .formdef import FormDef
1966

  
1967
        deleted_ids = set(str(user.id) for user in cls.select([Equal('deleted', True)]))
1968
        for formdef in FormDef.select():
1969
            for formdata in formdef.data_class().select([Contains('user_id', deleted_ids)], iterator=True):
1970
                user_id = str(formdata.user_id)
1971
                if user_id in deleted_ids:
1972
                    deleted_ids.remove(str(formdata.user_id))
1973
        return deleted_ids
1974

  
1963 1975

  
1964 1976
class Session(SqlMixin, wcs.sessions.BasicSession):
1965 1977
    _table_name = 'sessions'
wcs/users.py
254 254
        self.form_data = {}
255 255
        self.store()
256 256

  
257
    @classmethod
258
    def get_to_delete_ids(cls):
259
        from .formdef import FormDef
260

  
261
        deleted_ids = set(str(user.id) for user in cls.select([st.Equal('deleted', True)]))
262
        for formdef in FormDef.select():
263
            data_class = formdef.data_class()
264
            formdatas = data_class.select(
265
                [st.Contains('user_id', deleted_ids)],
266
                iterator=True)
267
            for formdata in formdatas:
268
                user_id = str(formdata.user_id)
269
                if user_id in deleted_ids:
270
                    deleted_ids.remove(str(formdata.user_id))
271
        return deleted_ids
272

  
273
    @classmethod
274
    def clean_deleted_users_sql(cls):
275
        from wcs.sql import AnyFormData
276
        from wcs.formdef import FormDef
277

  
278
        deleted = cls.select([st.Equal('deleted', True)])
279
        deleted_ids = set(str(u.id) for u in deleted)
280
        active_ids = set()
281
        if FormDef.count():
282
            # without any FormDef global views are not created.
283
            active_ids = set(f.user_id for f in AnyFormData.select(
284
                [st.Contains('user_id', deleted_ids)], iterator=True))
285
        to_delete_ids = deleted_ids - active_ids
286
        for user_id in to_delete_ids:
287
            cls.remove_object(user_id)
288

  
289
    @classmethod
290
    def clean_deleted_users_pickle(cls):
291
        from wcs.formdef import FormDef
292

  
293
        deleted = cls.select([st.Equal('deleted', True)])
294
        deleted_ids = set(str(u.id) for u in deleted)
295
        active_ids = set()
296
        for formdef in FormDef.select():
297
            for formdata in formdef.data_class().select([st.Contains('user_id', deleted_ids)], iterator=True):
298
                active_ids.add(str(formdata.user_id))
299
        to_delete_ids = deleted_ids - active_ids
300
        for user_id in to_delete_ids:
301
            cls.remove_object(user_id)
302

  
257 303

  
258 304
Substitutions.register('session_user_display_name', category=N_('User'), comment=N_('Session User Display Name'))
259 305
Substitutions.register('session_user_email', category=N_('User'), comment=N_('Session User Email'))
260
-