Projet

Général

Profil

0003-users-add-cronjob-to-delete-users-24430.patch

Benjamin Dauvergne, 04 janvier 2020 18:29

Télécharger (5,75 ko)

Voir les différences:

Subject: [PATCH 3/4] users: add cronjob to delete users (#24430)

 tests/test_users.py | 41 +++++++++++++++++++++++++++++++++++++++-
 wcs/publisher.py    |  6 ++++++
 wcs/sql.py          | 12 ++++++++++++
 wcs/users.py        | 46 +++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 104 insertions(+), 1 deletion(-)
tests/test_users.py
7 7

  
8 8
import pytest
9 9

  
10
from quixote import cleanup
10
from quixote import cleanup, get_publisher
11 11

  
12 12
from wcs import publisher
13 13
from wcs import fields
......
77 77

  
78 78
    with pytest.raises(AttributeError):
79 79
        user.xxx
80

  
81

  
82
def test_clean_deleted_users():
    """Check that the cleanup only removes deleted users without formdata.

    Two users flagged as deleted are stored; a formdata is attached to the
    first one only.  After the cleanup the first user must still exist and
    the second one must be gone.
    """
    from wcs.formdef import FormDef
    from wcs.qommon import storage as st

    user_class = pub.user_class

    # start from empty storages
    user_class.wipe()
    FormDef.wipe()

    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.url_name = 'foobar'
    formdef.fields = []
    formdef.store()
    formdata_class = formdef.data_class()

    def store_deleted_user(name):
        # create and store a user already flagged as deleted
        user = user_class()
        user.name = name
        user.deleted = True
        user.store()
        return user

    referenced_user = store_deleted_user('Pierre')
    store_deleted_user('Jean')

    # only the first user is referenced by a formdata
    formdata = formdata_class()
    formdata.user_id = referenced_user.id
    formdata.store()

    assert user_class.count() == 2

    get_publisher().clean_deleted_users()

    assert user_class.count() == 1
    kept = user_class.select([st.Equal('name', 'Pierre')])
    removed = user_class.select([st.Equal('name', 'Jean')])
    assert len(kept) == 1
    assert len(removed) == 0
wcs/publisher.py
123 123
        super(WcsPublisher, cls).register_cronjobs()
124 124
        # every hour: check for global action timeouts
125 125
        cls.register_cronjob(CronJob(cls.apply_global_action_timeouts, minutes=[0]))
126
        cls.register_cronjob(CronJob(cls.clean_deleted_users, hourly=True))
126 127

  
127 128
    def is_using_postgresql(self):
128 129
        return bool(self.has_site_option('postgresql') and self.cfg.get('postgresql', {}))
......
358 359
            from . import sql
359 360
            sql.cleanup_connection()
360 361

  
362
    def clean_deleted_users(self):
        """Remove every user reported by ``user_class.get_to_delete_ids()``."""
        for stale_user_id in self.user_class.get_to_delete_ids():
            # the user class is looked up on the publisher for each removal
            self.user_class.remove_object(stale_user_id)
365

  
366

  
361 367
set_publisher_class(WcsPublisher)
362 368
WcsPublisher.register_extra_dir(os.path.join(os.path.dirname(__file__), 'extra'))
363 369

  
wcs/sql.py
1925 1925

  
1926 1926
        return objects
1927 1927

  
1928
    @classmethod
    def get_to_delete_ids(cls):
        """Return ids of users flagged as deleted that no formdata refers to.

        Returns:
            A set of user ids, as strings.  Each one belongs to a user
            selected with ``deleted`` equal to ``True`` and is not the
            ``user_id`` of any formdata of any formdef.
        """
        from .formdef import FormDef

        deleted_ids = {str(user.id) for user in cls.select([Equal('deleted', True)])}
        if not deleted_ids:
            # nothing is flagged as deleted: do not scan the formdatas with a
            # membership criteria built on an empty set.
            return deleted_ids

        # Referenced ids are collected apart and subtracted at the end, so the
        # set given to Contains() is never modified while a select() iterator
        # built from it is still being consumed.
        referenced_ids = set()
        for formdef in FormDef.select():
            formdatas = formdef.data_class().select(
                [Contains('user_id', deleted_ids)], iterator=True)
            for formdata in formdatas:
                referenced_ids.add(str(formdata.user_id))
        return deleted_ids - referenced_ids
1939

  
1928 1940

  
1929 1941
class Session(SqlMixin, wcs.sessions.BasicSession):
1930 1942
    _table_name = 'sessions'
wcs/users.py
253 253
        self.form_data = {}
254 254
        self.store()
255 255

  
256
    @classmethod
    def get_to_delete_ids(cls):
        """Return ids of users flagged as deleted that no formdata refers to.

        Returns:
            A set of user ids, as strings.  Each one belongs to a user
            selected with ``deleted`` equal to ``True`` and is not the
            ``user_id`` of any formdata of any formdef.
        """
        from .formdef import FormDef

        deleted_ids = {str(user.id) for user in cls.select([st.Equal('deleted', True)])}
        if not deleted_ids:
            # nothing is flagged as deleted: do not scan the formdatas with a
            # membership criteria built on an empty set.
            return deleted_ids

        # Referenced ids are collected apart and subtracted at the end, so the
        # set given to Contains() is never modified while a select() iterator
        # built from it is still being consumed.
        referenced_ids = set()
        for formdef in FormDef.select():
            data_class = formdef.data_class()
            formdatas = data_class.select(
                [st.Contains('user_id', deleted_ids)],
                iterator=True)
            for formdata in formdatas:
                referenced_ids.add(str(formdata.user_id))
        return deleted_ids - referenced_ids
271

  
272
    @classmethod
    def clean_deleted_users_sql(cls):
        """Remove users flagged as deleted that no formdata refers to.

        Formdatas are looked up through ``AnyFormData``, in a single select
        over all formdefs.  Ids are compared as strings on both sides so that
        a user still referenced by a formdata is never removed because of a
        type mismatch between ``user.id`` and ``formdata.user_id``.
        """
        from wcs.sql import AnyFormData
        from wcs.formdef import FormDef

        deleted_ids = {str(user.id) for user in cls.select([st.Equal('deleted', True)])}
        if not deleted_ids:
            # nothing is flagged as deleted, there is nothing to look up
            return

        active_ids = set()
        if FormDef.count():
            # without any FormDef global views are not created.
            formdatas = AnyFormData.select(
                [st.Contains('user_id', deleted_ids)], iterator=True)
            active_ids = {str(formdata.user_id) for formdata in formdatas}

        for user_id in deleted_ids - active_ids:
            cls.remove_object(user_id)
287

  
288
    @classmethod
    def clean_deleted_users_pickle(cls):
        """Remove users flagged as deleted that no formdata refers to.

        The formdatas of every formdef are selected in turn; a user is kept
        as long as at least one formdata has its id as ``user_id``.
        """
        from wcs.formdef import FormDef

        deleted_ids = {str(user.id) for user in cls.select([st.Equal('deleted', True)])}
        if not deleted_ids:
            # nothing is flagged as deleted: skip the scan of every formdef
            return

        active_ids = set()
        for formdef in FormDef.select():
            formdatas = formdef.data_class().select(
                [st.Contains('user_id', deleted_ids)], iterator=True)
            active_ids.update(str(formdata.user_id) for formdata in formdatas)

        for user_id in deleted_ids - active_ids:
            cls.remove_object(user_id)
301

  
256 302

  
257 303
Substitutions.register('session_user_display_name', category=N_('User'), comment=N_('Session User Display Name'))
258 304
Substitutions.register('session_user_email', category=N_('User'), comment=N_('Session User Email'))
259
-