0003-users-add-cronjob-to-delete-users-24430.patch
tests/test_users.py | ||
---|---|---|
7 | 7 | |
8 | 8 |
import pytest |
9 | 9 | |
10 |
from quixote import cleanup |
|
10 |
from quixote import cleanup, get_publisher
|
|
11 | 11 | |
12 | 12 |
from wcs import publisher |
13 | 13 |
from wcs import fields |
... | ... | |
78 | 78 | |
79 | 79 |
with pytest.raises(AttributeError): |
80 | 80 |
user.xxx |
81 | ||
82 | ||
83 |
def test_clean_deleted_users(): |
|
84 |
from wcs.formdef import FormDef |
|
85 |
from wcs.qommon import storage as st |
|
86 | ||
87 |
User = pub.user_class |
|
88 | ||
89 |
User.wipe() |
|
90 |
FormDef.wipe() |
|
91 | ||
92 |
formdef = FormDef() |
|
93 |
formdef.name = 'foobar' |
|
94 |
formdef.url_name = 'foobar' |
|
95 |
formdef.fields = [] |
|
96 |
formdef.store() |
|
97 |
data_class = formdef.data_class() |
|
98 | ||
99 |
user1 = User() |
|
100 |
user1.name = 'Pierre' |
|
101 |
user1.deleted = True |
|
102 |
user1.store() |
|
103 | ||
104 |
user2 = User() |
|
105 |
user2.name = 'Jean' |
|
106 |
user2.deleted = True |
|
107 |
user2.store() |
|
108 | ||
109 |
formdata1 = data_class() |
|
110 |
formdata1.user_id = user1.id |
|
111 |
formdata1.store() |
|
112 | ||
113 |
assert User.count() == 2 |
|
114 | ||
115 |
get_publisher().clean_deleted_users() |
|
116 | ||
117 |
assert User.count() == 1 |
|
118 |
assert len(User.select([st.Equal('name', 'Pierre')])) == 1 |
|
119 |
assert len(User.select([st.Equal('name', 'Jean')])) == 0 |
wcs/publisher.py | ||
---|---|---|
124 | 124 |
super(WcsPublisher, cls).register_cronjobs() |
125 | 125 |
# every hour: check for global action timeouts |
126 | 126 |
cls.register_cronjob(CronJob(cls.apply_global_action_timeouts, minutes=[0])) |
127 |
cls.register_cronjob(CronJob(cls.clean_deleted_users, minutes=[0])) |
|
127 | 128 | |
128 | 129 |
def is_using_postgresql(self): |
129 | 130 |
return bool(self.has_site_option('postgresql') and self.cfg.get('postgresql', {})) |
... | ... | |
359 | 360 |
from . import sql |
360 | 361 |
sql.cleanup_connection() |
361 | 362 | |
363 |
def clean_deleted_users(self): |
|
364 |
for user_id in self.user_class.get_to_delete_ids(): |
|
365 |
self.user_class.remove_object(user_id) |
|
366 | ||
367 | ||
362 | 368 |
set_publisher_class(WcsPublisher) |
363 | 369 |
WcsPublisher.register_extra_dir(os.path.join(os.path.dirname(__file__), 'extra')) |
364 | 370 |
wcs/sql.py | ||
---|---|---|
1960 | 1960 | |
1961 | 1961 |
return objects |
1962 | 1962 | |
1963 |
@classmethod |
|
1964 |
def get_to_delete_ids(cls): |
|
1965 |
from .formdef import FormDef |
|
1966 | ||
1967 |
deleted_ids = set(str(user.id) for user in cls.select([Equal('deleted', True)])) |
|
1968 |
for formdef in FormDef.select(): |
|
1969 |
for formdata in formdef.data_class().select([Contains('user_id', deleted_ids)], iterator=True): |
|
1970 |
user_id = str(formdata.user_id) |
|
1971 |
if user_id in deleted_ids: |
|
1972 |
deleted_ids.remove(str(formdata.user_id)) |
|
1973 |
return deleted_ids |
|
1974 | ||
1963 | 1975 | |
1964 | 1976 |
class Session(SqlMixin, wcs.sessions.BasicSession): |
1965 | 1977 |
_table_name = 'sessions' |
wcs/users.py | ||
---|---|---|
254 | 254 |
self.form_data = {} |
255 | 255 |
self.store() |
256 | 256 | |
257 |
@classmethod |
|
258 |
def get_to_delete_ids(cls): |
|
259 |
from .formdef import FormDef |
|
260 | ||
261 |
deleted_ids = set(str(user.id) for user in cls.select([st.Equal('deleted', True)])) |
|
262 |
for formdef in FormDef.select(): |
|
263 |
data_class = formdef.data_class() |
|
264 |
formdatas = data_class.select( |
|
265 |
[st.Contains('user_id', deleted_ids)], |
|
266 |
iterator=True) |
|
267 |
for formdata in formdatas: |
|
268 |
user_id = str(formdata.user_id) |
|
269 |
if user_id in deleted_ids: |
|
270 |
deleted_ids.remove(str(formdata.user_id)) |
|
271 |
return deleted_ids |
|
272 | ||
273 |
@classmethod |
|
274 |
def clean_deleted_users_sql(cls): |
|
275 |
from wcs.sql import AnyFormData |
|
276 |
from wcs.formdef import FormDef |
|
277 | ||
278 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
279 |
deleted_ids = set(str(u.id) for u in deleted) |
|
280 |
active_ids = set() |
|
281 |
if FormDef.count(): |
|
282 |
# without any FormDef global views are not created. |
|
283 |
active_ids = set(f.user_id for f in AnyFormData.select( |
|
284 |
[st.Contains('user_id', deleted_ids)], iterator=True)) |
|
285 |
to_delete_ids = deleted_ids - active_ids |
|
286 |
for user_id in to_delete_ids: |
|
287 |
cls.remove_object(user_id) |
|
288 | ||
289 |
@classmethod |
|
290 |
def clean_deleted_users_pickle(cls): |
|
291 |
from wcs.formdef import FormDef |
|
292 | ||
293 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
294 |
deleted_ids = set(str(u.id) for u in deleted) |
|
295 |
active_ids = set() |
|
296 |
for formdef in FormDef.select(): |
|
297 |
for formdata in formdef.data_class().select([st.Contains('user_id', deleted_ids)], iterator=True): |
|
298 |
active_ids.add(str(formdata.user_id)) |
|
299 |
to_delete_ids = deleted_ids - active_ids |
|
300 |
for user_id in to_delete_ids: |
|
301 |
cls.remove_object(user_id) |
|
302 | ||
257 | 303 | |
258 | 304 |
Substitutions.register('session_user_display_name', category=N_('User'), comment=N_('Session User Display Name')) |
259 | 305 |
Substitutions.register('session_user_email', category=N_('User'), comment=N_('Session User Email')) |
260 |
- |