0003-users-add-cronjob-to-delete-users-24430.patch
tests/test_users.py | ||
---|---|---|
7 | 7 | |
8 | 8 |
import pytest |
9 | 9 | |
10 |
from quixote import cleanup |
|
10 |
from quixote import cleanup, get_publisher
|
|
11 | 11 | |
12 | 12 |
from wcs import publisher |
13 | 13 |
from wcs import fields |
... | ... | |
77 | 77 | |
78 | 78 |
with pytest.raises(AttributeError): |
79 | 79 |
user.xxx |
80 | ||
81 | ||
82 |
def test_clean_deleted_users(): |
|
83 |
from wcs.formdef import FormDef |
|
84 |
from wcs.qommon import storage as st |
|
85 | ||
86 |
User = pub.user_class |
|
87 | ||
88 |
User.wipe() |
|
89 |
FormDef.wipe() |
|
90 | ||
91 |
formdef = FormDef() |
|
92 |
formdef.name = 'foobar' |
|
93 |
formdef.url_name = 'foobar' |
|
94 |
formdef.fields = [] |
|
95 |
formdef.store() |
|
96 |
data_class = formdef.data_class() |
|
97 | ||
98 |
user1 = User() |
|
99 |
user1.name = 'Pierre' |
|
100 |
user1.deleted = True |
|
101 |
user1.store() |
|
102 | ||
103 |
user2 = User() |
|
104 |
user2.name = 'Jean' |
|
105 |
user2.deleted = True |
|
106 |
user2.store() |
|
107 | ||
108 |
formdata1 = data_class() |
|
109 |
formdata1.user_id = user1.id |
|
110 |
formdata1.store() |
|
111 | ||
112 |
assert User.count() == 2 |
|
113 | ||
114 |
get_publisher().clean_deleted_users() |
|
115 | ||
116 |
assert User.count() == 1 |
|
117 |
assert len(User.select([st.Equal('name', 'Pierre')])) == 1 |
|
118 |
assert len(User.select([st.Equal('name', 'Jean')])) == 0 |
wcs/publisher.py | ||
---|---|---|
123 | 123 |
super(WcsPublisher, cls).register_cronjobs() |
124 | 124 |
# every hour: check for global action timeouts |
125 | 125 |
cls.register_cronjob(CronJob(cls.apply_global_action_timeouts, minutes=[0])) |
126 |
cls.register_cronjob(CronJob(cls.clean_deleted_users, hourly=True)) |
|
126 | 127 | |
127 | 128 |
def is_using_postgresql(self): |
128 | 129 |
return bool(self.has_site_option('postgresql') and self.cfg.get('postgresql', {})) |
... | ... | |
358 | 359 |
from . import sql |
359 | 360 |
sql.cleanup_connection() |
360 | 361 | |
362 |
def clean_deleted_users(self): |
|
363 |
for user_id in self.user_class.get_to_delete_ids(): |
|
364 |
self.user_class.remove_object(user_id) |
|
365 | ||
366 | ||
361 | 367 |
set_publisher_class(WcsPublisher) |
362 | 368 |
WcsPublisher.register_extra_dir(os.path.join(os.path.dirname(__file__), 'extra')) |
363 | 369 |
wcs/sql.py | ||
---|---|---|
1925 | 1925 | |
1926 | 1926 |
return objects |
1927 | 1927 | |
1928 |
@classmethod |
|
1929 |
def get_to_delete_ids(cls): |
|
1930 |
from .formdef import FormDef |
|
1931 | ||
1932 |
deleted_ids = set(str(user.id) for user in cls.select([Equal('deleted', True)])) |
|
1933 |
for formdef in FormDef.select(): |
|
1934 |
for formdata in formdef.data_class().select([Contains('user_id', deleted_ids)], iterator=True): |
|
1935 |
user_id = str(formdata.user_id) |
|
1936 |
if user_id in deleted_ids: |
|
1937 |
deleted_ids.remove(str(formdata.user_id)) |
|
1938 |
return deleted_ids |
|
1939 | ||
1928 | 1940 | |
1929 | 1941 |
class Session(SqlMixin, wcs.sessions.BasicSession): |
1930 | 1942 |
_table_name = 'sessions' |
wcs/users.py | ||
---|---|---|
253 | 253 |
self.form_data = {} |
254 | 254 |
self.store() |
255 | 255 | |
256 |
@classmethod |
|
257 |
def get_to_delete_ids(cls): |
|
258 |
from .formdef import FormDef |
|
259 | ||
260 |
deleted_ids = set(str(user.id) for user in cls.select([st.Equal('deleted', True)])) |
|
261 |
for formdef in FormDef.select(): |
|
262 |
data_class = formdef.data_class() |
|
263 |
formdatas = data_class.select( |
|
264 |
[st.Contains('user_id', deleted_ids)], |
|
265 |
iterator=True) |
|
266 |
for formdata in formdatas: |
|
267 |
user_id = str(formdata.user_id) |
|
268 |
if user_id in deleted_ids: |
|
269 |
deleted_ids.remove(str(formdata.user_id)) |
|
270 |
return deleted_ids |
|
271 | ||
272 |
@classmethod |
|
273 |
def clean_deleted_users_sql(cls): |
|
274 |
from wcs.sql import AnyFormData |
|
275 |
from wcs.formdef import FormDef |
|
276 | ||
277 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
278 |
deleted_ids = set(str(u.id) for u in deleted) |
|
279 |
active_ids = set() |
|
280 |
if FormDef.count(): |
|
281 |
# without any FormDef global views are not created. |
|
282 |
active_ids = set(f.user_id for f in AnyFormData.select( |
|
283 |
[st.Contains('user_id', deleted_ids)], iterator=True)) |
|
284 |
to_delete_ids = deleted_ids - active_ids |
|
285 |
for user_id in to_delete_ids: |
|
286 |
cls.remove_object(user_id) |
|
287 | ||
288 |
@classmethod |
|
289 |
def clean_deleted_users_pickle(cls): |
|
290 |
from wcs.formdef import FormDef |
|
291 | ||
292 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
293 |
deleted_ids = set(str(u.id) for u in deleted) |
|
294 |
active_ids = set() |
|
295 |
for formdef in FormDef.select(): |
|
296 |
for formdata in formdef.data_class().select([st.Contains('user_id', deleted_ids)], iterator=True): |
|
297 |
active_ids.add(str(formdata.user_id)) |
|
298 |
to_delete_ids = deleted_ids - active_ids |
|
299 |
for user_id in to_delete_ids: |
|
300 |
cls.remove_object(user_id) |
|
301 | ||
256 | 302 | |
257 | 303 |
Substitutions.register('session_user_display_name', category=N_('User'), comment=N_('Session User Display Name')) |
258 | 304 |
Substitutions.register('session_user_email', category=N_('User'), comment=N_('Session User Email')) |
259 |
- |