0003-users-add-cronjob-to-delete-users-24430.patch
tests/test_users.py | ||
---|---|---|
77 | 77 | |
78 | 78 |
with pytest.raises(AttributeError): |
79 | 79 |
user.xxx |
80 | ||
81 | ||
82 |
def test_clean_deleted_users(): |
|
83 |
from wcs.formdef import FormDef |
|
84 |
from wcs.qommon import storage as st |
|
85 | ||
86 |
User = pub.user_class |
|
87 | ||
88 |
User.wipe() |
|
89 |
FormDef.wipe() |
|
90 | ||
91 |
formdef = FormDef() |
|
92 |
formdef.name = 'foobar' |
|
93 |
formdef.url_name = 'foobar' |
|
94 |
formdef.fields = [] |
|
95 |
formdef.store() |
|
96 |
data_class = formdef.data_class() |
|
97 | ||
98 |
user1 = User() |
|
99 |
user1.name = 'Pierre' |
|
100 |
user1.deleted = True |
|
101 |
user1.store() |
|
102 | ||
103 |
user2 = User() |
|
104 |
user2.name = 'Jean' |
|
105 |
user2.deleted = True |
|
106 |
user2.store() |
|
107 | ||
108 |
formdata1 = data_class() |
|
109 |
formdata1.user_id = user1.id |
|
110 |
formdata1.store() |
|
111 | ||
112 |
assert User.count() == 2 |
|
113 | ||
114 |
User.clean_deleted_users() |
|
115 | ||
116 |
assert User.count() == 1 |
|
117 |
assert len(User.select([st.Equal('name', 'Pierre')])) == 1 |
|
118 |
assert len(User.select([st.Equal('name', 'Jean')])) == 0 |
wcs/formdef.py | ||
---|---|---|
1040 | 1040 |
roles_node = tree.find(node_name) |
1041 | 1041 |
roles = [] |
1042 | 1042 |
setattr(formdef, attr_name, roles) |
1043 |
for child in roles_node.getchildren():
|
|
1043 |
for child in roles_node: |
|
1044 | 1044 |
role_id = None |
1045 | 1045 |
value = child.text.encode(charset) |
1046 | 1046 |
if value.startswith('_') or value == 'logged-users': |
... | ... | |
1061 | 1061 |
if tree.find('roles') is not None: |
1062 | 1062 |
roles_node = tree.find('roles') |
1063 | 1063 |
formdef.workflow_roles = {} |
1064 |
for child in roles_node.getchildren():
|
|
1064 |
for child in roles_node: |
|
1065 | 1065 |
role_key = child.attrib['role_key'] |
1066 | 1066 |
role_id = None |
1067 | 1067 |
value = child.text.encode(charset) |
... | ... | |
1083 | 1083 |
if tree.find('geolocations') is not None: |
1084 | 1084 |
geolocations_node = tree.find('geolocations') |
1085 | 1085 |
formdef.geolocations = {} |
1086 |
for child in geolocations_node.getchildren():
|
|
1086 |
for child in geolocations_node: |
|
1087 | 1087 |
geoloc_key = child.attrib['key'] |
1088 | 1088 |
geoloc_value = child.text.encode(charset) |
1089 | 1089 |
formdef.geolocations[geoloc_key] = geoloc_value |
... | ... | |
1091 | 1091 |
if tree.find('required_authentication_contexts') is not None: |
1092 | 1092 |
node = tree.find('required_authentication_contexts') |
1093 | 1093 |
formdef.required_authentication_contexts = [] |
1094 |
for child in node.getchildren():
|
|
1094 |
for child in node: |
|
1095 | 1095 |
formdef.required_authentication_contexts.append(str(child.text)) |
1096 | 1096 | |
1097 | 1097 |
return formdef |
wcs/users.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from quixote import get_publisher |
|
18 | ||
17 | 19 |
from qommon import _ |
18 | 20 |
from qommon.misc import simplify |
19 | 21 |
from qommon.storage import StorableObject |
20 | 22 |
from qommon import get_cfg |
23 |
from qommon.cron import CronJob |
|
21 | 24 |
import wcs.qommon.storage as st |
22 | 25 | |
23 | 26 |
from qommon.substitution import Substitutions, invalidate_substitution_cache |
27 |
from qommon.publisher import get_publisher_class |
|
28 | ||
24 | 29 | |
25 | 30 |
class User(StorableObject): |
26 | 31 |
_names = 'users' |
... | ... | |
221 | 226 |
def get_full_name(self): |
222 | 227 |
return self.display_name |
223 | 228 | |
229 |
@classmethod |
|
230 |
def clean_deleted_users(cls): |
|
231 |
# really delete users without any form |
|
232 |
if get_publisher().is_using_postgresql(): |
|
233 |
cls.clean_deleted_users_sql() |
|
234 |
else: |
|
235 |
cls.clean_deleted_users_pickle() |
|
236 | ||
237 |
@classmethod |
|
238 |
def clean_deleted_users_sql(cls): |
|
239 |
from wcs.sql import AnyFormData |
|
240 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
241 |
deleted_ids = set(u.id for u in deleted) |
|
242 |
active_ids = set(f.user_id for f in AnyFormData.select([st.Contains('user_id', deleted_ids)], iterator=True)) |
|
243 |
to_delete_ids = deleted_ids - active_ids |
|
244 |
for user_id in to_delete_ids: |
|
245 |
cls.remove_object(user_id) |
|
246 | ||
247 |
@classmethod |
|
248 |
def clean_deleted_users_pickle(cls): |
|
249 |
from wcs.formdef import FormDef |
|
250 | ||
251 |
deleted = cls.select([st.Equal('deleted', True)]) |
|
252 |
deleted_ids = set(u.id for u in deleted) |
|
253 |
active_ids = set() |
|
254 |
for formdef in FormDef.select(): |
|
255 |
for formdata in formdef.data_class().select([st.Contains('user_id', deleted_ids)], iterator=True): |
|
256 |
active_ids.add(formdata.user_id) |
|
257 |
to_delete_ids = deleted_ids - active_ids |
|
258 |
for user_id in to_delete_ids: |
|
259 |
cls.remove_object(user_id) |
|
260 | ||
224 | 261 | |
225 | 262 |
Substitutions.register('session_user_display_name', category=N_('User'), comment=N_('Session User Display Name')) |
226 | 263 |
Substitutions.register('session_user_email', category=N_('User'), comment=N_('Session User Email')) |
227 | 264 |
Substitutions.register_dynamic_source(User) |
265 | ||
266 |
if get_publisher_class(): |
|
267 |
# clean deleted users hourly |
|
268 |
get_publisher_class().register_cronjob(CronJob(User.clean_deleted_users, minutes=[20], hours=[6])) |
|
228 |
- |