0003-users-add-cronjob-to-delete-users-24430.patch
tests/test_users.py | ||
---|---|---|
7 | 7 | |
8 | 8 |
import pytest |
9 | 9 | |
10 |
from quixote import cleanup |
|
10 |
from quixote import cleanup, get_publisher
|
|
11 | 11 | |
12 | 12 |
from wcs import publisher |
13 | 13 |
from wcs import fields |
... | ... | |
77 | 77 | |
78 | 78 |
with pytest.raises(AttributeError): |
79 | 79 |
user.xxx |
80 | ||
81 | ||
82 |
def test_clean_deleted_users(): |
|
83 |
from wcs.formdef import FormDef |
|
84 |
from wcs.qommon import storage as st |
|
85 | ||
86 |
User = pub.user_class |
|
87 | ||
88 |
User.wipe() |
|
89 |
FormDef.wipe() |
|
90 | ||
91 |
formdef = FormDef() |
|
92 |
formdef.name = 'foobar' |
|
93 |
formdef.url_name = 'foobar' |
|
94 |
formdef.fields = [] |
|
95 |
formdef.store() |
|
96 |
data_class = formdef.data_class() |
|
97 | ||
98 |
user1 = User() |
|
99 |
user1.name = 'Pierre' |
|
100 |
user1.deleted = True |
|
101 |
user1.store() |
|
102 | ||
103 |
user2 = User() |
|
104 |
user2.name = 'Jean' |
|
105 |
user2.deleted = True |
|
106 |
user2.store() |
|
107 | ||
108 |
formdata1 = data_class() |
|
109 |
formdata1.user_id = user1.id |
|
110 |
formdata1.store() |
|
111 | ||
112 |
assert User.count() == 2 |
|
113 | ||
114 |
get_publisher().clean_deleted_users() |
|
115 | ||
116 |
assert User.count() == 1 |
|
117 |
assert len(User.select([st.Equal('name', 'Pierre')])) == 1 |
|
118 |
assert len(User.select([st.Equal('name', 'Jean')])) == 0 |
wcs/publisher.py | ||
---|---|---|
115 | 115 |
super(WcsPublisher, cls).register_cronjobs() |
116 | 116 |
# every hour: check for global action timeouts |
117 | 117 |
cls.register_cronjob(CronJob(cls.apply_global_action_timeouts, hourly=True)) |
118 |
cls.register_cronjob(CronJob(cls.clean_deleted_users, hourly=True)) |
|
118 | 119 | |
119 | 120 |
def is_using_postgresql(self): |
120 | 121 |
return bool(self.has_site_option('postgresql') and self.cfg.get('postgresql', {})) |
... | ... | |
309 | 310 |
import sql |
310 | 311 |
sql.cleanup_connection() |
311 | 312 | |
313 |
def clean_deleted_users(self): |
|
314 |
for user_id in self.user_class.get_to_delete_ids(): |
|
315 |
self.user_class.remove_object(user_id) |
|
316 | ||
317 | ||
312 | 318 |
set_publisher_class(WcsPublisher) |
313 | 319 |
WcsPublisher.register_extra_dir(os.path.join(os.path.dirname(__file__), 'extra')) |
314 | 320 |
wcs/sql.py | ||
---|---|---|
1794 | 1794 | |
1795 | 1795 |
return objects |
1796 | 1796 | |
1797 |
@classmethod |
|
1798 |
def get_to_delete_ids(cls): |
|
1799 |
from .formdef import FormDef |
|
1800 | ||
1801 |
deleted_ids = set(str(user.id) for user in cls.select([Equal('deleted', True)])) |
|
1802 |
for formdef in FormDef.select(): |
|
1803 |
for formdata in formdef.data_class().select([Contains('user_id', deleted_ids)], iterator=True): |
|
1804 |
user_id = str(formdata.user_id) |
|
1805 |
if user_id in deleted_ids: |
|
1806 |
deleted_ids.remove(str(formdata.user_id)) |
|
1807 |
return deleted_ids |
|
1808 | ||
1797 | 1809 | |
1798 | 1810 |
class Session(SqlMixin, wcs.sessions.BasicSession): |
1799 | 1811 |
_table_name = 'sessions' |
wcs/users.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from quixote import get_publisher |
|
18 | ||
17 | 19 |
from qommon import _ |
18 | 20 |
from qommon.misc import simplify |
19 | 21 |
from qommon.storage import StorableObject |
20 | 22 |
from qommon import get_cfg |
23 |
from qommon.cron import CronJob |
|
21 | 24 |
import wcs.qommon.storage as st |
22 | 25 | |
23 | 26 |
from qommon.substitution import Substitutions, invalidate_substitution_cache |
27 |
from qommon.publisher import get_publisher_class |
|
28 | ||
24 | 29 | |
25 | 30 |
class User(StorableObject): |
26 | 31 |
_names = 'users' |
... | ... | |
221 | 226 |
def get_full_name(self): |
222 | 227 |
return self.display_name |
223 | 228 | |
229 |
@classmethod |
|
230 |
def get_to_delete_ids(cls): |
|
231 |
from .formdef import FormDef |
|
232 | ||
233 |
deleted_ids = set(str(user.id) for user in cls.select([st.Equal('deleted', True)])) |
|
234 |
for formdef in FormDef.select(): |
|
235 |
data_class = formdef.data_class() |
|
236 |
formdatas = data_class.select( |
|
237 |
[st.Contains('user_id', deleted_ids)], |
|
238 |
iterator=True) |
|
239 |
for formdata in formdatas: |
|
240 |
user_id = str(formdata.user_id) |
|
241 |
if user_id in deleted_ids: |
|
242 |
deleted_ids.remove(str(formdata.user_id)) |
|
243 |
return deleted_ids |
|
244 | ||
245 |
@classmethod |
|
246 |
def clean_deleted_users_sql(cls): |
|
247 |
from wcs.sql import AnyFormData |
|
248 |
from wcs.formdef import FormDef |
|
249 | ||
250 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
251 |
deleted_ids = set(str(u.id) for u in deleted) |
|
252 |
active_ids = set() |
|
253 |
if FormDef.count(): |
|
254 |
# without any FormDef global views are not created. |
|
255 |
active_ids = set(f.user_id for f in AnyFormData.select( |
|
256 |
[st.Contains('user_id', deleted_ids)], iterator=True)) |
|
257 |
to_delete_ids = deleted_ids - active_ids |
|
258 |
for user_id in to_delete_ids: |
|
259 |
cls.remove_object(user_id) |
|
260 | ||
261 |
@classmethod |
|
262 |
def clean_deleted_users_pickle(cls): |
|
263 |
from wcs.formdef import FormDef |
|
264 | ||
265 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
266 |
deleted_ids = set(str(u.id) for u in deleted) |
|
267 |
active_ids = set() |
|
268 |
for formdef in FormDef.select(): |
|
269 |
for formdata in formdef.data_class().select([st.Contains('user_id', deleted_ids)], iterator=True): |
|
270 |
active_ids.add(str(formdata.user_id)) |
|
271 |
to_delete_ids = deleted_ids - active_ids |
|
272 |
for user_id in to_delete_ids: |
|
273 |
cls.remove_object(user_id) |
|
274 | ||
224 | 275 | |
225 | 276 |
Substitutions.register('session_user_display_name', category=N_('User'), comment=N_('Session User Display Name')) |
226 | 277 |
Substitutions.register('session_user_email', category=N_('User'), comment=N_('Session User Email')) |
227 |
- |