0001-misc-store-Role-also-in-SQL-51772.patch
tests/admin_pages/test_all.py | ||
---|---|---|
6 | 6 | |
7 | 7 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
8 | 8 |
from wcs.qommon.http_request import HTTPRequest |
9 |
from wcs.roles import Role |
|
10 | 9 | |
11 | 10 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
12 | 11 | |
... | ... | |
50 | 49 |
return user1 |
51 | 50 | |
52 | 51 | |
53 |
def create_role(): |
|
54 |
Role.wipe()
|
|
55 |
role = Role(name='foobar')
|
|
52 |
def create_role(pub):
|
|
53 |
pub.role_class.wipe()
|
|
54 |
role = pub.role_class(name='foobar')
|
|
56 | 55 |
role.store() |
57 | 56 |
return role |
58 | 57 | |
... | ... | |
96 | 95 | |
97 | 96 |
def test_admin_for_all(pub): |
98 | 97 |
user = create_superuser(pub) |
99 |
role = create_role() |
|
98 |
role = create_role(pub)
|
|
100 | 99 | |
101 | 100 |
try: |
102 | 101 |
open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w').close() |
tests/admin_pages/test_block.py | ||
---|---|---|
45 | 45 | |
46 | 46 |
def test_block_new(pub, blocks_feature): |
47 | 47 |
create_superuser(pub) |
48 |
create_role() |
|
48 |
create_role(pub)
|
|
49 | 49 |
BlockDef.wipe() |
50 | 50 |
app = login(get_app(pub)) |
51 | 51 |
resp = app.get('/backoffice/forms/') |
... | ... | |
76 | 76 | |
77 | 77 |
def test_block_options(pub, blocks_feature): |
78 | 78 |
create_superuser(pub) |
79 |
create_role() |
|
80 | 79 |
BlockDef.wipe() |
81 | 80 |
block = BlockDef() |
82 | 81 |
block.name = 'foobar' |
... | ... | |
134 | 133 | |
135 | 134 |
def test_block_export_import(pub, blocks_feature): |
136 | 135 |
create_superuser(pub) |
137 |
create_role() |
|
138 | 136 |
BlockDef.wipe() |
139 | 137 |
block = BlockDef() |
140 | 138 |
block.name = 'foobar' |
... | ... | |
182 | 180 | |
183 | 181 |
def test_block_delete(pub, blocks_feature): |
184 | 182 |
create_superuser(pub) |
185 |
create_role() |
|
186 | 183 |
BlockDef.wipe() |
187 | 184 |
FormDef.wipe() |
188 | 185 |
block = BlockDef() |
... | ... | |
219 | 216 | |
220 | 217 |
def test_block_edit_duplicate_delete_field(pub, blocks_feature): |
221 | 218 |
create_superuser(pub) |
222 |
create_role() |
|
223 | 219 |
BlockDef.wipe() |
224 | 220 |
block = BlockDef() |
225 | 221 |
block.name = 'foobar' |
... | ... | |
248 | 244 | |
249 | 245 |
def test_block_use_in_formdef(pub, blocks_feature): |
250 | 246 |
create_superuser(pub) |
251 |
create_role() |
|
252 | 247 |
FormDef.wipe() |
253 | 248 |
BlockDef.wipe() |
254 | 249 |
block = BlockDef() |
tests/admin_pages/test_card.py | ||
---|---|---|
9 | 9 |
from wcs.carddef import CardDef |
10 | 10 |
from wcs.formdef import FormDef |
11 | 11 |
from wcs.qommon.http_request import HTTPRequest |
12 |
from wcs.roles import Role |
|
13 | 12 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef |
14 | 13 |
from wcs.workflows import Workflow |
15 | 14 |
from wcs.workflows import WorkflowBackofficeFieldsFormDef |
... | ... | |
357 | 356 | |
358 | 357 |
def test_card_custom_view_data_source(pub): |
359 | 358 |
user = create_superuser(pub) |
360 |
Role.wipe()
|
|
361 |
role = Role(name='foobar')
|
|
359 |
pub.role_class.wipe()
|
|
360 |
role = pub.role_class(name='foobar')
|
|
362 | 361 |
role.store() |
363 | 362 |
user.roles = [role.id] |
364 | 363 |
user.store() |
... | ... | |
418 | 417 | |
419 | 418 |
def test_carddef_usage(pub): |
420 | 419 |
user = create_superuser(pub) |
421 |
Role.wipe()
|
|
422 |
role = Role(name='foobar')
|
|
420 |
pub.role_class.wipe()
|
|
421 |
role = pub.role_class(name='foobar')
|
|
423 | 422 |
role.store() |
424 | 423 |
user.roles = [role.id] |
425 | 424 |
user.store() |
tests/admin_pages/test_datasource.py | ||
---|---|---|
23 | 23 |
from wcs import fields |
24 | 24 | |
25 | 25 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub, HttpRequestsMocking |
26 |
from .test_all import create_superuser, create_role
|
|
26 |
from .test_all import create_superuser |
|
27 | 27 | |
28 | 28 | |
29 | 29 |
def pytest_generate_tests(metafunc): |
... | ... | |
488 | 488 | |
489 | 489 |
def test_data_sources_export(pub): |
490 | 490 |
create_superuser(pub) |
491 |
create_role() |
|
492 | 491 | |
493 | 492 |
NamedDataSource.wipe() |
494 | 493 |
data_source = NamedDataSource(name='foobar') |
... | ... | |
508 | 507 | |
509 | 508 |
def test_data_sources_import(pub): |
510 | 509 |
create_superuser(pub) |
511 |
create_role() |
|
512 | 510 | |
513 | 511 |
NamedDataSource.wipe() |
514 | 512 |
data_source = NamedDataSource(name='foobar') |
tests/admin_pages/test_form.py | ||
---|---|---|
17 | 17 |
from wcs.categories import Category |
18 | 18 |
from wcs.data_sources import NamedDataSource |
19 | 19 |
from wcs.wscalls import NamedWsCall |
20 |
from wcs.roles import Role |
|
21 | 20 |
from wcs.workflows import Workflow |
22 | 21 |
from wcs.formdef import FormDef |
23 | 22 |
from wcs.carddef import CardDef |
... | ... | |
53 | 52 | |
54 | 53 | |
55 | 54 |
def test_forms(pub): |
56 |
user = create_superuser(pub) |
|
55 |
create_superuser(pub) |
|
56 |
pub.role_class.wipe() |
|
57 | 57 |
app = login(get_app(pub)) |
58 | 58 |
resp = app.get('/backoffice/forms/') |
59 | 59 |
assert 'You first have to define roles.' in resp.text |
60 |
assert not 'New Form' in resp.text
|
|
60 |
assert 'New Form' not in resp.text
|
|
61 | 61 | |
62 | 62 | |
63 | 63 |
def test_forms_new(pub): |
64 | 64 |
create_superuser(pub) |
65 | 65 |
app = login(get_app(pub)) |
66 |
create_role() |
|
66 |
create_role(pub)
|
|
67 | 67 | |
68 |
FormDef.wipe() |
|
68 | 69 |
# create a new form |
69 | 70 |
resp = app.get('/backoffice/forms/') |
70 | 71 |
assert 'New Form' in resp.text |
... | ... | |
80 | 81 |
assert formdef.name == 'form title' |
81 | 82 |
assert formdef.url_name == 'form-title' |
82 | 83 |
assert formdef.fields == [] |
83 |
assert formdef.disabled == True
|
|
84 |
assert formdef.disabled is True
|
|
84 | 85 | |
85 | 86 | |
86 | 87 |
def test_forms_new_popup(pub): |
87 | 88 |
FormDef.wipe() |
88 | 89 |
create_superuser(pub) |
89 | 90 |
app = login(get_app(pub)) |
90 |
create_role() |
|
91 |
create_role(pub)
|
|
91 | 92 | |
92 | 93 |
# create a new form |
93 | 94 |
resp = app.get('/backoffice/forms/') |
... | ... | |
105 | 106 |
assert formdef.name == 'form title' |
106 | 107 |
assert formdef.url_name == 'form-title' |
107 | 108 |
assert formdef.fields == [] |
108 |
assert formdef.disabled == True
|
|
109 |
assert formdef.disabled is True
|
|
109 | 110 | |
110 | 111 | |
111 | 112 |
def assert_option_display(resp, label, value): |
... | ... | |
116 | 117 | |
117 | 118 |
def test_forms_edit(pub): |
118 | 119 |
create_superuser(pub) |
119 |
create_role() |
|
120 |
create_role(pub)
|
|
120 | 121 | |
121 | 122 |
FormDef.wipe() |
122 | 123 |
formdef = FormDef() |
... | ... | |
138 | 139 |
assert resp.location == 'http://example.net/backoffice/forms/1/' |
139 | 140 |
resp = resp.follow() |
140 | 141 |
assert_option_display(resp, 'Confirmation Page', 'Disabled') |
141 |
assert FormDef.get(1).confirmation == False
|
|
142 |
assert FormDef.get(1).confirmation is False
|
|
142 | 143 | |
143 | 144 |
# try cancel button |
144 | 145 |
resp = resp.click('Confirmation Page') |
... | ... | |
148 | 149 |
assert resp.location == 'http://example.net/backoffice/forms/1/' |
149 | 150 |
resp = resp.follow() |
150 | 151 |
assert_option_display(resp, 'Confirmation Page', 'Disabled') |
151 |
assert FormDef.get(1).confirmation == False
|
|
152 |
assert FormDef.get(1).confirmation is False
|
|
152 | 153 | |
153 | 154 |
# Limit to one form |
154 | 155 |
assert_option_display(resp, 'Limit to one form', 'Disabled') |
... | ... | |
267 | 268 | |
268 | 269 |
def test_form_title_change(pub): |
269 | 270 |
create_superuser(pub) |
270 |
create_role() |
|
271 |
create_role(pub)
|
|
271 | 272 | |
272 | 273 |
FormDef.wipe() |
273 | 274 |
formdef = FormDef() |
... | ... | |
320 | 321 | |
321 | 322 |
def test_forms_edit_publication_date(pub): |
322 | 323 |
create_superuser(pub) |
323 |
create_role() |
|
324 |
create_role(pub)
|
|
324 | 325 | |
325 | 326 |
FormDef.wipe() |
326 | 327 |
formdef = FormDef() |
... | ... | |
355 | 356 | |
356 | 357 |
def test_form_category(pub): |
357 | 358 |
create_superuser(pub) |
358 |
create_role() |
|
359 |
create_role(pub)
|
|
359 | 360 | |
360 | 361 |
FormDef.wipe() |
361 | 362 |
formdef = FormDef() |
... | ... | |
379 | 380 | |
380 | 381 |
def test_form_category_select(pub): |
381 | 382 |
create_superuser(pub) |
382 |
create_role() |
|
383 |
create_role(pub)
|
|
383 | 384 | |
384 | 385 |
FormDef.wipe() |
385 | 386 |
formdef = FormDef() |
... | ... | |
408 | 409 | |
409 | 410 |
def test_form_workflow(pub): |
410 | 411 |
create_superuser(pub) |
411 |
create_role() |
|
412 |
create_role(pub)
|
|
412 | 413 | |
413 | 414 |
FormDef.wipe() |
414 | 415 |
formdef = FormDef() |
... | ... | |
432 | 433 | |
433 | 434 |
def test_form_workflow_change(pub): |
434 | 435 |
create_superuser(pub) |
435 |
create_role() |
|
436 |
create_role(pub)
|
|
436 | 437 | |
437 | 438 |
FormDef.wipe() |
438 | 439 |
formdef = FormDef() |
... | ... | |
463 | 464 | |
464 | 465 |
def test_form_workflow_link(pub): |
465 | 466 |
create_superuser(pub) |
466 |
create_role() |
|
467 |
create_role(pub)
|
|
467 | 468 | |
468 | 469 |
FormDef.wipe() |
469 | 470 |
formdef = FormDef() |
... | ... | |
487 | 488 |
assert '/backoffice/workflows/%s/' % workflow.id in resp.text |
488 | 489 | |
489 | 490 |
# check workflow link is not displayed if user has no access right |
490 |
pub.cfg['admin-permissions'] = {'workflows': ['x']} # block access
|
|
491 |
pub.cfg['admin-permissions'] = {'workflows': ['x']} # block access |
|
491 | 492 |
pub.write_cfg() |
492 | 493 |
resp = app.get('/backoffice/forms/%s/' % formdef.id) |
493 | 494 |
assert '/backoffice/workflows/%s/' % workflow.id not in resp.text |
... | ... | |
495 | 496 | |
496 | 497 |
def test_form_workflow_remapping(pub): |
497 | 498 |
create_superuser(pub) |
498 |
create_role() |
|
499 |
create_role(pub)
|
|
499 | 500 | |
500 | 501 |
FormDef.wipe() |
501 | 502 |
formdef = FormDef() |
... | ... | |
561 | 562 | |
562 | 563 |
def test_form_submitter_roles(pub): |
563 | 564 |
create_superuser(pub) |
564 |
role = create_role()
|
|
565 |
create_role(pub)
|
|
565 | 566 | |
566 | 567 |
FormDef.wipe() |
567 | 568 |
formdef = FormDef() |
... | ... | |
594 | 595 |
assert FormDef.get(formdef.id).required_authentication_contexts == ['fedict'] |
595 | 596 | |
596 | 597 |
# check internal roles are not advertised |
597 |
role2 = Role(name='internal')
|
|
598 |
role2 = pub.role_class(name='internal')
|
|
598 | 599 |
role2.internal = True |
599 | 600 |
role2.store() |
600 | 601 | |
... | ... | |
607 | 608 | |
608 | 609 |
def test_form_workflow_role(pub): |
609 | 610 |
create_superuser(pub) |
610 |
role = create_role() |
|
611 |
role = create_role(pub)
|
|
611 | 612 | |
612 | 613 |
FormDef.wipe() |
613 | 614 |
formdef = FormDef() |
... | ... | |
627 | 628 |
assert FormDef.get(1).workflow_roles == {'_receiver': '1'} |
628 | 629 | |
629 | 630 |
# check it doesn't fail if a second role with the same name exists |
630 |
role = Role(name='foobar')
|
|
631 |
role = pub.role_class(name='foobar')
|
|
631 | 632 |
role.store() |
632 | 633 |
resp = app.get('/backoffice/forms/1/') |
633 | 634 |
resp = resp.click(href='role/_receiver') |
... | ... | |
635 | 636 | |
636 | 637 |
def test_form_workflow_options(pub): |
637 | 638 |
create_superuser(pub) |
638 |
create_role() |
|
639 |
create_role(pub)
|
|
639 | 640 | |
640 | 641 |
Workflow.wipe() |
641 | 642 |
workflow = Workflow(name='Workflow One') |
... | ... | |
656 | 657 | |
657 | 658 |
def test_form_workflow_variables(pub): |
658 | 659 |
create_superuser(pub) |
659 |
create_role() |
|
660 |
create_role(pub)
|
|
660 | 661 | |
661 | 662 |
FormDef.wipe() |
662 | 663 |
formdef = FormDef() |
... | ... | |
721 | 722 | |
722 | 723 |
def test_form_workflow_table_variables(pub): |
723 | 724 |
create_superuser(pub) |
724 |
create_role() |
|
725 |
create_role(pub)
|
|
725 | 726 | |
726 | 727 |
Workflow.wipe() |
727 | 728 |
workflow = Workflow(name='Workflow One') |
... | ... | |
768 | 769 | |
769 | 770 |
def test_form_roles(pub): |
770 | 771 |
create_superuser(pub) |
771 |
role = create_role() |
|
772 |
role = create_role(pub)
|
|
772 | 773 | |
773 | 774 |
FormDef.wipe() |
774 | 775 |
formdef = FormDef() |
... | ... | |
791 | 792 | |
792 | 793 |
def test_form_always_advertise(pub): |
793 | 794 |
create_superuser(pub) |
794 |
role = create_role() |
|
795 |
role = create_role(pub)
|
|
795 | 796 | |
796 | 797 |
FormDef.wipe() |
797 | 798 |
formdef = FormDef() |
... | ... | |
866 | 867 | |
867 | 868 |
def test_form_delete(pub): |
868 | 869 |
create_superuser(pub) |
869 |
create_role() |
|
870 |
create_role(pub)
|
|
870 | 871 | |
871 | 872 |
FormDef.wipe() |
872 | 873 |
formdef = FormDef() |
... | ... | |
886 | 887 | |
887 | 888 |
def test_form_delete_with_data(pub): |
888 | 889 |
create_superuser(pub) |
889 |
create_role() |
|
890 |
create_role(pub)
|
|
890 | 891 | |
891 | 892 |
FormDef.wipe() |
892 | 893 |
formdef = FormDef() |
... | ... | |
920 | 921 | |
921 | 922 |
def test_form_duplicate(pub): |
922 | 923 |
create_superuser(pub) |
923 |
create_role() |
|
924 |
create_role(pub)
|
|
924 | 925 | |
925 | 926 |
FormDef.wipe() |
926 | 927 |
formdef = FormDef() |
... | ... | |
947 | 948 | |
948 | 949 |
def test_form_export(pub): |
949 | 950 |
create_superuser(pub) |
950 |
create_role() |
|
951 |
create_role(pub)
|
|
951 | 952 | |
952 | 953 |
FormDef.wipe() |
953 | 954 |
formdef = FormDef() |
... | ... | |
967 | 968 | |
968 | 969 | |
969 | 970 |
def test_form_import(pub): |
970 |
user = create_superuser(pub)
|
|
971 |
role = create_role()
|
|
971 |
create_superuser(pub) |
|
972 |
create_role(pub)
|
|
972 | 973 | |
973 | 974 |
FormDef.wipe() |
974 | 975 |
formdef = FormDef() |
... | ... | |
1040 | 1041 | |
1041 | 1042 |
def test_form_import_from_url(pub): |
1042 | 1043 |
create_superuser(pub) |
1043 |
create_role() |
|
1044 |
create_role(pub)
|
|
1044 | 1045 | |
1045 | 1046 |
FormDef.wipe() |
1046 | 1047 |
formdef = FormDef() |
... | ... | |
1071 | 1072 | |
1072 | 1073 |
def test_form_qrcode(pub): |
1073 | 1074 |
create_superuser(pub) |
1074 |
create_role() |
|
1075 |
create_role(pub)
|
|
1075 | 1076 | |
1076 | 1077 |
FormDef.wipe() |
1077 | 1078 |
formdef = FormDef() |
... | ... | |
1090 | 1091 | |
1091 | 1092 |
def test_form_description(pub): |
1092 | 1093 |
create_superuser(pub) |
1093 |
create_role() |
|
1094 |
create_role(pub)
|
|
1094 | 1095 | |
1095 | 1096 |
FormDef.wipe() |
1096 | 1097 |
formdef = FormDef() |
... | ... | |
1112 | 1113 | |
1113 | 1114 |
def test_form_enable_from_fields_page(pub): |
1114 | 1115 |
create_superuser(pub) |
1115 |
create_role() |
|
1116 |
create_role(pub)
|
|
1116 | 1117 | |
1117 | 1118 |
FormDef.wipe() |
1118 | 1119 |
formdef = FormDef() |
... | ... | |
1132 | 1133 | |
1133 | 1134 |
def test_form_new_field(pub): |
1134 | 1135 |
create_superuser(pub) |
1135 |
create_role() |
|
1136 |
create_role(pub)
|
|
1136 | 1137 | |
1137 | 1138 |
FormDef.wipe() |
1138 | 1139 |
formdef = FormDef() |
... | ... | |
1171 | 1172 | |
1172 | 1173 |
def test_form_delete_field(pub): |
1173 | 1174 |
create_superuser(pub) |
1174 |
create_role() |
|
1175 |
create_role(pub)
|
|
1175 | 1176 | |
1176 | 1177 |
FormDef.wipe() |
1177 | 1178 |
formdef = FormDef() |
... | ... | |
1198 | 1199 | |
1199 | 1200 |
def test_form_delete_field_existing_data(pub): |
1200 | 1201 |
create_superuser(pub) |
1201 |
create_role() |
|
1202 |
create_role(pub)
|
|
1202 | 1203 | |
1203 | 1204 |
FormDef.wipe() |
1204 | 1205 |
formdef = FormDef() |
... | ... | |
1239 | 1240 | |
1240 | 1241 |
def test_form_delete_page_field(pub): |
1241 | 1242 |
create_superuser(pub) |
1242 |
create_role() |
|
1243 |
create_role(pub)
|
|
1243 | 1244 | |
1244 | 1245 |
FormDef.wipe() |
1245 | 1246 |
formdef = FormDef() |
... | ... | |
1286 | 1287 | |
1287 | 1288 | |
1288 | 1289 |
def test_form_duplicate_field(pub): |
1289 |
user = create_superuser(pub)
|
|
1290 |
create_role() |
|
1290 |
create_superuser(pub) |
|
1291 |
create_role(pub)
|
|
1291 | 1292 | |
1292 | 1293 |
FormDef.wipe() |
1293 | 1294 |
formdef = FormDef() |
... | ... | |
1310 | 1311 | |
1311 | 1312 |
def test_form_duplicate_file_field(pub): |
1312 | 1313 |
create_superuser(pub) |
1313 |
create_role() |
|
1314 |
create_role(pub)
|
|
1314 | 1315 | |
1315 | 1316 |
FormDef.wipe() |
1316 | 1317 |
formdef = FormDef() |
... | ... | |
1337 | 1338 | |
1338 | 1339 |
def test_form_edit_field(pub): |
1339 | 1340 |
create_superuser(pub) |
1340 |
create_role() |
|
1341 |
create_role(pub)
|
|
1341 | 1342 | |
1342 | 1343 |
FormDef.wipe() |
1343 | 1344 |
formdef = FormDef() |
... | ... | |
1358 | 1359 |
assert resp.location == 'http://example.net/backoffice/forms/1/fields/#itemId_1' |
1359 | 1360 | |
1360 | 1361 |
assert FormDef.get(1).fields[0].label == 'changed field' |
1361 |
assert FormDef.get(1).fields[0].required == False
|
|
1362 |
assert FormDef.get(1).fields[0].required is False
|
|
1362 | 1363 | |
1363 | 1364 | |
1364 | 1365 |
def test_form_edit_field_advanced(pub): |
1365 | 1366 |
create_superuser(pub) |
1366 |
create_role() |
|
1367 |
create_role(pub)
|
|
1367 | 1368 | |
1368 | 1369 |
FormDef.wipe() |
1369 | 1370 |
formdef = FormDef() |
... | ... | |
1436 | 1437 | |
1437 | 1438 |
def test_form_prefill_field(pub): |
1438 | 1439 |
create_superuser(pub) |
1439 |
create_role() |
|
1440 |
create_role(pub)
|
|
1440 | 1441 | |
1441 | 1442 |
FormDef.wipe() |
1442 | 1443 |
formdef = FormDef() |
... | ... | |
1484 | 1485 | |
1485 | 1486 |
def test_form_edit_string_field_validation(pub): |
1486 | 1487 |
create_superuser(pub) |
1487 |
create_role() |
|
1488 |
create_role(pub)
|
|
1488 | 1489 | |
1489 | 1490 |
FormDef.wipe() |
1490 | 1491 |
formdef = FormDef() |
... | ... | |
1533 | 1534 | |
1534 | 1535 |
def test_form_edit_item_field(pub): |
1535 | 1536 |
create_superuser(pub) |
1536 |
create_role() |
|
1537 |
create_role(pub)
|
|
1537 | 1538 | |
1538 | 1539 |
FormDef.wipe() |
1539 | 1540 |
formdef = FormDef() |
... | ... | |
1560 | 1561 |
resp = resp.follow() |
1561 | 1562 | |
1562 | 1563 |
assert FormDef.get(1).fields[0].label == 'changed field' |
1563 |
assert FormDef.get(1).fields[0].required == False
|
|
1564 |
assert FormDef.get(1).fields[0].required is False
|
|
1564 | 1565 |
assert FormDef.get(1).fields[0].items is None |
1565 | 1566 | |
1566 | 1567 |
# edit and fill with one item |
... | ... | |
1574 | 1575 | |
1575 | 1576 |
def test_form_edit_item_field_data_source(pub): |
1576 | 1577 |
create_superuser(pub) |
1577 |
create_role() |
|
1578 |
create_role(pub)
|
|
1578 | 1579 | |
1579 | 1580 |
FormDef.wipe() |
1580 | 1581 |
formdef = FormDef() |
... | ... | |
1663 | 1664 |
def test_form_edit_item_field_geojson_data_source(pub, http_requests): |
1664 | 1665 |
NamedDataSource.wipe() |
1665 | 1666 |
create_superuser(pub) |
1666 |
create_role() |
|
1667 |
create_role(pub)
|
|
1667 | 1668 | |
1668 | 1669 |
NamedDataSource.wipe() |
1669 | 1670 |
data_source = NamedDataSource(name='foobar') |
... | ... | |
1702 | 1703 | |
1703 | 1704 |
def test_form_edit_items_field(pub): |
1704 | 1705 |
create_superuser(pub) |
1705 |
create_role() |
|
1706 |
create_role(pub)
|
|
1706 | 1707 | |
1707 | 1708 |
FormDef.wipe() |
1708 | 1709 |
formdef = FormDef() |
... | ... | |
1774 | 1775 | |
1775 | 1776 |
def test_form_edit_page_field(pub): |
1776 | 1777 |
create_superuser(pub) |
1777 |
create_role() |
|
1778 |
create_role(pub)
|
|
1778 | 1779 | |
1779 | 1780 |
FormDef.wipe() |
1780 | 1781 |
formdef = FormDef() |
... | ... | |
1829 | 1830 | |
1830 | 1831 |
def test_form_edit_comment_field(pub): |
1831 | 1832 |
create_superuser(pub) |
1832 |
create_role() |
|
1833 |
create_role(pub)
|
|
1833 | 1834 | |
1834 | 1835 |
FormDef.wipe() |
1835 | 1836 |
formdef = FormDef() |
... | ... | |
1919 | 1920 | |
1920 | 1921 |
def test_form_comment_field_wysiwygtextwidget_validation(pub): |
1921 | 1922 |
create_superuser(pub) |
1922 |
create_role() |
|
1923 |
create_role(pub)
|
|
1923 | 1924 | |
1924 | 1925 |
FormDef.wipe() |
1925 | 1926 |
formdef = FormDef() |
... | ... | |
1958 | 1959 | |
1959 | 1960 |
def test_form_edit_map_field(pub): |
1960 | 1961 |
create_superuser(pub) |
1961 |
create_role() |
|
1962 |
create_role(pub)
|
|
1962 | 1963 | |
1963 | 1964 |
FormDef.wipe() |
1964 | 1965 |
formdef = FormDef() |
... | ... | |
2026 | 2027 | |
2027 | 2028 |
def test_form_edit_field_warnings(pub): |
2028 | 2029 |
create_superuser(pub) |
2029 |
create_role() |
|
2030 |
create_role(pub)
|
|
2030 | 2031 | |
2031 | 2032 |
FormDef.wipe() |
2032 | 2033 |
formdef = FormDef() |
... | ... | |
2063 | 2064 | |
2064 | 2065 |
def test_form_limit_display_to_page(pub): |
2065 | 2066 |
create_superuser(pub) |
2066 |
create_role() |
|
2067 |
create_role(pub)
|
|
2067 | 2068 | |
2068 | 2069 |
FormDef.wipe() |
2069 | 2070 |
formdef = FormDef() |
... | ... | |
2089 | 2090 | |
2090 | 2091 |
def test_form_fields_reorder(pub): |
2091 | 2092 |
create_superuser(pub) |
2092 |
create_role() |
|
2093 |
create_role(pub)
|
|
2093 | 2094 | |
2094 | 2095 |
FormDef.wipe() |
2095 | 2096 |
formdef = FormDef() |
... | ... | |
2152 | 2153 | |
2153 | 2154 |
def test_form_move_page_fields(pub): |
2154 | 2155 |
create_superuser(pub) |
2155 |
create_role() |
|
2156 |
create_role(pub)
|
|
2156 | 2157 | |
2157 | 2158 |
FormDef.wipe() |
2158 | 2159 |
formdef = FormDef() |
... | ... | |
2197 | 2198 | |
2198 | 2199 |
def test_form_legacy_int_id(pub): |
2199 | 2200 |
create_superuser(pub) |
2200 |
create_role() |
|
2201 |
create_role(pub)
|
|
2201 | 2202 | |
2202 | 2203 |
Category.wipe() |
2203 | 2204 |
cat = Category(name='Foo') |
... | ... | |
2214 | 2215 |
formdef.name = 'form title' |
2215 | 2216 |
formdef.fields = [] |
2216 | 2217 | |
2217 |
role = Role(name='ZAB') # Z to get sorted last
|
|
2218 |
role = pub.role_class(name='ZAB') # Z to get sorted last
|
|
2218 | 2219 |
role.store() |
2219 | 2220 | |
2220 | 2221 |
# set attributes using integers |
... | ... | |
2249 | 2250 | |
2250 | 2251 |
def test_form_anonymise(pub): |
2251 | 2252 |
create_superuser(pub) |
2252 |
create_role() |
|
2253 |
create_role(pub)
|
|
2253 | 2254 | |
2254 | 2255 |
FormDef.wipe() |
2255 | 2256 |
formdef = FormDef() |
... | ... | |
2316 | 2317 | |
2317 | 2318 |
def test_form_public_url(pub): |
2318 | 2319 |
create_superuser(pub) |
2319 |
create_role() |
|
2320 |
create_role(pub)
|
|
2320 | 2321 | |
2321 | 2322 |
FormDef.wipe() |
2322 | 2323 |
formdef = FormDef() |
... | ... | |
2332 | 2333 | |
2333 | 2334 |
def test_form_archive(pub): |
2334 | 2335 |
create_superuser(pub) |
2335 |
create_role() |
|
2336 |
create_role(pub)
|
|
2336 | 2337 | |
2337 | 2338 |
if pub.is_using_postgresql(): |
2338 | 2339 |
# this doesn't exist in SQL |
... | ... | |
2382 | 2383 | |
2383 | 2384 | |
2384 | 2385 |
def test_form_overwrite(pub): |
2385 |
user = create_superuser(pub)
|
|
2386 |
role = create_role()
|
|
2386 |
create_superuser(pub) |
|
2387 |
create_role(pub)
|
|
2387 | 2388 | |
2388 | 2389 |
FormDef.wipe() |
2389 | 2390 |
formdef = FormDef() |
... | ... | |
2498 | 2499 | |
2499 | 2500 |
def test_form_export_import_export_overwrite(pub): |
2500 | 2501 |
create_superuser(pub) |
2501 |
create_role() |
|
2502 |
create_role(pub)
|
|
2502 | 2503 | |
2503 | 2504 |
FormDef.wipe() |
2504 | 2505 |
formdef = FormDef() |
tests/admin_pages/test_role.py | ||
---|---|---|
3 | 3 |
import pytest |
4 | 4 | |
5 | 5 |
from wcs.qommon.http_request import HTTPRequest |
6 |
from wcs.roles import Role |
|
7 | 6 |
from wcs.formdef import FormDef |
8 | 7 | |
9 | 8 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
... | ... | |
42 | 41 | |
43 | 42 |
def test_roles_new(pub): |
44 | 43 |
create_superuser(pub) |
45 |
Role.wipe()
|
|
44 |
pub.role_class.wipe()
|
|
46 | 45 |
app = login(get_app(pub)) |
47 | 46 |
resp = app.get('/backoffice/roles/') |
48 | 47 |
resp = resp.click('New Role') |
... | ... | |
55 | 54 |
resp = resp.click('a new role') |
56 | 55 |
assert '<h2>a new role' in resp.text |
57 | 56 | |
58 |
assert Role.get(1).name == 'a new role'
|
|
59 |
assert Role.get(1).details == 'bla bla bla'
|
|
57 |
assert pub.role_class.get(1).name == 'a new role'
|
|
58 |
assert pub.role_class.get(1).details == 'bla bla bla'
|
|
60 | 59 | |
61 | 60 | |
62 | 61 |
def test_roles_edit(pub): |
63 | 62 |
create_superuser(pub) |
64 |
Role.wipe()
|
|
65 |
role = Role(name='foobar')
|
|
63 |
pub.role_class.wipe()
|
|
64 |
role = pub.role_class(name='foobar')
|
|
66 | 65 |
role.store() |
67 | 66 | |
68 | 67 |
app = login(get_app(pub)) |
... | ... | |
80 | 79 |
assert '<h2>baz' in resp.text |
81 | 80 |
assert 'Holders of this role will receive all emails adressed to the role.' in resp.text |
82 | 81 | |
83 |
assert Role.get(1).details == 'bla bla bla'
|
|
84 |
assert Role.get(1).emails_to_members is True
|
|
82 |
assert pub.role_class.get(1).details == 'bla bla bla'
|
|
83 |
assert pub.role_class.get(1).emails_to_members is True
|
|
85 | 84 | |
86 | 85 | |
87 | 86 |
def test_roles_matching_formdefs(pub): |
88 | 87 |
create_superuser(pub) |
89 |
Role.wipe()
|
|
90 |
role = Role(name='foo')
|
|
88 |
pub.role_class.wipe()
|
|
89 |
role = pub.role_class(name='foo')
|
|
91 | 90 |
role.store() |
92 | 91 | |
93 | 92 |
FormDef.wipe() |
... | ... | |
119 | 118 | |
120 | 119 |
def test_roles_delete(pub): |
121 | 120 |
create_superuser(pub) |
122 |
Role.wipe()
|
|
123 |
role = Role(name='foobar')
|
|
121 |
pub.role_class.wipe()
|
|
122 |
role = pub.role_class(name='foobar')
|
|
124 | 123 |
role.store() |
125 | 124 | |
126 | 125 |
app = login(get_app(pub)) |
... | ... | |
130 | 129 |
resp = resp.forms[0].submit() |
131 | 130 |
assert resp.location == 'http://example.net/backoffice/roles/' |
132 | 131 |
resp = resp.follow() |
133 |
assert Role.count() == 0 |
|
132 |
assert pub.role_class.count() == 0 |
tests/admin_pages/test_settings.py | ||
---|---|---|
24 | 24 |
from wcs.categories import Category, CardDefCategory |
25 | 25 |
from wcs.data_sources import NamedDataSource |
26 | 26 |
from wcs.wscalls import NamedWsCall |
27 |
from wcs.roles import Role |
|
28 | 27 |
from wcs.workflows import Workflow, CommentableWorkflowStatusItem |
29 | 28 |
from wcs.wf.export_to_model import ExportToModel |
30 | 29 |
from wcs.formdef import FormDef |
... | ... | |
32 | 31 |
from wcs import fields |
33 | 32 | |
34 | 33 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
35 |
from .test_all import create_superuser, create_role
|
|
34 |
from .test_all import create_superuser |
|
36 | 35 | |
37 | 36 | |
38 | 37 |
def pytest_generate_tests(metafunc): |
... | ... | |
97 | 96 |
FormDef.wipe() |
98 | 97 |
CardDef.wipe() |
99 | 98 |
Workflow.wipe() |
100 |
Role.wipe()
|
|
99 |
pub.role_class.wipe()
|
|
101 | 100 |
Category.wipe() |
102 | 101 |
CardDefCategory.wipe() |
103 | 102 |
NamedDataSource.wipe() |
... | ... | |
133 | 132 |
carddef.store() |
134 | 133 |
Category(name='baz').store() |
135 | 134 |
CardDefCategory(name='foobar').store() |
136 |
Role(name='qux').store()
|
|
135 |
pub.role_class(name='qux').store()
|
|
137 | 136 |
NamedDataSource(name='quux').store() |
138 | 137 |
ds = NamedDataSource(name='agenda') |
139 | 138 |
ds.external = 'agenda' |
... | ... | |
176 | 175 |
assert 'workflows/1' not in filelist |
177 | 176 |
assert 'workflows_xml/1' in filelist |
178 | 177 |
assert 'models/export_to_model-1.upload' not in filelist |
179 |
assert 'roles/1' in filelist |
|
178 |
assert 'roles/1' not in filelist |
|
179 |
assert 'roles_xml/1' in filelist |
|
180 | 180 |
assert 'categories/1' in filelist |
181 | 181 |
assert 'carddef_categories/1' in filelist |
182 | 182 |
assert 'datasources/1' in filelist |
... | ... | |
208 | 208 |
assert CardDef.count() == 1 |
209 | 209 |
assert CardDef.select()[0].url_name == 'bar' |
210 | 210 |
assert ApiAccess.count() == 1 |
211 |
assert pub.role_class.count() == 1 |
|
211 | 212 | |
212 | 213 |
# check roles are found by name |
213 | 214 |
wipe() |
214 |
role = Role(name='qux')
|
|
215 |
role = pub.role_class(name='qux')
|
|
215 | 216 |
role.store() |
216 | 217 | |
217 | 218 |
workflow = Workflow(name='Workflow One') |
... | ... | |
250 | 251 |
filelist = zipf.namelist() |
251 | 252 |
assert 'formdefs_xml/%s' % formdef.id in filelist |
252 | 253 |
assert 'workflows_xml/%s' % workflow.id in filelist |
253 |
assert 'roles/%s' % role.id not in filelist |
|
254 |
assert 'roles_xml/%s' % role.id not in filelist
|
|
254 | 255 | |
255 | 256 |
FormDef.wipe() |
256 | 257 |
Workflow.wipe() |
257 |
Role.wipe()
|
|
258 |
pub.role_class.wipe()
|
|
258 | 259 | |
259 | 260 |
# create role beforehand, it should be matched by name |
260 |
role = Role(name='qux')
|
|
261 |
role = pub.role_class(name='qux')
|
|
261 | 262 |
role.id = '012345' |
262 | 263 |
role.store() |
263 | 264 | |
... | ... | |
282 | 283 |
zip_content = io.BytesIO(resp.body) |
283 | 284 |
zipf = zipfile.ZipFile(zip_content, 'a') |
284 | 285 |
filelist = zipf.namelist() |
285 |
assert len([x for x in filelist if 'roles/' in x]) == 0 |
|
286 |
assert len([x for x in filelist if 'roles_xml/' in x]) == 0
|
|
286 | 287 | |
287 | 288 |
# check an error is displayed if such an import is then used and roles are |
288 | 289 |
# missing. |
289 | 290 |
FormDef.wipe() |
290 | 291 |
Workflow.wipe() |
291 |
Role.wipe()
|
|
292 |
pub.role_class.wipe()
|
|
292 | 293 |
resp = app.get('/backoffice/settings/import') |
293 | 294 |
resp.form['file'] = Upload('export.wcs', zip_content.getvalue()) |
294 | 295 |
resp = resp.form.submit('submit') |
... | ... | |
579 | 580 | |
580 | 581 | |
581 | 582 |
def test_settings_auth_password(pub): |
582 |
Role.wipe()
|
|
583 |
pub.role_class.wipe()
|
|
583 | 584 | |
584 | 585 |
pub.user_class.wipe() # makes sure there are no users |
585 | 586 |
pub.cfg['identification'] = {'methods': ['password']} |
... | ... | |
724 | 725 | |
725 | 726 |
def test_settings_permissions(pub): |
726 | 727 |
create_superuser(pub) |
727 |
role1 = create_role()
|
|
728 |
role1.name = 'foobar1'
|
|
728 |
pub.role_class.wipe()
|
|
729 |
role1 = pub.role_class(name='foobar1')
|
|
729 | 730 |
role1.store() |
730 |
role2 = Role(name='foobar2')
|
|
731 |
role2 = pub.role_class(name='foobar2')
|
|
731 | 732 |
role2.store() |
732 |
role3 = Role(name='foobar3')
|
|
733 |
role3 = pub.role_class(name='foobar3')
|
|
733 | 734 |
role3.store() |
734 | 735 | |
735 | 736 |
app = login(get_app(pub)) |
... | ... | |
749 | 750 |
resp.forms[0]['permissions$c-0-0'].checked = False |
750 | 751 |
resp.forms[0]['permissions$c-1-0'].checked = True |
751 | 752 |
resp = resp.forms[0].submit() |
752 |
assert Role.get(role1.id).allows_backoffice_access is False
|
|
753 |
assert Role.get(role2.id).allows_backoffice_access is True
|
|
753 |
assert pub.role_class.get(role1.id).allows_backoffice_access is False
|
|
754 |
assert pub.role_class.get(role2.id).allows_backoffice_access is True
|
|
754 | 755 | |
755 | 756 |
# give some roles access to the forms workshop (2nd checkbox) and to the |
756 | 757 |
# workflows workshop (4th) |
tests/admin_pages/test_user.py | ||
---|---|---|
159 | 159 | |
160 | 160 | |
161 | 161 |
def test_users_edit_with_managing_idp(pub): |
162 |
create_role() |
|
162 |
create_role(pub)
|
|
163 | 163 |
pub.user_class.wipe() |
164 | 164 |
pub.cfg['sp'] = {'idp-manage-user-attributes': True} |
165 | 165 |
pub.write_cfg() |
... | ... | |
261 | 261 |
pub.user_class.wipe() |
262 | 262 |
PasswordAccount.wipe() |
263 | 263 |
create_superuser(pub) |
264 |
role = create_role() |
|
264 |
role = create_role(pub)
|
|
265 | 265 |
for i in range(50): |
266 | 266 |
user = pub.user_class(name='foo bar %s' % (i + 1)) |
267 | 267 |
user.store() |
... | ... | |
346 | 346 |
pub.user_class.wipe() |
347 | 347 | |
348 | 348 |
user = create_superuser(pub) |
349 |
role = create_role() |
|
349 |
role = create_role(pub)
|
|
350 | 350 |
user.roles = [role.id, 'XXX'] |
351 | 351 |
user.store() |
352 | 352 |
tests/admin_pages/test_workflow.py | ||
---|---|---|
15 | 15 | |
16 | 16 |
from wcs.qommon.http_request import HTTPRequest |
17 | 17 |
from wcs.qommon.errors import ConnectionError |
18 |
from wcs.roles import Role |
|
19 | 18 |
from wcs.workflows import ( |
20 | 19 |
Workflow, |
21 | 20 |
WorkflowCriticalityLevel, |
... | ... | |
25 | 24 |
ChoiceWorkflowStatusItem, |
26 | 25 |
JumpOnSubmitWorkflowStatusItem, |
27 | 26 |
WorkflowVariablesFieldsFormDef, |
27 |
item_classes, |
|
28 | 28 |
) |
29 | 29 |
from wcs.wf.create_carddata import CreateCarddataWorkflowStatusItem |
30 | 30 |
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping |
... | ... | |
42 | 42 |
from wcs import fields |
43 | 43 | |
44 | 44 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
45 |
from .test_all import create_superuser, create_role
|
|
45 |
from .test_all import create_superuser |
|
46 | 46 | |
47 | 47 | |
48 | 48 |
def pytest_generate_tests(metafunc): |
... | ... | |
95 | 95 | |
96 | 96 |
def test_workflows_new(pub): |
97 | 97 |
create_superuser(pub) |
98 |
create_role() |
|
99 | 98 |
Workflow.wipe() |
100 | 99 |
app = login(get_app(pub)) |
101 | 100 |
resp = app.get('/backoffice/workflows/') |
... | ... | |
141 | 140 | |
142 | 141 |
def test_workflows_svg(pub): |
143 | 142 |
create_superuser(pub) |
144 |
role = create_role() |
|
143 |
pub.role_class.wipe() |
|
144 |
role = pub.role_class(name='foobar') |
|
145 |
role.store() |
|
145 | 146 |
Workflow.wipe() |
146 | 147 | |
147 | 148 |
workflow = Workflow(name='foo') |
... | ... | |
367 | 368 |
def test_workflows_export_import_create_role(pub): |
368 | 369 |
create_superuser(pub) |
369 | 370 | |
370 |
Role.wipe()
|
|
371 |
role = Role()
|
|
371 |
pub.role_class.wipe()
|
|
372 |
role = pub.role_class()
|
|
372 | 373 |
role.name = 'PLOP' |
373 | 374 |
role.store() |
374 | 375 | |
... | ... | |
409 | 410 |
resp = resp.follow() |
410 | 411 |
assert 'This workflow has been successfully imported' in resp.text |
411 | 412 |
assert Workflow.get(3).name == 'Copy of foo (2)' |
412 |
assert Role.count() == 1
|
|
413 |
assert Role.select()[0].name == 'PLOP'
|
|
414 |
assert Workflow.get(3).possible_status[0].items[0].by == [Role.select()[0].id]
|
|
413 |
assert pub.role_class.count() == 1
|
|
414 |
assert pub.role_class.select()[0].name == 'PLOP'
|
|
415 |
assert Workflow.get(3).possible_status[0].items[0].by == [pub.role_class.select()[0].id]
|
|
415 | 416 | |
416 | 417 |
# don't create role if they are managed by the identity provider |
417 |
Role.wipe()
|
|
418 |
pub.role_class.wipe()
|
|
418 | 419 | |
419 | 420 |
pub.cfg['sp'] = {'idp-manage-roles': True} |
420 | 421 |
pub.write_cfg() |
... | ... | |
523 | 524 | |
524 | 525 |
def test_workflows_edit_dispatch_action(pub): |
525 | 526 |
create_superuser(pub) |
526 |
role = create_role() |
|
527 |
pub.role_class.wipe() |
|
528 |
role = pub.role_class(name='foobar') |
|
529 |
role.store() |
|
527 | 530 |
Workflow.wipe() |
528 | 531 |
workflow = Workflow(name='foo') |
529 | 532 |
workflow.add_status(name='baz') |
... | ... | |
560 | 563 | |
561 | 564 |
def test_workflows_edit_dispatch_action_repeated_function(pub): |
562 | 565 |
create_superuser(pub) |
563 |
create_role() |
|
564 | 566 |
Workflow.wipe() |
565 | 567 |
workflow = Workflow(name='foo') |
566 | 568 |
workflow.add_status(name='baz') |
... | ... | |
591 | 593 | |
592 | 594 |
def test_workflows_edit_email_action(pub): |
593 | 595 |
create_superuser(pub) |
594 |
create_role() |
|
595 | 596 |
Workflow.wipe() |
596 | 597 |
workflow = Workflow(name='foo') |
597 | 598 |
st1 = workflow.add_status(name='baz') |
... | ... | |
740 | 741 | |
741 | 742 |
def test_workflows_edit_jump_previous(pub): |
742 | 743 |
create_superuser(pub) |
743 |
create_role() |
|
744 | 744 |
Workflow.wipe() |
745 | 745 |
workflow = Workflow(name='foo') |
746 | 746 |
st1 = workflow.add_status(name='baz') |
... | ... | |
792 | 792 | |
793 | 793 |
def test_workflows_edit_jump_timeout(pub): |
794 | 794 |
create_superuser(pub) |
795 |
create_role() |
|
796 | 795 |
Workflow.wipe() |
797 | 796 |
workflow = Workflow(name='foo') |
798 | 797 |
st1 = workflow.add_status(name='baz') |
... | ... | |
832 | 831 | |
833 | 832 |
def test_workflows_edit_sms_action(pub): |
834 | 833 |
create_superuser(pub) |
835 |
create_role() |
|
836 | 834 |
Workflow.wipe() |
837 | 835 |
workflow = Workflow(name='foo') |
838 | 836 |
workflow.add_status(name='baz') |
... | ... | |
859 | 857 | |
860 | 858 |
def test_workflows_edit_attachment_action(pub): |
861 | 859 |
create_superuser(pub) |
862 |
create_role() |
|
863 | 860 |
Workflow.wipe() |
864 | 861 |
workflow = Workflow(name='foo') |
865 | 862 |
workflow.add_status(name='baz') |
... | ... | |
924 | 921 | |
925 | 922 |
def test_workflows_edit_display_form_action(pub): |
926 | 923 |
create_superuser(pub) |
927 |
create_role() |
|
928 | 924 |
Workflow.wipe() |
929 | 925 |
workflow = Workflow(name='foo') |
930 | 926 |
workflow.add_status(name='baz') |
... | ... | |
969 | 965 | |
970 | 966 |
def test_workflows_edit_choice_action(pub): |
971 | 967 |
create_superuser(pub) |
972 |
create_role() |
|
973 | 968 |
Workflow.wipe() |
974 | 969 |
workflow = Workflow(name='foo') |
975 | 970 |
workflow.add_status(name='baz') |
... | ... | |
1002 | 997 | |
1003 | 998 |
def test_workflows_edit_choice_action_functions_only(pub): |
1004 | 999 |
create_superuser(pub) |
1005 |
create_role() |
|
1006 | 1000 |
Workflow.wipe() |
1007 | 1001 |
workflow = Workflow(name='foo') |
1008 | 1002 |
workflow.add_status(name='baz') |
... | ... | |
1033 | 1027 | |
1034 | 1028 |
def test_workflows_edit_choice_action_line_details(pub): |
1035 | 1029 |
create_superuser(pub) |
1036 |
create_role() |
|
1037 | 1030 | |
1038 | 1031 |
Workflow.wipe() |
1039 | 1032 |
wf = Workflow(name='foo') |
... | ... | |
1136 | 1129 | |
1137 | 1130 |
def test_workflows_action_subpath(pub): |
1138 | 1131 |
create_superuser(pub) |
1139 |
create_role() |
|
1140 | 1132 |
Workflow.wipe() |
1141 | 1133 |
workflow = Workflow(name='foo') |
1142 | 1134 |
baz_status = workflow.add_status(name='baz') |
... | ... | |
1153 | 1145 | |
1154 | 1146 |
def test_workflows_display_action_ezt_validation(pub): |
1155 | 1147 |
create_superuser(pub) |
1156 |
create_role() |
|
1157 | 1148 |
Workflow.wipe() |
1158 | 1149 |
workflow = Workflow(name='foo') |
1159 | 1150 |
baz_status = workflow.add_status(name='baz') |
... | ... | |
1191 | 1182 | |
1192 | 1183 |
def test_workflows_delete_action(pub): |
1193 | 1184 |
create_superuser(pub) |
1194 |
create_role() |
|
1195 | 1185 |
Workflow.wipe() |
1196 | 1186 |
workflow = Workflow(name='foo') |
1197 | 1187 |
workflow.add_status(name='baz') |
... | ... | |
1219 | 1209 | |
1220 | 1210 |
def test_workflows_variables(pub): |
1221 | 1211 |
create_superuser(pub) |
1222 |
create_role() |
|
1223 | 1212 | |
1224 | 1213 |
Workflow.wipe() |
1225 | 1214 |
workflow = Workflow(name='foo') |
... | ... | |
1278 | 1267 | |
1279 | 1268 | |
1280 | 1269 |
def test_workflows_variables_edit_with_all_action_types(pub): |
1281 |
test_workflows_add_all_actions(pub) |
|
1270 |
create_superuser(pub) |
|
1271 | ||
1272 |
Workflow.wipe() |
|
1273 |
workflow = Workflow(name='foo') |
|
1274 |
status = workflow.add_status(name='baz') |
|
1275 |
for item_class in item_classes: |
|
1276 |
status.append_item(item_class.key) |
|
1277 |
workflow.store() |
|
1282 | 1278 | |
1283 | 1279 |
app = login(get_app(pub)) |
1284 | 1280 |
resp = app.get('/backoffice/workflows/1/') |
... | ... | |
1320 | 1316 | |
1321 | 1317 |
def test_workflows_backoffice_fields(pub): |
1322 | 1318 |
create_superuser(pub) |
1323 |
create_role() |
|
1324 | 1319 | |
1325 | 1320 |
Workflow.wipe() |
1326 | 1321 |
workflow = Workflow(name='foo') |
... | ... | |
1428 | 1423 | |
1429 | 1424 |
def test_workflows_functions(pub): |
1430 | 1425 |
create_superuser(pub) |
1431 |
create_role() |
|
1432 | 1426 | |
1433 | 1427 |
Workflow.wipe() |
1434 | 1428 |
workflow = Workflow(name='foo') |
... | ... | |
1484 | 1478 | |
1485 | 1479 |
def test_workflows_functions_vs_visibility(pub): |
1486 | 1480 |
create_superuser(pub) |
1487 |
create_role() |
|
1488 | 1481 | |
1489 | 1482 |
Workflow.wipe() |
1490 | 1483 |
workflow = Workflow(name='foo') |
... | ... | |
1525 | 1518 | |
1526 | 1519 |
def test_workflows_global_actions(pub): |
1527 | 1520 |
create_superuser(pub) |
1528 |
create_role() |
|
1529 | 1521 | |
1530 | 1522 |
Workflow.wipe() |
1531 | 1523 |
workflow = Workflow(name='foo') |
... | ... | |
1573 | 1565 | |
1574 | 1566 |
def test_workflows_global_actions_edit(pub): |
1575 | 1567 |
create_superuser(pub) |
1576 |
create_role() |
|
1577 | 1568 | |
1578 | 1569 |
Workflow.wipe() |
1579 | 1570 |
workflow = Workflow(name='foo') |
... | ... | |
1635 | 1626 | |
1636 | 1627 |
def test_workflows_global_actions_timeout_triggers(pub): |
1637 | 1628 |
create_superuser(pub) |
1638 |
create_role() |
|
1639 | 1629 | |
1640 | 1630 |
Workflow.wipe() |
1641 | 1631 |
workflow = Workflow(name='foo') |
... | ... | |
1687 | 1677 | |
1688 | 1678 |
def test_workflows_global_actions_webservice_trigger(pub): |
1689 | 1679 |
create_superuser(pub) |
1690 |
create_role() |
|
1691 | 1680 | |
1692 | 1681 |
Workflow.wipe() |
1693 | 1682 |
workflow = Workflow(name='foo') |
... | ... | |
1825 | 1814 | |
1826 | 1815 |
def test_workflows_create_formdata(pub): |
1827 | 1816 |
create_superuser(pub) |
1828 |
create_role() |
|
1829 | 1817 | |
1830 | 1818 |
FormDef.wipe() |
1831 | 1819 |
target_formdef = FormDef() |
... | ... | |
2086 | 2074 | |
2087 | 2075 |
def test_workflows_criticality_levels(pub): |
2088 | 2076 |
create_superuser(pub) |
2089 |
create_role() |
|
2090 | 2077 | |
2091 | 2078 |
Workflow.wipe() |
2092 | 2079 |
workflow = Workflow(name='foo') |
... | ... | |
2151 | 2138 | |
2152 | 2139 |
def test_workflows_wscall_label(pub): |
2153 | 2140 |
create_superuser(pub) |
2154 |
create_role() |
|
2155 | 2141 | |
2156 | 2142 |
Workflow.wipe() |
2157 | 2143 |
workflow = Workflow(name='foo') |
... | ... | |
2175 | 2161 |
@pytest.mark.parametrize('value', [True, False]) |
2176 | 2162 |
def test_workflows_wscall_options(pub, value): |
2177 | 2163 |
create_superuser(pub) |
2178 |
create_role() |
|
2179 | 2164 | |
2180 | 2165 |
Workflow.wipe() |
2181 | 2166 |
workflow = Workflow(name='foo') |
... | ... | |
2280 | 2265 | |
2281 | 2266 |
def test_workflows_inspect_view(pub): |
2282 | 2267 |
create_superuser(pub) |
2283 |
role = create_role() |
|
2268 |
pub.role_class.wipe() |
|
2269 |
role = pub.role_class(name='foobar') |
|
2270 |
role.store() |
|
2284 | 2271 | |
2285 | 2272 |
Workflow.wipe() |
2286 | 2273 |
workflow = Workflow(name='foo') |
tests/api/test_access.py | ||
---|---|---|
17 | 17 |
from wcs.qommon.errors import AccessForbiddenError |
18 | 18 |
from wcs.qommon.http_request import HTTPRequest |
19 | 19 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
20 |
from wcs.roles import Role |
|
21 | 20 | |
22 | 21 | |
23 | 22 |
def pytest_generate_tests(metafunc): |
... | ... | |
269 | 268 | |
270 | 269 | |
271 | 270 |
def test_get_user(pub, local_user): |
272 |
Role.wipe()
|
|
273 |
role = Role(name='Foo bar')
|
|
271 |
pub.role_class.wipe()
|
|
272 |
role = pub.role_class(name='Foo bar')
|
|
274 | 273 |
role.store() |
275 | 274 |
local_user.roles = [role.id] |
276 | 275 |
local_user.store() |
... | ... | |
294 | 293 |
resp.form['access_key'] = '5678' |
295 | 294 |
resp = resp.form.submit('submit') |
296 | 295 | |
297 |
Role.wipe()
|
|
298 |
role = Role(name='Foo bar')
|
|
296 |
pub.role_class.wipe()
|
|
297 |
role = pub.role_class(name='Foo bar')
|
|
299 | 298 |
role.store() |
300 | 299 |
local_user.roles = [role.id] |
301 | 300 |
local_user.store() |
tests/api/test_carddef.py | ||
---|---|---|
20 | 20 |
from wcs.data_sources import NamedDataSource |
21 | 21 |
from wcs.qommon.form import PicklableUpload |
22 | 22 |
from wcs.qommon.http_request import HTTPRequest |
23 |
from wcs.roles import Role |
|
24 | 23 |
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef |
25 | 24 | |
26 | 25 |
from .utils import sign_uri |
... | ... | |
67 | 66 | |
68 | 67 | |
69 | 68 |
def test_cards(pub, local_user): |
70 |
Role.wipe()
|
|
71 |
role = Role(name='test')
|
|
69 |
pub.role_class.wipe()
|
|
70 |
role = pub.role_class(name='test')
|
|
72 | 71 |
role.store() |
73 | 72 |
local_user.roles = [role.id] |
74 | 73 |
local_user.store() |
... | ... | |
164 | 163 | |
165 | 164 | |
166 | 165 |
def test_cards_import_csv(pub, local_user): |
167 |
Role.wipe()
|
|
168 |
role = Role(name='test')
|
|
166 |
pub.role_class.wipe()
|
|
167 |
role = pub.role_class(name='test')
|
|
169 | 168 |
role.store() |
170 | 169 |
local_user.roles = [role.id] |
171 | 170 |
local_user.store() |
... | ... | |
204 | 203 | |
205 | 204 | |
206 | 205 |
def test_card_submit(pub, local_user): |
207 |
Role.wipe()
|
|
208 |
role = Role(name='test')
|
|
206 |
pub.role_class.wipe()
|
|
207 |
role = pub.role_class(name='test')
|
|
209 | 208 |
role.store() |
210 | 209 |
local_user.roles = [role.id] |
211 | 210 |
local_user.store() |
... | ... | |
277 | 276 |
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'} |
278 | 277 |
data_source.store() |
279 | 278 | |
280 |
Role.wipe()
|
|
281 |
role = Role(name='test')
|
|
279 |
pub.role_class.wipe()
|
|
280 |
role = pub.role_class(name='test')
|
|
282 | 281 |
role.store() |
283 | 282 |
local_user.roles = [role.id] |
284 | 283 |
local_user.store() |
... | ... | |
369 | 368 |
data_source.data_source = {'type': 'formula', 'value': 'http://example.com/jsonp'} |
370 | 369 |
data_source.store() |
371 | 370 | |
372 |
Role.wipe()
|
|
373 |
role = Role(name='test')
|
|
371 |
pub.role_class.wipe()
|
|
372 |
role = pub.role_class(name='test')
|
|
374 | 373 |
role.store() |
375 | 374 |
local_user.roles = [role.id] |
376 | 375 |
local_user.store() |
... | ... | |
482 | 481 | |
483 | 482 | |
484 | 483 |
def test_formdef_submit_structured(pub, local_user): |
485 |
Role.wipe()
|
|
486 |
role = Role(name='test')
|
|
484 |
pub.role_class.wipe()
|
|
485 |
role = pub.role_class(name='test')
|
|
487 | 486 |
role.store() |
488 | 487 |
local_user.roles = [role.id] |
489 | 488 |
local_user.store() |
tests/api/test_category.py | ||
---|---|---|
9 | 9 |
from wcs.categories import Category |
10 | 10 |
from wcs.formdef import FormDef |
11 | 11 |
from wcs.qommon.http_request import HTTPRequest |
12 |
from wcs.roles import Role |
|
13 | 12 | |
14 | 13 |
from .utils import sign_uri |
15 | 14 | |
... | ... | |
205 | 204 |
assert resp.json['err'] == 0 |
206 | 205 |
assert len(resp.json['data']) == 0 |
207 | 206 | |
208 |
Role.wipe()
|
|
209 |
role = Role(name='test')
|
|
207 |
pub.role_class.wipe()
|
|
208 |
role = pub.role_class(name='test')
|
|
210 | 209 |
role.store() |
211 | 210 |
local_user.roles = [] |
212 | 211 |
local_user.store() |
tests/api/test_custom_view.py | ||
---|---|---|
14 | 14 |
from wcs.formdef import FormDef |
15 | 15 |
from wcs.qommon import ods |
16 | 16 |
from wcs.qommon.http_request import HTTPRequest |
17 |
from wcs.roles import Role |
|
18 | 17 | |
19 | 18 |
from .utils import sign_uri |
20 | 19 | |
... | ... | |
61 | 60 | |
62 | 61 |
@pytest.fixture |
63 | 62 |
def test_api_custom_view_access(pub, local_user): |
64 |
Role.wipe()
|
|
65 |
role = Role(name='test')
|
|
63 |
pub.role_class.wipe()
|
|
64 |
role = pub.role_class(name='test')
|
|
66 | 65 |
role.store() |
67 | 66 |
local_user.roles = [role.id] |
68 | 67 |
local_user.store() |
... | ... | |
163 | 162 | |
164 | 163 | |
165 | 164 |
def test_api_list_formdata_custom_view(pub, local_user): |
166 |
Role.wipe()
|
|
167 |
role = Role(name='test')
|
|
165 |
pub.role_class.wipe()
|
|
166 |
role = pub.role_class(name='test')
|
|
168 | 167 |
role.store() |
169 | 168 | |
170 | 169 |
FormDef.wipe() |
... | ... | |
212 | 211 | |
213 | 212 | |
214 | 213 |
def test_api_ods_formdata_custom_view(pub, local_user): |
215 |
Role.wipe()
|
|
216 |
role = Role(name='test')
|
|
214 |
pub.role_class.wipe()
|
|
215 |
role = pub.role_class(name='test')
|
|
217 | 216 |
role.store() |
218 | 217 | |
219 | 218 |
FormDef.wipe() |
... | ... | |
265 | 264 | |
266 | 265 | |
267 | 266 |
def test_api_geojson_formdata_custom_view(pub, local_user): |
268 |
Role.wipe()
|
|
269 |
role = Role(name='test')
|
|
267 |
pub.role_class.wipe()
|
|
268 |
role = pub.role_class(name='test')
|
|
270 | 269 |
role.store() |
271 | 270 | |
272 | 271 |
FormDef.wipe() |
tests/api/test_formdata.py | ||
---|---|---|
23 | 23 |
from wcs.qommon.form import PicklableUpload |
24 | 24 |
from wcs.qommon.http_request import HTTPRequest |
25 | 25 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
26 |
from wcs.roles import Role |
|
27 | 26 |
from wcs.workflows import EditableWorkflowStatusItem, Workflow, WorkflowBackofficeFieldsFormDef |
28 | 27 | |
29 | 28 |
from .utils import sign_uri |
... | ... | |
89 | 88 | |
90 | 89 |
@pytest.fixture |
91 | 90 |
def ics_data(local_user): |
92 |
Role.wipe()
|
|
93 |
role = Role(name='test')
|
|
91 |
get_publisher().role_class.wipe()
|
|
92 |
role = get_publisher().role_class(name='test')
|
|
94 | 93 |
role.store() |
95 | 94 | |
96 | 95 |
FormDef.wipe() |
... | ... | |
150 | 149 |
] |
151 | 150 |
block.store() |
152 | 151 | |
153 |
Role.wipe()
|
|
154 |
role = Role(name='test')
|
|
152 |
pub.role_class.wipe()
|
|
153 |
role = pub.role_class(name='test')
|
|
155 | 154 |
role.id = '123' |
156 | 155 |
role.store() |
157 |
another_role = Role(name='another')
|
|
156 |
another_role = pub.role_class(name='another')
|
|
158 | 157 |
another_role.id = '321' |
159 | 158 |
another_role.store() |
160 | 159 |
FormDef.wipe() |
... | ... | |
280 | 279 | |
281 | 280 | |
282 | 281 |
def test_formdata_duplicated_varnames(pub, local_user): |
283 |
Role.wipe()
|
|
284 |
role = Role(name='test')
|
|
282 |
pub.role_class.wipe()
|
|
283 |
role = pub.role_class(name='test')
|
|
285 | 284 |
role.id = '123' |
286 | 285 |
role.store() |
287 |
another_role = Role(name='another')
|
|
286 |
another_role = pub.role_class(name='another')
|
|
288 | 287 |
another_role.id = '321' |
289 | 288 |
another_role.store() |
290 | 289 |
FormDef.wipe() |
... | ... | |
338 | 337 | |
339 | 338 | |
340 | 339 |
def test_formdata_edit(pub, local_user): |
341 |
Role.wipe()
|
|
342 |
role = Role(name='test')
|
|
340 |
pub.role_class.wipe()
|
|
341 |
role = pub.role_class(name='test')
|
|
343 | 342 |
role.id = '123' |
344 | 343 |
role.store() |
345 |
another_role = Role(name='another')
|
|
344 |
another_role = pub.role_class(name='another')
|
|
346 | 345 |
another_role.id = '321' |
347 | 346 |
another_role.store() |
348 | 347 |
local_user.roles = [role.id] |
... | ... | |
422 | 421 | |
423 | 422 | |
424 | 423 |
def test_formdata_with_workflow_data(pub, local_user): |
425 |
Role.wipe()
|
|
426 |
role = Role(name='test')
|
|
424 |
pub.role_class.wipe()
|
|
425 |
role = pub.role_class(name='test')
|
|
427 | 426 |
role.id = '123' |
428 | 427 |
role.store() |
429 | 428 | |
... | ... | |
465 | 464 | |
466 | 465 | |
467 | 466 |
def test_api_list_formdata(pub, local_user): |
468 |
Role.wipe()
|
|
469 |
role = Role(name='test')
|
|
467 |
pub.role_class.wipe()
|
|
468 |
role = pub.role_class(name='test')
|
|
470 | 469 |
role.store() |
471 | 470 | |
472 | 471 |
FormDef.wipe() |
... | ... | |
629 | 628 | |
630 | 629 | |
631 | 630 |
def test_api_anonymized_formdata(pub, local_user, admin_user): |
632 |
Role.wipe()
|
|
633 |
role = Role(name='test')
|
|
631 |
pub.role_class.wipe()
|
|
632 |
role = pub.role_class(name='test')
|
|
634 | 633 |
role.store() |
635 | 634 | |
636 | 635 |
FormDef.wipe() |
... | ... | |
734 | 733 | |
735 | 734 | |
736 | 735 |
def test_api_geojson_formdata(pub, local_user): |
737 |
Role.wipe()
|
|
738 |
role = Role(name='test')
|
|
736 |
pub.role_class.wipe()
|
|
737 |
role = pub.role_class(name='test')
|
|
739 | 738 |
role.store() |
740 | 739 | |
741 | 740 |
FormDef.wipe() |
... | ... | |
848 | 847 | |
849 | 848 | |
850 | 849 |
def test_api_ods_formdata(pub, local_user): |
851 |
Role.wipe()
|
|
852 |
role = Role(name='test')
|
|
850 |
pub.role_class.wipe()
|
|
851 |
role = pub.role_class(name='test')
|
|
853 | 852 |
role.store() |
854 | 853 | |
855 | 854 |
FormDef.wipe() |
... | ... | |
906 | 905 | |
907 | 906 | |
908 | 907 |
def test_api_global_geojson(pub, local_user): |
909 |
Role.wipe()
|
|
910 |
role = Role(name='test')
|
|
908 |
pub.role_class.wipe()
|
|
909 |
role = pub.role_class(name='test')
|
|
911 | 910 |
role.store() |
912 | 911 | |
913 | 912 |
FormDef.wipe() |
... | ... | |
965 | 964 |
pytest.skip('this requires SQL') |
966 | 965 |
return |
967 | 966 | |
968 |
Role.wipe()
|
|
969 |
role = Role(name='test')
|
|
967 |
pub.role_class.wipe()
|
|
968 |
role = pub.role_class(name='test')
|
|
970 | 969 |
role.store() |
971 | 970 | |
972 | 971 |
# check there's no crash if there are no formdefs |
... | ... | |
1039 | 1038 |
def test_api_global_listing_ignored_roles(pub, local_user): |
1040 | 1039 |
test_api_global_listing(pub, local_user) |
1041 | 1040 | |
1042 |
role = Role(name='test2')
|
|
1041 |
role = pub.role_class(name='test2')
|
|
1043 | 1042 |
role.store() |
1044 | 1043 | |
1045 | 1044 |
formdef = FormDef() |
... | ... | |
1082 | 1081 |
pytest.skip('this requires SQL') |
1083 | 1082 |
return |
1084 | 1083 | |
1085 |
Role.wipe()
|
|
1086 |
role = Role(name='test')
|
|
1084 |
pub.role_class.wipe()
|
|
1085 |
role = pub.role_class(name='test')
|
|
1087 | 1086 |
role.store() |
1088 | 1087 | |
1089 | 1088 |
# add proper role to user |
... | ... | |
1124 | 1123 | |
1125 | 1124 | |
1126 | 1125 |
def test_api_ics_formdata(pub, local_user, ics_data): |
1127 |
role = Role.select()[0]
|
|
1126 |
role = pub.role_class.select()[0]
|
|
1128 | 1127 | |
1129 | 1128 |
# check access is denied if the user has not the appropriate role |
1130 | 1129 |
resp = get_app(pub).get(sign_uri('/api/forms/test/ics/foobar', user=local_user), status=403) |
... | ... | |
1189 | 1188 | |
1190 | 1189 | |
1191 | 1190 |
def test_api_ics_formdata_http_auth(pub, local_user, admin_user, ics_data): |
1192 |
role = Role.select()[0]
|
|
1191 |
role = pub.role_class.select()[0]
|
|
1193 | 1192 | |
1194 | 1193 |
# check as admin |
1195 | 1194 |
app = login(get_app(pub)) |
... | ... | |
1232 | 1231 | |
1233 | 1232 | |
1234 | 1233 |
def test_api_ics_formdata_custom_view(pub, local_user, ics_data): |
1235 |
role = Role.select()[0]
|
|
1234 |
role = pub.role_class.select()[0]
|
|
1236 | 1235 | |
1237 | 1236 |
formdef = FormDef.get_by_urlname('test') |
1238 | 1237 |
tests/api/test_formdef.py | ||
---|---|---|
20 | 20 |
from wcs.formdef import FormDef |
21 | 21 |
from wcs.qommon.form import PicklableUpload |
22 | 22 |
from wcs.qommon.http_request import HTTPRequest |
23 |
from wcs.roles import Role |
|
24 | 23 |
from wcs.wf.jump import JumpWorkflowStatusItem |
25 | 24 |
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef |
26 | 25 | |
... | ... | |
68 | 67 | |
69 | 68 | |
70 | 69 |
def test_formdef_list(pub): |
71 |
Role.wipe()
|
|
72 |
role = Role(name='Foo bar')
|
|
70 |
pub.role_class.wipe()
|
|
71 |
role = pub.role_class(name='Foo bar')
|
|
73 | 72 |
role.id = '14' |
74 | 73 |
role.store() |
75 | 74 | |
... | ... | |
149 | 148 | |
150 | 149 | |
151 | 150 |
def test_limited_formdef_list(pub, local_user): |
152 |
Role.wipe()
|
|
153 |
role = Role(name='Foo bar')
|
|
151 |
pub.role_class.wipe()
|
|
152 |
role = pub.role_class(name='Foo bar')
|
|
154 | 153 |
role.id = '14' |
155 | 154 |
role.store() |
156 | 155 | |
... | ... | |
239 | 238 | |
240 | 239 | |
241 | 240 |
def test_backoffice_submission_formdef_list(pub, local_user): |
242 |
Role.wipe()
|
|
243 |
role = Role(name='Foo bar')
|
|
241 |
pub.role_class.wipe()
|
|
242 |
role = pub.role_class(name='Foo bar')
|
|
244 | 243 |
role.id = '14' |
245 | 244 |
role.store() |
246 | 245 | |
... | ... | |
445 | 444 | |
446 | 445 | |
447 | 446 |
def test_formdef_submit(pub, local_user): |
448 |
Role.wipe()
|
|
449 |
role = Role(name='test')
|
|
447 |
pub.role_class.wipe()
|
|
448 |
role = pub.role_class(name='test')
|
|
450 | 449 |
role.store() |
451 | 450 |
local_user.roles = [role.id] |
452 | 451 |
local_user.store() |
tests/api/test_user.py | ||
---|---|---|
12 | 12 |
from wcs.qommon.form import PicklableUpload |
13 | 13 |
from wcs.qommon.http_request import HTTPRequest |
14 | 14 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
15 |
from wcs.roles import Role |
|
16 | 15 |
from wcs.workflows import Workflow, WorkflowVariablesFieldsFormDef |
17 | 16 | |
18 | 17 |
from .utils import sign_uri |
... | ... | |
77 | 76 | |
78 | 77 | |
79 | 78 |
def test_roles(pub, local_user): |
80 |
Role.wipe()
|
|
81 |
role = Role(name='Hello World')
|
|
79 |
pub.role_class.wipe()
|
|
80 |
role = pub.role_class(name='Hello World')
|
|
82 | 81 |
role.emails = ['toto@example.com', 'zozo@example.com'] |
83 | 82 |
role.details = 'kouign amann' |
84 | 83 |
role.store() |
... | ... | |
109 | 108 |
assert resp.json['data'][0]['user_email'] == local_user.email |
110 | 109 |
assert resp.json['data'][0]['user_id'] == local_user.id |
111 | 110 | |
112 |
role = Role(name='Foo bar')
|
|
111 |
role = pub.role_class(name='Foo bar')
|
|
113 | 112 |
role.store() |
114 | 113 |
local_user.roles = [role.id] |
115 | 114 |
local_user.store() |
... | ... | |
165 | 164 |
def test_user_roles(pub, local_user): |
166 | 165 |
local_user.name_identifiers = ['xyz'] |
167 | 166 |
local_user.store() |
168 |
role = Role(name='Foo bar')
|
|
167 |
role = pub.role_class(name='Foo bar')
|
|
169 | 168 |
role.store() |
170 | 169 |
local_user.roles = [role.id] |
171 | 170 |
local_user.store() |
... | ... | |
339 | 338 | |
340 | 339 | |
341 | 340 |
def test_user_forms_from_agent(pub, local_user): |
342 |
Role.wipe()
|
|
343 |
role = Role(name='Foo bar')
|
|
341 |
pub.role_class.wipe()
|
|
342 |
role = pub.role_class(name='Foo bar')
|
|
344 | 343 |
role.store() |
345 | 344 | |
346 | 345 |
agent_user = get_publisher().user_class() |
tests/api/test_workflow.py | ||
---|---|---|
10 | 10 |
from wcs.formdef import FormDef |
11 | 11 |
from wcs.qommon.http_request import HTTPRequest |
12 | 12 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
13 |
from wcs.roles import Role |
|
14 | 13 |
from wcs.wf.jump import JumpWorkflowStatusItem |
15 | 14 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
16 | 15 |
from wcs.workflows import Workflow |
... | ... | |
108 | 107 |
get_app(pub).post(sign_uri(formdata.get_url() + 'jump/trigger/XXX/'), status=200) |
109 | 108 |
assert formdef.data_class().get(formdata.id).status == 'wf-st2' |
110 | 109 | |
111 |
Role.wipe()
|
|
112 |
role = Role(name='xxx')
|
|
110 |
pub.role_class.wipe()
|
|
111 |
role = pub.role_class(name='xxx')
|
|
113 | 112 |
role.store() |
114 | 113 | |
115 | 114 |
jump.by = [role.id] |
... | ... | |
318 | 317 |
add_to_journal.comment = 'HELLO WORLD 4' |
319 | 318 |
trigger.roles = ['logged-users'] |
320 | 319 |
workflow.store() |
321 |
Role.wipe()
|
|
322 |
role = Role(name='xxx')
|
|
320 |
pub.role_class.wipe()
|
|
321 |
role = pub.role_class(name='xxx')
|
|
323 | 322 |
role.store() |
324 | 323 |
trigger.roles = [role.id] |
325 | 324 |
workflow.store() |
tests/backoffice_pages/test_all.py | ||
---|---|---|
20 | 20 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
21 | 21 |
from wcs.qommon.http_request import HTTPRequest |
22 | 22 |
import wcs.qommon.storage as st |
23 |
from wcs.roles import Role, logged_users_role
|
|
23 |
from wcs.roles import logged_users_role |
|
24 | 24 |
from wcs.workflows import ( |
25 | 25 |
Workflow, |
26 | 26 |
CommentableWorkflowStatusItem, |
... | ... | |
84 | 84 |
if user.name == 'admin': |
85 | 85 |
user1 = user |
86 | 86 |
user1.is_admin = is_admin |
87 |
user1.roles = [x.id for x in Role.select() if x.name == 'foobar']
|
|
87 |
user1.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
|
|
88 | 88 |
user1.store() |
89 | 89 |
elif user.email == 'jean.darmette@triffouilis.fr': |
90 | 90 |
pass # don't remove user created by local_user fixture |
... | ... | |
102 | 102 |
account1.user_id = user1.id |
103 | 103 |
account1.store() |
104 | 104 | |
105 |
Role.wipe()
|
|
106 |
role = Role(name='foobar')
|
|
105 |
pub.role_class.wipe()
|
|
106 |
role = pub.role_class(name='foobar')
|
|
107 | 107 |
role.store() |
108 | 108 | |
109 | 109 |
user1.roles = [role.id] |
... | ... | |
223 | 223 |
assert 'Forms' not in resp.text |
224 | 224 |
assert 'Workflows' not in resp.text |
225 | 225 | |
226 |
pub.cfg['admin-permissions'] = {'forms': [x.id for x in Role.select()]}
|
|
226 |
pub.cfg['admin-permissions'] = {'forms': [x.id for x in pub.role_class.select()]}
|
|
227 | 227 |
pub.write_cfg() |
228 | 228 |
resp = app.get('/backoffice/') |
229 | 229 |
assert 'Management' in resp.text |
230 | 230 |
assert 'Forms' in resp.text |
231 | 231 |
assert 'Workflows' not in resp.text |
232 | 232 | |
233 |
pub.cfg['admin-permissions'] = {'workflows': [x.id for x in Role.select()]}
|
|
233 |
pub.cfg['admin-permissions'] = {'workflows': [x.id for x in pub.role_class.select()]}
|
|
234 | 234 |
pub.write_cfg() |
235 | 235 |
resp = app.get('/backoffice/') |
236 | 236 |
assert 'Management' in resp.text |
... | ... | |
238 | 238 |
assert 'Workflows' in resp.text |
239 | 239 | |
240 | 240 |
# check role id int->str migration |
241 |
pub.cfg['admin-permissions'] = {'workflows': [int(x.id) for x in Role.select()]}
|
|
241 |
pub.cfg['admin-permissions'] = {'workflows': [int(x.id) for x in pub.role_class.select()]}
|
|
242 | 242 |
pub.write_cfg() |
243 | 243 |
resp = app.get('/backoffice/') |
244 | 244 |
assert 'Management' in resp.text |
... | ... | |
1524 | 1524 |
resp = app.get('/backoffice/management/form-title/') |
1525 | 1525 |
assert 'id="multi-actions"' in resp.text |
1526 | 1526 | |
1527 |
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
|
|
1527 |
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
|
|
1528 | 1528 |
workflow.store() |
1529 | 1529 | |
1530 | 1530 |
resp = app.get('/backoffice/management/form-title/') |
... | ... | |
1683 | 1683 |
register_comment.parent = workflow.possible_status[2] |
1684 | 1684 | |
1685 | 1685 |
trigger = action.triggers[0] |
1686 |
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
|
|
1686 |
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
|
|
1687 | 1687 | |
1688 | 1688 |
workflow.store() |
1689 | 1689 |
formdef.workflow_id = workflow.id |
... | ... | |
1728 | 1728 |
register_comment = action.append_item('register-comment') |
1729 | 1729 |
register_comment.comment = 'session_user={{session_user}}' |
1730 | 1730 |
trigger = action.triggers[0] |
1731 |
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
|
|
1731 |
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
|
|
1732 | 1732 | |
1733 | 1733 |
workflow.store() |
1734 | 1734 |
formdef.workflow_id = workflow.id |
... | ... | |
1970 | 1970 |
jump = action.append_item('jump') |
1971 | 1971 |
jump.status = 'finished' |
1972 | 1972 |
trigger = action.triggers[0] |
1973 |
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
|
|
1973 |
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
|
|
1974 | 1974 | |
1975 | 1975 |
workflow.store() |
1976 | 1976 |
formdef.workflow_id = workflow.id |
... | ... | |
2004 | 2004 |
action = workflow.add_global_action('FOOBAR') |
2005 | 2005 |
remove = action.append_item('remove') |
2006 | 2006 |
trigger = action.triggers[0] |
2007 |
trigger.roles = [x.id for x in Role.select() if x.name == 'foobar']
|
|
2007 |
trigger.roles = [x.id for x in pub.role_class.select() if x.name == 'foobar']
|
|
2008 | 2008 | |
2009 | 2009 |
workflow.store() |
2010 | 2010 |
formdef.workflow_id = workflow.id |
... | ... | |
2207 | 2207 | |
2208 | 2208 |
# info text is not visible if form is locked |
2209 | 2209 |
second_user = pub.user_class(name='foobar') |
2210 |
second_user.roles = Role.keys()
|
|
2210 |
second_user.roles = pub.role_class.keys()
|
|
2211 | 2211 |
second_user.store() |
2212 | 2212 |
account = PasswordAccount(id='foobar') |
2213 | 2213 |
account.set_password('foobar') |
... | ... | |
2248 | 2248 |
# check a formdata that has been dispatched to another role is accessible |
2249 | 2249 |
# by an user with that role. |
2250 | 2250 |
user1 = create_user(pub) |
2251 |
role = Role(name='foobaz')
|
|
2251 |
role = pub.role_class(name='foobaz')
|
|
2252 | 2252 |
role.store() |
2253 | 2253 |
user1.roles = [role.id] |
2254 | 2254 |
user1.store() |
... | ... | |
3044 | 3044 | |
3045 | 3045 |
# change role handling a formdef, make sure they do not appear anylonger in |
3046 | 3046 |
# the all/done views. |
3047 |
role = Role(name='whatever')
|
|
3047 |
role = pub.role_class(name='whatever')
|
|
3048 | 3048 |
role.store() |
3049 | 3049 |
formdef = FormDef.get_by_urlname('form-title') |
3050 | 3050 |
formdef.workflow_roles = {'_receiver': role.id} |
... | ... | |
3759 | 3759 |
create_environment(pub) |
3760 | 3760 | |
3761 | 3761 |
second_user = pub.user_class(name='foobar') |
3762 |
second_user.roles = Role.keys()
|
|
3762 |
second_user.roles = pub.role_class.keys()
|
|
3763 | 3763 |
second_user.store() |
3764 | 3764 |
account = PasswordAccount(id='foobar') |
3765 | 3765 |
account.set_password('foobar') |
... | ... | |
3837 | 3837 |
formdatas = formdef.data_class().select(lambda x: x.status == 'wf-new') |
3838 | 3838 | |
3839 | 3839 |
second_user = pub.user_class(name='foobar') |
3840 |
second_user.roles = Role.keys()
|
|
3840 |
second_user.roles = pub.role_class.keys()
|
|
3841 | 3841 |
second_user.store() |
3842 | 3842 |
account = PasswordAccount(id='foobar') |
3843 | 3843 |
account.set_password('foobar') |
... | ... | |
4449 | 4449 |
# check functions |
4450 | 4450 |
assert re.findall('Recipient.*foobar', resp.text) |
4451 | 4451 | |
4452 |
role = Role(name='plop')
|
|
4452 |
role = pub.role_class(name='plop')
|
|
4453 | 4453 |
role.store() |
4454 | 4454 |
formdata.workflow_roles = {'_receiver': role.id} |
4455 | 4455 |
formdata.store() |
... | ... | |
5634 | 5634 | |
5635 | 5635 | |
5636 | 5636 |
def test_lazy_eval_with_conditional_workflow_form(pub): |
5637 |
role = Role(name='foobar')
|
|
5637 |
role = pub.role_class(name='foobar')
|
|
5638 | 5638 |
role.store() |
5639 | 5639 |
user = create_user(pub) |
5640 | 5640 |
tests/backoffice_pages/test_custom_view.py | ||
---|---|---|
9 | 9 |
from wcs.qommon.http_request import HTTPRequest |
10 | 10 |
from wcs.carddef import CardDef |
11 | 11 |
from wcs.formdef import FormDef |
12 |
from wcs.roles import Role |
|
13 | 12 |
from wcs import fields |
14 | 13 | |
15 | 14 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
... | ... | |
597 | 596 | |
598 | 597 |
def test_carddata_custom_view_is_default(pub): |
599 | 598 |
user = create_superuser(pub) |
600 |
Role.wipe()
|
|
601 |
role = Role(name='foobar')
|
|
599 |
pub.role_class.wipe()
|
|
600 |
role = pub.role_class(name='foobar')
|
|
602 | 601 |
role.store() |
603 | 602 |
user.roles = [role.id] |
604 | 603 |
user.store() |
tests/form_pages/test_all.py | ||
---|---|---|
45 | 45 |
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem |
46 | 46 |
from wcs.blocks import BlockDef |
47 | 47 |
from wcs.categories import Category |
48 |
from wcs.roles import Role, logged_users_role
|
|
48 |
from wcs.roles import logged_users_role |
|
49 | 49 |
from wcs.tracking_code import TrackingCode |
50 | 50 |
from wcs.data_sources import NamedDataSource |
51 | 51 |
from wcs import fields |
... | ... | |
356 | 356 |
formdef = create_formdef() |
357 | 357 |
get_app(pub).get('/test/', status=200) |
358 | 358 | |
359 |
Role.wipe()
|
|
360 |
role = Role(name='xxx')
|
|
359 |
pub.role_class.wipe()
|
|
360 |
role = pub.role_class(name='xxx')
|
|
361 | 361 |
role.store() |
362 | 362 | |
363 | 363 |
# check a formdef protected by a role cannot be accessed |
... | ... | |
594 | 594 | |
595 | 595 | |
596 | 596 |
def test_form_submit_handling_role_info(pub): |
597 |
role = Role(name='xxx')
|
|
597 |
role = pub.role_class(name='xxx')
|
|
598 | 598 |
role.details = 'Managing service' |
599 | 599 |
role.store() |
600 | 600 |
formdef = create_formdef() |
... | ... | |
4665 | 4665 |
resp = login(app, username='foo', password='foo').get('/') |
4666 | 4666 |
resp = app.post(formdata.get_url() + 'jump/trigger/XXX', status=403) |
4667 | 4667 | |
4668 |
Role.wipe()
|
|
4669 |
role = Role(name='xxx')
|
|
4668 |
pub.role_class.wipe()
|
|
4669 |
role = pub.role_class(name='xxx')
|
|
4670 | 4670 |
role.store() |
4671 | 4671 | |
4672 | 4672 |
jump.by = [role.id] |
... | ... | |
4729 | 4729 | |
4730 | 4730 |
app = get_app(pub) |
4731 | 4731 | |
4732 |
Role.wipe()
|
|
4733 |
role = Role(name='xxx')
|
|
4732 |
pub.role_class.wipe()
|
|
4733 |
role = pub.role_class(name='xxx')
|
|
4734 | 4734 |
role.allows_backoffice_access = False |
4735 | 4735 |
role.store() |
4736 | 4736 | |
... | ... | |
4795 | 4795 |
pub.session_manager.session_class.wipe() |
4796 | 4796 |
user = create_user(pub) |
4797 | 4797 | |
4798 |
role = Role(name='xxx')
|
|
4798 |
role = pub.role_class(name='xxx')
|
|
4799 | 4799 |
role.store() |
4800 | 4800 |
user.roles = [role.id] |
4801 | 4801 |
user.store() |
... | ... | |
5232 | 5232 | |
5233 | 5233 | |
5234 | 5234 |
def test_item_field_from_custom_view_on_cards(pub): |
5235 |
Role.wipe()
|
|
5235 |
pub.role_class.wipe()
|
|
5236 | 5236 |
pub.custom_view_class.wipe() |
5237 | 5237 | |
5238 | 5238 |
user = create_user(pub) |
5239 |
role = Role(name='xxx')
|
|
5239 |
role = pub.role_class(name='xxx')
|
|
5240 | 5240 |
role.store() |
5241 | 5241 |
user.roles = [role.id] |
5242 | 5242 |
user.is_admin = True |
... | ... | |
5311 | 5311 |
pytest.skip('this requires SQL') |
5312 | 5312 |
return |
5313 | 5313 | |
5314 |
Role.wipe()
|
|
5314 |
pub.role_class.wipe()
|
|
5315 | 5315 |
pub.custom_view_class.wipe() |
5316 | 5316 | |
5317 | 5317 |
user = create_user(pub) |
5318 |
role = Role(name='xxx')
|
|
5318 |
role = pub.role_class(name='xxx')
|
|
5319 | 5319 |
role.store() |
5320 | 5320 |
user.roles = [role.id] |
5321 | 5321 |
user.is_admin = True |
... | ... | |
6609 | 6609 |
def test_manager_public_access(pub): |
6610 | 6610 |
user, manager = create_user_and_admin(pub) |
6611 | 6611 | |
6612 |
Role.wipe()
|
|
6613 |
role = Role(name='xxx')
|
|
6612 |
pub.role_class.wipe()
|
|
6613 |
role = pub.role_class(name='xxx')
|
|
6614 | 6614 |
role.store() |
6615 | 6615 | |
6616 | 6616 |
manager.is_admin = False |
tests/form_pages/test_formdata.py | ||
---|---|---|
19 | 19 |
from wcs.data_sources import NamedDataSource |
20 | 20 |
from wcs.formdef import FormDef |
21 | 21 |
from wcs.qommon.form import UploadedFile |
22 |
from wcs.roles import Role |
|
23 | 22 |
from wcs.wf.attachment import AddAttachmentWorkflowStatusItem |
24 | 23 |
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf |
25 | 24 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef |
... | ... | |
921 | 920 |
def test_formdata_generated_document_in_private_history(pub): |
922 | 921 |
user = create_user(pub) |
923 | 922 | |
924 |
Role.wipe()
|
|
925 |
role = Role(name='xxx')
|
|
923 |
pub.role_class.wipe()
|
|
924 |
role = pub.role_class(name='xxx')
|
|
926 | 925 |
role.store() |
927 | 926 | |
928 | 927 |
user.roles = [role.id] |
... | ... | |
1510 | 1509 |
def test_formdata_evolution_registercommenter_to(pub): |
1511 | 1510 |
user = create_user(pub) |
1512 | 1511 | |
1513 |
Role.wipe()
|
|
1514 |
role1 = Role(name='role the user does not have')
|
|
1512 |
pub.role_class.wipe()
|
|
1513 |
role1 = pub.role_class(name='role the user does not have')
|
|
1515 | 1514 |
role1.store() |
1516 |
role2 = Role(name='role the user does have')
|
|
1515 |
role2 = pub.role_class(name='role the user does have')
|
|
1517 | 1516 |
role2.store() |
1518 | 1517 |
user.roles = [role2.id] |
1519 | 1518 |
user.store() |
tests/test_formdata.py | ||
---|---|---|
19 | 19 |
from wcs.conditions import Condition |
20 | 20 |
from wcs.formdef import FormDef |
21 | 21 |
from wcs.formdata import Evolution |
22 |
from wcs.roles import Role |
|
23 | 22 |
from wcs import sessions |
24 | 23 |
from wcs.variables import LazyFormData |
25 | 24 |
from wcs.workflows import ( |
... | ... | |
598 | 597 |
user.name_identifiers = ['....'] |
599 | 598 |
user.store() |
600 | 599 | |
601 |
role = Role(name='foobar')
|
|
600 |
role = pub.role_class(name='foobar')
|
|
602 | 601 |
role.store() |
603 | 602 | |
604 | 603 |
FormDef.wipe() |
... | ... | |
1732 | 1731 |
condition = Condition({'type': 'django', 'value': 'form_user|has_role:form_role_receiver_name'}) |
1733 | 1732 |
assert condition.evaluate() is False |
1734 | 1733 | |
1735 |
role = Role.select()[0]
|
|
1734 |
role = pub.role_class.select()[0]
|
|
1736 | 1735 |
user = pub.user_class.select()[0] |
1737 | 1736 |
user.roles = [role.id, '42'] # role.id 42 does not exist |
1738 | 1737 |
user.store() |
... | ... | |
1759 | 1758 |
condition = Condition({'type': 'django', 'value': 'form_role_receiver_name in form_user|roles'}) |
1760 | 1759 |
assert condition.evaluate() is False |
1761 | 1760 | |
1762 |
role = Role.select()[0]
|
|
1761 |
role = pub.role_class.select()[0]
|
|
1763 | 1762 |
user = pub.user_class.select()[0] |
1764 | 1763 |
user.roles = [role.id, '42'] # role.id 42 does not exist |
1765 | 1764 |
user.store() |
tests/test_formdef_import.py | ||
---|---|---|
12 | 12 |
from wcs.carddef import CardDef |
13 | 13 |
from wcs.formdef import FormDef, fields, FormdefImportError |
14 | 14 |
from wcs.workflows import Workflow |
15 |
from wcs.roles import Role |
|
16 | 15 |
from wcs.qommon.misc import indent_xml as indent |
17 | 16 | |
18 | 17 |
from utilities import create_temporary_pub |
... | ... | |
506 | 505 | |
507 | 506 | |
508 | 507 |
def test_workflow_roles(): |
509 |
Role.wipe()
|
|
510 |
role = Role(name='blah')
|
|
508 |
pub.role_class.wipe()
|
|
509 |
role = pub.role_class(name='blah')
|
|
511 | 510 |
role.store() |
512 | 511 | |
513 | 512 |
formdef = FormDef() |
... | ... | |
549 | 548 | |
550 | 549 | |
551 | 550 |
def test_user_roles(): |
552 |
Role.wipe()
|
|
551 |
pub.role_class.wipe()
|
|
553 | 552 | |
554 |
role = Role(name='blah')
|
|
553 |
role = pub.role_class(name='blah')
|
|
555 | 554 |
role.store() |
556 | 555 | |
557 | 556 |
formdef = FormDef() |
... | ... | |
576 | 575 | |
577 | 576 | |
578 | 577 |
def test_backoffice_submission_roles(): |
579 |
Role.wipe()
|
|
578 |
pub.role_class.wipe()
|
|
580 | 579 | |
581 |
role = Role(name='blah')
|
|
580 |
role = pub.role_class(name='blah')
|
|
582 | 581 |
role.store() |
583 | 582 | |
584 | 583 |
formdef = FormDef() |
tests/test_hobo_notify.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 |
import shutil |
|
3 |
from quixote import cleanup |
|
2 |
import uuid |
|
4 | 3 | |
5 | 4 |
from wcs.qommon import force_str |
6 | 5 |
from wcs.qommon.http_request import HTTPRequest |
7 | 6 |
from wcs.ctl.hobo_notify import CmdHoboNotify |
8 |
from wcs.roles import Role |
|
9 | 7 |
from wcs.qommon import storage as st |
10 | 8 | |
11 |
from utilities import create_temporary_pub, clean_temporary_pub
|
|
9 |
from utilities import create_temporary_pub |
|
12 | 10 | |
13 | 11 |
import pytest |
14 | 12 | |
... | ... | |
27 | 25 |
pub.cfg['sp'] = {'saml2_providerid': 'test'} |
28 | 26 |
pub.write_cfg() |
29 | 27 | |
30 |
Role.wipe()
|
|
31 |
r = Role(name='Service étt civil')
|
|
28 |
pub.role_class.wipe()
|
|
29 |
r = pub.role_class(name='Service étt civil')
|
|
32 | 30 |
r.slug = 'service-ett-civil' |
33 | 31 |
r.store() |
34 | 32 | |
... | ... | |
36 | 34 | |
37 | 35 | |
38 | 36 |
def test_process_notification_role_wrong_audience(pub): |
39 |
User = pub.user_class |
|
40 | ||
41 | 37 |
notification = { |
42 | 38 |
'@type': u'provision', |
43 | 39 |
'audience': [u'coin'], |
... | ... | |
49 | 45 |
'name': u'Service enfance', |
50 | 46 |
'slug': u'service-enfance', |
51 | 47 |
'description': u'Rôle du service petite enfance', |
52 |
'uuid': u'12345',
|
|
48 |
'uuid': str(uuid.uuid4()),
|
|
53 | 49 |
'emails': [u'petite-enfance@example.com'], |
54 | 50 |
'emails_to_members': False, |
55 | 51 |
}, |
... | ... | |
58 | 54 |
'name': u'Service état civil', |
59 | 55 |
'slug': u'service-etat-civil', |
60 | 56 |
'description': u'Rôle du service état civil', |
61 |
'uuid': u'xyz',
|
|
57 |
'uuid': str(uuid.uuid4()),
|
|
62 | 58 |
'emails': [u'etat-civil@example.com'], |
63 | 59 |
'emails_to_members': True, |
64 | 60 |
}, |
65 | 61 |
], |
66 | 62 |
}, |
67 | 63 |
} |
68 |
assert Role.count() == 1
|
|
69 |
assert Role.select()[0].name == 'Service étt civil'
|
|
70 |
assert Role.select()[0].slug == 'service-ett-civil'
|
|
71 |
assert Role.select()[0].details is None
|
|
72 |
assert Role.select()[0].emails is None
|
|
73 |
assert Role.select()[0].emails_to_members is False
|
|
64 |
assert pub.role_class.count() == 1
|
|
65 |
assert pub.role_class.select()[0].name == 'Service étt civil'
|
|
66 |
assert pub.role_class.select()[0].slug == 'service-ett-civil'
|
|
67 |
assert pub.role_class.select()[0].details is None
|
|
68 |
assert pub.role_class.select()[0].emails is None
|
|
69 |
assert pub.role_class.select()[0].emails_to_members is False
|
|
74 | 70 |
CmdHoboNotify.process_notification(notification) |
75 |
assert Role.count() == 1
|
|
76 |
assert Role.select()[0].name == 'Service étt civil'
|
|
77 |
assert Role.select()[0].slug == 'service-ett-civil'
|
|
78 |
assert Role.select()[0].details is None
|
|
79 |
assert Role.select()[0].emails is None
|
|
80 |
assert Role.select()[0].emails_to_members is False
|
|
71 |
assert pub.role_class.count() == 1
|
|
72 |
assert pub.role_class.select()[0].name == 'Service étt civil'
|
|
73 |
assert pub.role_class.select()[0].slug == 'service-ett-civil'
|
|
74 |
assert pub.role_class.select()[0].details is None
|
|
75 |
assert pub.role_class.select()[0].emails is None
|
|
76 |
assert pub.role_class.select()[0].emails_to_members is False
|
|
81 | 77 | |
82 | 78 | |
83 | 79 |
def test_process_notification_role(pub): |
84 |
User = pub.user_class
|
|
85 | ||
80 |
uuid1 = str(uuid.uuid4())
|
|
81 |
uuid2 = str(uuid.uuid4()) |
|
86 | 82 |
notification = { |
87 | 83 |
'@type': u'provision', |
88 | 84 |
'audience': [u'test'], |
... | ... | |
94 | 90 |
'name': u'Service enfance', |
95 | 91 |
'slug': u'service-enfance', |
96 | 92 |
'details': u'Rôle du service petite enfance', |
97 |
'uuid': u'12345',
|
|
93 |
'uuid': uuid1,
|
|
98 | 94 |
'emails': [u'petite-enfance@example.com'], |
99 | 95 |
'emails_to_members': False, |
100 | 96 |
}, |
... | ... | |
102 | 98 |
'name': u'Service état civil', |
103 | 99 |
'slug': u'service-ett-civil', |
104 | 100 |
'details': u'Rôle du service état civil', |
105 |
'uuid': u'xyz',
|
|
101 |
'uuid': uuid2,
|
|
106 | 102 |
'emails': [u'etat-civil@example.com'], |
107 | 103 |
'emails_to_members': True, |
108 | 104 |
}, |
109 | 105 |
], |
110 | 106 |
}, |
111 | 107 |
} |
112 |
assert Role.count() == 1
|
|
113 |
assert Role.select()[0].name == 'Service étt civil'
|
|
114 |
assert Role.select()[0].slug == 'service-ett-civil'
|
|
115 |
assert Role.select()[0].details is None
|
|
116 |
assert Role.select()[0].emails is None
|
|
117 |
assert Role.select()[0].emails_to_members is False
|
|
118 |
existing_role_id = Role.select()[0].id
|
|
108 |
assert pub.role_class.count() == 1
|
|
109 |
assert pub.role_class.select()[0].name == 'Service étt civil'
|
|
110 |
assert pub.role_class.select()[0].slug == 'service-ett-civil'
|
|
111 |
assert pub.role_class.select()[0].details is None
|
|
112 |
assert pub.role_class.select()[0].emails is None
|
|
113 |
assert pub.role_class.select()[0].emails_to_members is False
|
|
114 |
existing_role_id = pub.role_class.select()[0].id
|
|
119 | 115 |
CmdHoboNotify.process_notification(notification) |
120 |
assert Role.count() == 2
|
|
121 |
old_role = Role.get(existing_role_id)
|
|
116 |
assert pub.role_class.count() == 2
|
|
117 |
old_role = pub.role_class.get(existing_role_id)
|
|
122 | 118 |
assert old_role.name == 'Service état civil' |
123 |
assert old_role.uuid == 'xyz'
|
|
119 |
assert old_role.uuid == uuid2
|
|
124 | 120 |
assert old_role.slug == 'service-ett-civil' |
125 | 121 |
assert old_role.details == "Rôle du service état civil" |
126 | 122 |
assert old_role.emails == ['etat-civil@example.com'] |
127 | 123 |
assert old_role.emails_to_members is True |
128 |
new_role = Role.get_on_index('12345', 'uuid')
|
|
129 |
assert new_role.id == '12345'
|
|
124 |
new_role = pub.role_class.get_on_index(uuid1, 'uuid')
|
|
125 |
assert new_role.id == uuid1
|
|
130 | 126 |
assert new_role.name == 'Service enfance' |
131 | 127 |
assert new_role.slug == 'service-enfance' |
132 |
assert new_role.uuid == '12345'
|
|
128 |
assert new_role.uuid == uuid1
|
|
133 | 129 |
assert new_role.details == "Rôle du service petite enfance" |
134 | 130 |
assert new_role.emails == ['petite-enfance@example.com'] |
135 | 131 |
assert new_role.emails_to_members is False |
... | ... | |
145 | 141 |
'name': u'Service enfance', |
146 | 142 |
'slug': u'service-enfance', |
147 | 143 |
'description': u'Rôle du service petite enfance', |
148 |
'uuid': u'12345',
|
|
144 |
'uuid': uuid1,
|
|
149 | 145 |
'emails': [u'petite-enfance@example.com'], |
150 | 146 |
'emails_to_members': True, |
151 | 147 |
}, |
... | ... | |
153 | 149 |
}, |
154 | 150 |
} |
155 | 151 |
CmdHoboNotify.process_notification(notification) |
156 |
assert Role.count() == 1
|
|
157 |
assert Role.select()[0].id == new_role.id
|
|
158 |
assert Role.select()[0].uuid == '12345'
|
|
159 |
assert Role.select()[0].name == 'Service enfance'
|
|
160 |
assert Role.select()[0].slug == 'service-enfance'
|
|
161 |
assert Role.select()[0].details is None
|
|
162 |
assert Role.select()[0].emails == ['petite-enfance@example.com']
|
|
163 |
assert Role.select()[0].emails_to_members is True
|
|
152 |
assert pub.role_class.count() == 1
|
|
153 |
assert pub.role_class.select()[0].id == new_role.id
|
|
154 |
assert pub.role_class.select()[0].uuid == uuid1
|
|
155 |
assert pub.role_class.select()[0].name == 'Service enfance'
|
|
156 |
assert pub.role_class.select()[0].slug == 'service-enfance'
|
|
157 |
assert pub.role_class.select()[0].details is None
|
|
158 |
assert pub.role_class.select()[0].emails == ['petite-enfance@example.com']
|
|
159 |
assert pub.role_class.select()[0].emails_to_members is True
|
|
164 | 160 | |
165 | 161 | |
166 | 162 |
def test_process_notification_internal_role(pub): |
167 |
Role.wipe()
|
|
163 |
pub.role_class.wipe()
|
|
168 | 164 | |
169 | 165 |
notification = { |
170 | 166 |
'@type': u'provision', |
... | ... | |
177 | 173 |
'name': u'Service enfance', |
178 | 174 |
'slug': u'_service-enfance', |
179 | 175 |
'details': u'Rôle du service petite enfance', |
180 |
'uuid': u'12345',
|
|
176 |
'uuid': str(uuid.uuid4()),
|
|
181 | 177 |
'emails': [u'petite-enfance@example.com'], |
182 | 178 |
'emails_to_members': False, |
183 | 179 |
}, |
... | ... | |
185 | 181 |
}, |
186 | 182 |
} |
187 | 183 |
CmdHoboNotify.process_notification(notification) |
188 |
assert Role.count() == 1
|
|
189 |
role = Role.select()[0]
|
|
184 |
assert pub.role_class.count() == 1
|
|
185 |
role = pub.role_class.select()[0]
|
|
190 | 186 |
assert role.is_internal() |
191 | 187 | |
192 | 188 | |
193 | 189 |
def test_process_notification_role_description(pub): |
194 |
User = pub.user_class |
|
195 | ||
196 | 190 |
# check descriptions are not used to fill role.details |
191 |
uuid1 = str(uuid.uuid4()) |
|
192 |
uuid2 = str(uuid.uuid4()) |
|
197 | 193 |
notification = { |
198 | 194 |
'@type': u'provision', |
199 | 195 |
'audience': [u'test'], |
... | ... | |
205 | 201 |
'name': u'Service enfance', |
206 | 202 |
'slug': u'service-enfance', |
207 | 203 |
'description': u'Rôle du service petite enfance', |
208 |
'uuid': u'12345',
|
|
204 |
'uuid': uuid1,
|
|
209 | 205 |
'emails': [u'petite-enfance@example.com'], |
210 | 206 |
'emails_to_members': False, |
211 | 207 |
}, |
... | ... | |
213 | 209 |
'name': u'Service état civil', |
214 | 210 |
'slug': u'service-ett-civil', |
215 | 211 |
'description': u'Rôle du service état civil', |
216 |
'uuid': u'xyz',
|
|
212 |
'uuid': uuid2,
|
|
217 | 213 |
'emails': [u'etat-civil@example.com'], |
218 | 214 |
'emails_to_members': True, |
219 | 215 |
}, |
220 | 216 |
], |
221 | 217 |
}, |
222 | 218 |
} |
223 |
assert Role.count() == 1
|
|
224 |
assert Role.select()[0].name == 'Service étt civil'
|
|
225 |
assert Role.select()[0].slug == 'service-ett-civil'
|
|
226 |
assert Role.select()[0].details is None
|
|
227 |
assert Role.select()[0].emails is None
|
|
228 |
assert Role.select()[0].emails_to_members is False
|
|
229 |
existing_role_id = Role.select()[0].id
|
|
219 |
assert pub.role_class.count() == 1
|
|
220 |
assert pub.role_class.select()[0].name == 'Service étt civil'
|
|
221 |
assert pub.role_class.select()[0].slug == 'service-ett-civil'
|
|
222 |
assert pub.role_class.select()[0].details is None
|
|
223 |
assert pub.role_class.select()[0].emails is None
|
|
224 |
assert pub.role_class.select()[0].emails_to_members is False
|
|
225 |
existing_role_id = pub.role_class.select()[0].id
|
|
230 | 226 |
CmdHoboNotify.process_notification(notification) |
231 |
assert Role.count() == 2
|
|
232 |
old_role = Role.get(existing_role_id)
|
|
227 |
assert pub.role_class.count() == 2
|
|
228 |
old_role = pub.role_class.get(existing_role_id)
|
|
233 | 229 |
assert old_role.name == 'Service état civil' |
234 | 230 |
assert old_role.slug == 'service-ett-civil' |
235 |
assert old_role.uuid == 'xyz'
|
|
231 |
assert old_role.uuid == uuid2
|
|
236 | 232 |
assert old_role.details is None |
237 | 233 |
assert old_role.emails == ['etat-civil@example.com'] |
238 | 234 |
assert old_role.emails_to_members is True |
239 |
new_role = Role.get_on_index('12345', 'uuid')
|
|
235 |
new_role = pub.role_class.get_on_index(uuid1, 'uuid')
|
|
240 | 236 |
assert new_role.name == 'Service enfance' |
241 | 237 |
assert new_role.slug == 'service-enfance' |
242 |
assert new_role.uuid == '12345'
|
|
238 |
assert new_role.uuid == uuid1
|
|
243 | 239 |
assert new_role.details is None |
244 | 240 |
assert new_role.emails == ['petite-enfance@example.com'] |
245 | 241 |
assert new_role.emails_to_members is False |
... | ... | |
255 | 251 |
'name': u'Service enfance', |
256 | 252 |
'slug': u'service-enfance', |
257 | 253 |
'description': u'Rôle du service petite enfance', |
258 |
'uuid': u'12345',
|
|
254 |
'uuid': uuid1,
|
|
259 | 255 |
'emails': [u'petite-enfance@example.com'], |
260 | 256 |
'emails_to_members': True, |
261 | 257 |
}, |
... | ... | |
263 | 259 |
}, |
264 | 260 |
} |
265 | 261 |
CmdHoboNotify.process_notification(notification) |
266 |
assert Role.count() == 1
|
|
267 |
assert Role.select()[0].id == new_role.id
|
|
268 |
assert Role.select()[0].name == 'Service enfance'
|
|
269 |
assert Role.select()[0].uuid == '12345'
|
|
270 |
assert Role.select()[0].slug == 'service-enfance'
|
|
271 |
assert Role.select()[0].details is None
|
|
272 |
assert Role.select()[0].emails == ['petite-enfance@example.com']
|
|
273 |
assert Role.select()[0].emails_to_members is True
|
|
262 |
assert pub.role_class.count() == 1
|
|
263 |
assert pub.role_class.select()[0].id == new_role.id
|
|
264 |
assert pub.role_class.select()[0].name == 'Service enfance'
|
|
265 |
assert pub.role_class.select()[0].uuid == uuid1
|
|
266 |
assert pub.role_class.select()[0].slug == 'service-enfance'
|
|
267 |
assert pub.role_class.select()[0].details is None
|
|
268 |
assert pub.role_class.select()[0].emails == ['petite-enfance@example.com']
|
|
269 |
assert pub.role_class.select()[0].emails_to_members is True
|
|
274 | 270 | |
275 | 271 | |
276 | 272 |
def test_process_notification_role_deprovision(pub): |
277 |
User = pub.user_class |
|
278 | ||
273 |
uuid1 = str(uuid.uuid4()) |
|
279 | 274 |
notification = { |
280 | 275 |
'@type': u'deprovision', |
281 | 276 |
'audience': [u'test'], |
... | ... | |
285 | 280 |
'data': [ |
286 | 281 |
{ |
287 | 282 |
'@type': 'role', |
288 |
'uuid': u'xyz',
|
|
283 |
'uuid': uuid1,
|
|
289 | 284 |
}, |
290 | 285 |
], |
291 | 286 |
}, |
292 | 287 |
} |
293 |
role = Role.select()[0]
|
|
288 |
role = pub.role_class.select()[0]
|
|
294 | 289 |
role.remove_self() |
295 | 290 |
assert role.name == 'Service étt civil' |
296 | 291 |
assert role.slug == 'service-ett-civil' |
297 |
role.id = 'xyz'
|
|
292 |
role.id = uuid1
|
|
298 | 293 |
role.store() |
299 | 294 | |
300 |
role = Role('foo')
|
|
295 |
role = pub.role_class('foo')
|
|
301 | 296 |
role.slug = 'bar' |
302 | 297 |
role.store() |
303 | 298 | |
304 |
assert Role.count() == 2
|
|
299 |
assert pub.role_class.count() == 2
|
|
305 | 300 |
CmdHoboNotify.process_notification(notification) |
306 |
assert Role.count() == 1
|
|
307 |
assert Role.select()[0].slug == 'bar'
|
|
301 |
assert pub.role_class.count() == 1
|
|
302 |
assert pub.role_class.select()[0].slug == 'bar'
|
|
308 | 303 | |
309 |
r = Role(name='Service étt civil')
|
|
310 |
r.uuid = 'xyz'
|
|
304 |
r = pub.role_class(name='Service étt civil')
|
|
305 |
r.uuid = uuid1
|
|
311 | 306 |
r.store() |
312 |
assert Role.count() == 2
|
|
307 |
assert pub.role_class.count() == 2
|
|
313 | 308 |
CmdHoboNotify.process_notification(notification) |
314 |
assert Role.count() == 1
|
|
315 |
assert Role.select()[0].slug == 'bar'
|
|
309 |
assert pub.role_class.count() == 1
|
|
310 |
assert pub.role_class.select()[0].slug == 'bar'
|
|
316 | 311 | |
317 | 312 | |
318 | 313 |
PROFILE = { |
... | ... | |
451 | 446 |
# setup an hobo profile |
452 | 447 |
CmdCheckHobos().update_profile(PROFILE, pub) |
453 | 448 | |
449 |
uuid1 = str(uuid.uuid4()) |
|
450 |
uuid2 = str(uuid.uuid4()) |
|
454 | 451 |
notification = { |
455 | 452 |
'@type': u'provision', |
456 | 453 |
'audience': [u'test'], |
... | ... | |
462 | 459 |
'name': u'Service enfance', |
463 | 460 |
'slug': u'service-enfance', |
464 | 461 |
'description': u'Rôle du service petite enfance', |
465 |
'uuid': u'12345',
|
|
462 |
'uuid': uuid1,
|
|
466 | 463 |
'emails': [u'petite-enfance@example.com'], |
467 | 464 |
'emails_to_members': False, |
468 | 465 |
}, |
... | ... | |
470 | 467 |
'name': u'Service état civil', |
471 | 468 |
'slug': u'service-ett-civil', |
472 | 469 |
'description': u'Rôle du service état civil', |
473 |
'uuid': u'xyz',
|
|
470 |
'uuid': uuid2,
|
|
474 | 471 |
'emails': [u'etat-civil@example.com'], |
475 | 472 |
'emails_to_members': True, |
476 | 473 |
}, |
477 | 474 |
], |
478 | 475 |
}, |
479 | 476 |
} |
480 |
assert Role.count() == 1
|
|
481 |
assert Role.select()[0].name == 'Service étt civil'
|
|
482 |
assert Role.select()[0].slug == 'service-ett-civil'
|
|
483 |
assert Role.select()[0].details is None
|
|
484 |
assert Role.select()[0].emails is None
|
|
485 |
assert Role.select()[0].emails_to_members is False
|
|
486 |
existing_role_id = Role.select()[0].id
|
|
477 |
assert pub.role_class.count() == 1
|
|
478 |
assert pub.role_class.select()[0].name == 'Service étt civil'
|
|
479 |
assert pub.role_class.select()[0].slug == 'service-ett-civil'
|
|
480 |
assert pub.role_class.select()[0].details is None
|
|
481 |
assert pub.role_class.select()[0].emails is None
|
|
482 |
assert pub.role_class.select()[0].emails_to_members is False
|
|
483 |
existing_role_id = pub.role_class.select()[0].id
|
|
487 | 484 |
CmdHoboNotify.process_notification(notification) |
488 |
assert Role.count() == 2
|
|
489 |
old_role = Role.get(existing_role_id)
|
|
485 |
assert pub.role_class.count() == 2
|
|
486 |
old_role = pub.role_class.get(existing_role_id)
|
|
490 | 487 |
assert old_role.name == 'Service état civil' |
491 |
assert old_role.uuid == 'xyz'
|
|
488 |
assert old_role.uuid == uuid2
|
|
492 | 489 |
assert old_role.slug == 'service-ett-civil' |
493 | 490 |
assert old_role.details is None |
494 | 491 |
assert old_role.emails == ['etat-civil@example.com'] |
495 | 492 |
assert old_role.emails_to_members is True |
496 |
new_role = Role.get_on_index('12345', 'uuid')
|
|
493 |
new_role = pub.role_class.get_on_index(uuid1, 'uuid')
|
|
497 | 494 |
assert new_role.name == 'Service enfance' |
498 | 495 |
assert new_role.slug == 'service-enfance' |
499 |
assert new_role.uuid == '12345'
|
|
496 |
assert new_role.uuid == uuid1
|
|
500 | 497 |
assert new_role.details is None |
501 | 498 |
assert new_role.emails == ['petite-enfance@example.com'] |
502 | 499 |
assert new_role.emails_to_members is False |
... | ... | |
518 | 515 |
'is_active': True, |
519 | 516 |
u'roles': [ |
520 | 517 |
{ |
521 |
u'uuid': u'12345',
|
|
518 |
u'uuid': uuid1,
|
|
522 | 519 |
u'name': u'Service petite enfance', |
523 | 520 |
u'description': u'etc.', |
524 | 521 |
}, |
525 | 522 |
{ |
526 |
u'uuid': u'xyz',
|
|
523 |
u'uuid': uuid2,
|
|
527 | 524 |
u'name': u'Service état civil', |
528 | 525 |
u'description': u'etc.', |
529 | 526 |
}, |
... | ... | |
565 | 562 |
'is_active': False, |
566 | 563 |
u'roles': [ |
567 | 564 |
{ |
568 |
u'uuid': u'xyz',
|
|
565 |
u'uuid': uuid2,
|
|
569 | 566 |
u'name': u'Service état civil', |
570 | 567 |
u'description': u'etc.', |
571 | 568 |
}, |
572 | 569 |
{ |
573 |
u'uuid': u'unknown-uuid',
|
|
570 |
u'uuid': str(uuid.uuid4()),
|
|
574 | 571 |
u'name': u'Service enfance', |
575 | 572 |
u'description': u'', |
576 | 573 |
}, |
... | ... | |
612 | 609 |
u'is_superuser': True, |
613 | 610 |
u'roles': [ |
614 | 611 |
{ |
615 |
u'uuid': u'xyz',
|
|
612 |
u'uuid': uuid2,
|
|
616 | 613 |
u'name': u'Service état civil', |
617 | 614 |
u'description': u'etc.', |
618 | 615 |
}, |
... | ... | |
647 | 644 |
'is_superuser': False, |
648 | 645 |
'roles': [ |
649 | 646 |
{ |
650 |
'uuid': '12345',
|
|
647 |
'uuid': uuid1,
|
|
651 | 648 |
'name': 'Service petite enfance', |
652 | 649 |
'description': 'etc.', |
653 | 650 |
}, |
654 | 651 |
{ |
655 |
'uuid': 'xyz',
|
|
652 |
'uuid': uuid2,
|
|
656 | 653 |
'name': 'Service état civil', |
657 | 654 |
'description': 'etc.', |
658 | 655 |
}, |
... | ... | |
678 | 675 |
from wcs.ctl.check_hobos import CmdCheckHobos |
679 | 676 | |
680 | 677 |
User.wipe() |
681 |
Role.wipe()
|
|
678 |
pub.role_class.wipe()
|
|
682 | 679 |
CmdCheckHobos().update_profile(PROFILE, pub) |
683 | 680 | |
684 | 681 |
notification = { |
... | ... | |
737 | 734 |
def test_process_notification_role_with_errors(pub): |
738 | 735 |
User = pub.user_class |
739 | 736 |
User.wipe() |
740 |
Role.wipe()
|
|
737 |
pub.role_class.wipe()
|
|
741 | 738 |
notification = { |
742 | 739 |
'@type': u'provision', |
743 | 740 |
'audience': [u'test'], |
... | ... | |
759 | 756 |
with pytest.raises(KeyError) as e: |
760 | 757 |
CmdHoboNotify.process_notification(notification) |
761 | 758 |
assert e.value.args == ('role without uuid',) |
762 |
assert Role.count() == 0
|
|
759 |
assert pub.role_class.count() == 0
|
|
763 | 760 | |
764 | 761 |
notification['objects']['data'][0]['uuid'] = u'12345' |
765 | 762 |
del notification['objects']['data'][0]['name'] |
766 | 763 |
with pytest.raises(ValueError) as e: |
767 | 764 |
CmdHoboNotify.process_notification(notification) |
768 | 765 |
assert e.value.args == ('invalid role',) |
769 |
assert Role.count() == 0
|
|
766 |
assert pub.role_class.count() == 0
|
|
770 | 767 | |
771 | 768 | |
772 | 769 |
def test_process_user_deprovision(pub): |
... | ... | |
776 | 773 |
from wcs.ctl.check_hobos import CmdCheckHobos |
777 | 774 | |
778 | 775 |
User.wipe() |
779 |
Role.wipe()
|
|
776 |
pub.role_class.wipe()
|
|
780 | 777 |
CmdCheckHobos().update_profile(PROFILE, pub) |
781 | 778 | |
782 | 779 |
user = User() |
tests/test_role.py | ||
---|---|---|
4 | 4 | |
5 | 5 |
from utilities import create_temporary_pub, clean_temporary_pub |
6 | 6 | |
7 |
from wcs.qommon.storage import StorableObject |
|
8 |
from wcs.roles import Role, get_user_roles |
|
7 |
from wcs.roles import get_user_roles |
|
9 | 8 | |
10 | 9 |
from quixote import get_publisher |
11 | 10 | |
... | ... | |
21 | 20 | |
22 | 21 | |
23 | 22 |
def test_slug(): |
24 |
Role.wipe()
|
|
25 |
role = Role(name='Hello world')
|
|
23 |
get_publisher().role_class.wipe()
|
|
24 |
role = get_publisher().role_class(name='Hello world')
|
|
26 | 25 |
role.store() |
27 | 26 |
assert role.slug == 'hello-world' |
28 | 27 | |
29 | 28 | |
30 | 29 |
def test_duplicated_name(): |
31 |
Role.wipe()
|
|
32 |
role = Role(name='Hello world')
|
|
30 |
get_publisher().role_class.wipe()
|
|
31 |
role = get_publisher().role_class(name='Hello world')
|
|
33 | 32 |
role.store() |
34 | 33 |
assert role.slug == 'hello-world' |
35 |
role = Role(name='Hello world')
|
|
34 |
role = get_publisher().role_class(name='Hello world')
|
|
36 | 35 |
role.store() |
37 | 36 |
assert role.slug == 'hello-world-1' |
38 | 37 | |
39 | 38 | |
40 | 39 |
def test_migrate(): |
41 |
Role.wipe()
|
|
42 |
role = Role(name='Hello world')
|
|
40 |
get_publisher().role_class.wipe()
|
|
41 |
role = get_publisher().role_class(name='Hello world')
|
|
43 | 42 |
role.store() |
44 | 43 |
obj = pickle.load(open(role.get_object_filename(), 'rb')) |
45 | 44 |
del obj.slug |
46 | 45 |
pickle.dump(obj, open(role.get_object_filename(), 'wb')) |
47 | 46 |
assert pickle.load(open(role.get_object_filename(), 'rb')).slug is None |
48 |
assert Role.get(role.id).slug == 'hello-world'
|
|
47 |
assert get_publisher().role_class.get(role.id).slug == 'hello-world'
|
|
49 | 48 | |
50 | 49 | |
51 | 50 |
def test_get_user_roles(): |
52 |
Role.wipe()
|
|
53 |
Role(name='f1').store()
|
|
54 |
Role(name='é1').store()
|
|
55 |
Role(name='a1').store()
|
|
51 |
get_publisher().role_class.wipe()
|
|
52 |
get_publisher().role_class(name='f1').store()
|
|
53 |
get_publisher().role_class(name='é1').store()
|
|
54 |
get_publisher().role_class(name='a1').store()
|
|
56 | 55 |
assert [x[1] for x in get_user_roles()] == ['a1', 'é1', 'f1'] |
57 | 56 | |
58 | 57 | |
59 | 58 |
def test_get_emails(): |
60 | 59 |
User = get_publisher().user_class |
61 | 60 |
User.wipe() |
62 |
Role.wipe()
|
|
61 |
get_publisher().role_class.wipe()
|
|
63 | 62 | |
64 |
role = Role(name='role')
|
|
63 |
role = get_publisher().role_class(name='role')
|
|
65 | 64 |
role.emails_to_members = True |
66 | 65 |
role.store() |
67 | 66 |
tests/test_saml_auth.py | ||
---|---|---|
1 | 1 |
import datetime |
2 | 2 |
import os |
3 |
import sys |
|
4 | 3 |
import shutil |
4 |
import uuid |
|
5 | 5 |
import urllib.parse |
6 | 6 | |
7 | 7 |
try: |
... | ... | |
19 | 19 |
from wcs.qommon.ident.idp import MethodAdminDirectory, AdminIDPDir |
20 | 20 |
from wcs.qommon import sessions, x509utils |
21 | 21 |
from wcs.qommon.errors import RequestError |
22 |
from wcs.roles import Role |
|
23 | 22 | |
24 | 23 |
from utilities import get_app, create_temporary_pub, clean_temporary_pub |
25 | 24 | |
... | ... | |
39 | 38 |
</ns0:IDPSSODescriptor> |
40 | 39 |
</ns0:EntityDescriptor>""" |
41 | 40 | |
41 |
role_uuid1 = str(uuid.uuid4()) |
|
42 |
role_uuid2 = str(uuid.uuid4()) |
|
43 | ||
42 | 44 | |
43 | 45 |
def pytest_generate_tests(metafunc): |
44 | 46 |
if 'pub' in metafunc.fixturenames: |
... | ... | |
169 | 171 |
role_slug_attribute.name = 'role-slug' |
170 | 172 |
role_slug_attribute.nameFormat = lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC |
171 | 173 |
role_uuids = [] |
172 |
for role_uuid in ('foo', 'bar'):
|
|
174 |
for role_uuid in (role_uuid1, role_uuid2):
|
|
173 | 175 |
text_node = lasso.MiscTextNode.newWithString(role_uuid) |
174 | 176 |
text_node.textChild = True |
175 | 177 |
atv = lasso.Saml2AttributeValue() |
... | ... | |
263 | 265 |
pub.write_cfg() |
264 | 266 |
pub.set_config() |
265 | 267 | |
266 |
Role.wipe()
|
|
267 |
role = Role('Foo')
|
|
268 |
role.uuid = 'foo'
|
|
268 |
pub.role_class.wipe()
|
|
269 |
role = pub.role_class('Foo')
|
|
270 |
role.uuid = role_uuid1
|
|
269 | 271 |
role.store() |
270 | 272 | |
271 | 273 |
# 1st pass to generate a user |
... | ... | |
280 | 282 |
assert user.verified_fields |
281 | 283 |
assert len(user.verified_fields) == 3 |
282 | 284 |
assert user.form_data['_birthdate'].tm_year == 2000 |
283 |
assert user.roles == [role.id] # bar uuid is ignored as unknown
|
|
285 |
assert user.roles == [role.id] # other uuid is ignored as unknown
|
|
284 | 286 | |
285 | 287 |
assert ('enrolling user %s in Foo' % user.id) in [x.message for x in caplog.records] |
286 |
assert 'role uuid bar is unknown' in [x.message for x in caplog.records]
|
|
288 |
assert 'role uuid %s is unknown' % role_uuid2 in [x.message for x in caplog.records]
|
|
287 | 289 | |
288 | 290 |
req = HTTPRequest( |
289 | 291 |
None, |
tests/test_snapshots.py | ||
---|---|---|
149 | 149 | |
150 | 150 |
def test_form_snapshot_comments(pub): |
151 | 151 |
create_superuser(pub) |
152 |
create_role() |
|
152 |
create_role(pub)
|
|
153 | 153 |
app = login(get_app(pub)) |
154 | 154 |
resp = app.get('/backoffice/forms/') |
155 | 155 | |
... | ... | |
178 | 178 | |
179 | 179 |
def test_form_snapshot_history(pub, formdef_with_history): |
180 | 180 |
create_superuser(pub) |
181 |
create_role() |
|
181 |
create_role(pub)
|
|
182 | 182 |
app = login(get_app(pub)) |
183 | 183 |
resp = app.get('/backoffice/forms/%s/' % formdef_with_history.id) |
184 | 184 |
resp = resp.click('History') |
... | ... | |
194 | 194 | |
195 | 195 |
def test_form_snapshot_export(pub, formdef_with_history): |
196 | 196 |
create_superuser(pub) |
197 |
create_role() |
|
197 |
create_role(pub)
|
|
198 | 198 |
app = login(get_app(pub)) |
199 | 199 |
resp = app.get('/backoffice/forms/%s/history/' % formdef_with_history.id) |
200 | 200 | |
... | ... | |
206 | 206 | |
207 | 207 |
def test_form_snapshot_restore(pub, formdef_with_history): |
208 | 208 |
create_superuser(pub) |
209 |
create_role() |
|
209 |
create_role(pub)
|
|
210 | 210 |
app = login(get_app(pub)) |
211 | 211 | |
212 | 212 |
# restore as new |
... | ... | |
233 | 233 | |
234 | 234 |
def test_block_snapshot_browse(pub, blocks_feature): |
235 | 235 |
create_superuser(pub) |
236 |
create_role() |
|
236 |
create_role(pub)
|
|
237 | 237 | |
238 | 238 |
BlockDef.wipe() |
239 | 239 |
blockdef = BlockDef() |
... | ... | |
256 | 256 | |
257 | 257 |
def test_card_snapshot_browse(pub): |
258 | 258 |
create_superuser(pub) |
259 |
create_role() |
|
259 |
create_role(pub)
|
|
260 | 260 | |
261 | 261 |
CardDef.wipe() |
262 | 262 |
carddef = CardDef() |
... | ... | |
298 | 298 | |
299 | 299 |
def test_datasource_snapshot_browse(pub): |
300 | 300 |
create_superuser(pub) |
301 |
create_role() |
|
301 |
create_role(pub)
|
|
302 | 302 | |
303 | 303 |
NamedDataSource.wipe() |
304 | 304 |
datasource = NamedDataSource(name='test') |
... | ... | |
322 | 322 | |
323 | 323 |
def test_form_snapshot_browse(pub, formdef_with_history): |
324 | 324 |
create_superuser(pub) |
325 |
create_role() |
|
325 |
create_role(pub)
|
|
326 | 326 |
app = login(get_app(pub)) |
327 | 327 | |
328 | 328 |
pub.custom_view_class.wipe() |
... | ... | |
378 | 378 | |
379 | 379 |
def test_workflow_snapshot_browse(pub): |
380 | 380 |
create_superuser(pub) |
381 |
create_role() |
|
381 |
create_role(pub)
|
|
382 | 382 | |
383 | 383 |
Workflow.wipe() |
384 | 384 |
workflow = Workflow(name='test') |
... | ... | |
399 | 399 | |
400 | 400 |
def test_workflow_with_model_snapshot_browse(pub): |
401 | 401 |
create_superuser(pub) |
402 |
create_role() |
|
402 |
create_role(pub)
|
|
403 | 403 | |
404 | 404 |
Workflow.wipe() |
405 | 405 |
if os.path.exists(os.path.join(pub.app_dir, 'models')): |
... | ... | |
442 | 442 | |
443 | 443 |
def test_workflow_with_form_snapshot_browse(pub): |
444 | 444 |
create_superuser(pub) |
445 |
create_role() |
|
445 |
create_role(pub)
|
|
446 | 446 | |
447 | 447 |
Workflow.wipe() |
448 | 448 |
wf = Workflow(name='status') |
... | ... | |
466 | 466 | |
467 | 467 |
def test_wscall_snapshot_browse(pub): |
468 | 468 |
create_superuser(pub) |
469 |
create_role() |
|
469 |
create_role(pub)
|
|
470 | 470 | |
471 | 471 |
NamedWsCall.wipe() |
472 | 472 |
wscall = NamedWsCall(name='test') |
... | ... | |
489 | 489 | |
490 | 490 |
def test_form_snapshot_save(pub, formdef_with_history): |
491 | 491 |
create_superuser(pub) |
492 |
create_role() |
|
492 |
create_role(pub)
|
|
493 | 493 |
app = login(get_app(pub)) |
494 | 494 | |
495 | 495 |
resp = app.get('/backoffice/forms/%s/' % formdef_with_history.id) |
... | ... | |
522 | 522 | |
523 | 523 |
def test_snaphost_workflow_status_item_comments(pub): |
524 | 524 |
create_superuser(pub) |
525 |
create_role() |
|
525 |
create_role(pub)
|
|
526 | 526 | |
527 | 527 |
Workflow.wipe() |
528 | 528 |
workflow = Workflow(name='test') |
tests/test_sql.py | ||
---|---|---|
18 | 18 |
from wcs import publisher, fields |
19 | 19 |
from wcs.formdef import FormDef |
20 | 20 |
from wcs.formdata import Evolution |
21 |
from wcs.roles import Role |
|
22 | 21 |
from wcs.workflows import Workflow, CommentableWorkflowStatusItem, WorkflowCriticalityLevel |
23 | 22 |
from wcs.wf.jump import JumpWorkflowStatusItem |
24 | 23 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
... | ... | |
1911 | 1910 |
# create roles |
1912 | 1911 |
roles = [] |
1913 | 1912 |
for i in range(nb_roles): |
1914 |
role = Role(name='role%s' % i)
|
|
1913 |
role = pub.role_class(name='role%s' % i)
|
|
1915 | 1914 |
role.store() |
1916 | 1915 |
roles.append(role) |
1917 | 1916 |
tests/test_workflow_import.py | ||
---|---|---|
33 | 33 |
from wcs.wf.jump import JumpWorkflowStatusItem |
34 | 34 |
from wcs.fields import StringField, FileField |
35 | 35 |
from wcs.qommon.form import UploadedFile |
36 |
from wcs.roles import Role |
|
37 | 36 |
from wcs.workflows import ExportToModel, WorkflowVariablesFieldsFormDef, DisplayMessageWorkflowStatusItem |
38 | 37 | |
39 | 38 | |
... | ... | |
117 | 116 |
wf = Workflow(name='status') |
118 | 117 |
st1 = wf.add_status('Status1', 'st1') |
119 | 118 | |
120 |
role = Role()
|
|
119 |
role = pub.role_class()
|
|
121 | 120 |
role.id = '5' |
122 | 121 |
role.name = 'Test Role' |
123 | 122 |
role.store() |
... | ... | |
174 | 173 | |
175 | 174 | |
176 | 175 |
def test_status_actions_named_existing_role(pub): |
177 |
role = Role()
|
|
176 |
role = pub.role_class()
|
|
178 | 177 |
role.id = '2' |
179 | 178 |
role.name = 'Test Role named existing role' |
180 | 179 |
role.store() |
... | ... | |
206 | 205 | |
207 | 206 | |
208 | 207 |
def test_status_actions_named_missing_role(pub): |
209 |
role = Role()
|
|
208 |
role = pub.role_class()
|
|
210 | 209 |
role.id = '3' |
211 | 210 |
role.name = 'Test Role A' |
212 | 211 |
role.store() |
213 | 212 | |
214 |
role = Role()
|
|
213 |
role = pub.role_class()
|
|
215 | 214 |
role.id = '4' |
216 | 215 |
role.name = 'Test Role B' |
217 | 216 |
role.store() |
... | ... | |
241 | 240 |
xml_export = xml_export_orig.replace( |
242 | 241 |
b'<item role_id="3">Test Role A</item>', b'<item role_id="999">foobar</item>' |
243 | 242 |
) |
244 |
nb_roles = Role.count()
|
|
243 |
nb_roles = pub.role_class.count()
|
|
245 | 244 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export))) |
246 |
assert Role.count() == nb_roles + 1
|
|
245 |
assert pub.role_class.count() == nb_roles + 1
|
|
247 | 246 | |
248 | 247 |
# check that it doesn't fallback on the id if there's no match on the |
249 | 248 |
# name |
250 |
nb_roles = Role.count()
|
|
249 |
nb_roles = pub.role_class.count()
|
|
251 | 250 |
xml_export = xml_export_orig.replace( |
252 | 251 |
b'<item role_id="3">Test Role A</item>', b'<item role_id="3">Test Role C</item>' |
253 | 252 |
) |
254 | 253 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export))) |
255 | 254 |
assert wf3.possible_status[0].items[0].by != ['3'] |
256 |
assert Role.count() == nb_roles + 1
|
|
255 |
assert pub.role_class.count() == nb_roles + 1
|
|
257 | 256 | |
258 | 257 |
# on the other hand, check that it uses the id when included_id is True |
259 | 258 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)), include_id=True) |
... | ... | |
455 | 454 | |
456 | 455 | |
457 | 456 |
def test_global_actions(pub): |
458 |
role = Role()
|
|
457 |
role = pub.role_class()
|
|
459 | 458 |
role.id = '5' |
460 | 459 |
role.name = 'Test Role' |
461 | 460 |
role.store() |
... | ... | |
481 | 480 | |
482 | 481 | |
483 | 482 |
def test_register_comment_to(pub): |
484 |
role = Role()
|
|
483 |
role = pub.role_class()
|
|
485 | 484 |
role.id = '5' |
486 | 485 |
role.name = 'Test Role' |
487 | 486 |
role.store() |
... | ... | |
526 | 525 |
wf = Workflow(name='status') |
527 | 526 |
st1 = wf.add_status('Status1', 'st1') |
528 | 527 | |
529 |
Role.wipe()
|
|
528 |
pub.role_class.wipe()
|
|
530 | 529 | |
531 |
role1 = Role()
|
|
530 |
role1 = pub.role_class()
|
|
532 | 531 |
role1.name = 'Test Role 1' |
533 | 532 |
role1.store() |
534 | 533 | |
535 |
role2 = Role()
|
|
534 |
role2 = pub.role_class()
|
|
536 | 535 |
role2.name = 'Test Role 2' |
537 | 536 |
role2.store() |
538 | 537 | |
... | ... | |
550 | 549 |
assert wf2.possible_status[0].items[0].rules == dispatch.rules |
551 | 550 |
assert wf2.possible_status[0].items[0].dispatch_type == 'automatic' |
552 | 551 | |
553 |
Role.wipe()
|
|
552 |
pub.role_class.wipe()
|
|
554 | 553 | |
555 |
role3 = Role()
|
|
554 |
role3 = pub.role_class()
|
|
556 | 555 |
role3.name = 'Test Role 1' |
557 | 556 |
role3.store() |
558 | 557 | |
559 |
role4 = Role()
|
|
558 |
role4 = pub.role_class()
|
|
560 | 559 |
role4.name = 'Test Role 2' |
561 | 560 |
role4.store() |
562 | 561 | |
... | ... | |
605 | 604 |
st1.items.append(sendmail) |
606 | 605 |
sendmail.parent = st1 |
607 | 606 | |
608 |
Role.wipe()
|
|
607 |
pub.role_class.wipe()
|
|
609 | 608 |
wf2 = assert_import_export_works(wf) |
610 |
assert Role.count() == 0
|
|
609 |
assert pub.role_class.count() == 0
|
|
611 | 610 | |
612 | 611 |
sendmail.to = [ |
613 | 612 |
'_submitter', |
... | ... | |
617 | 616 |
'foobar@localhost', |
618 | 617 |
] |
619 | 618 |
wf2 = assert_import_export_works(wf) |
620 |
assert Role.count() == 0
|
|
619 |
assert pub.role_class.count() == 0
|
|
621 | 620 |
assert wf2.possible_status[0].items[0].to == sendmail.to |
622 | 621 | |
623 | 622 | |
... | ... | |
648 | 647 |
st1.items.append(sendsms) |
649 | 648 |
sendsms.parent = st1 |
650 | 649 | |
651 |
Role.wipe()
|
|
650 |
pub.role_class.wipe()
|
|
652 | 651 |
wf2 = assert_import_export_works(wf) |
653 |
assert Role.count() == 0
|
|
652 |
assert pub.role_class.count() == 0
|
|
654 | 653 |
assert wf2.possible_status[0].items[0].to == sendsms.to |
655 | 654 | |
656 | 655 |
tests/test_workflows.py | ||
---|---|---|
47 | 47 |
BlockField, |
48 | 48 |
) |
49 | 49 |
from wcs.formdata import Evolution |
50 |
from wcs.roles import Role |
|
51 | 50 |
from wcs.workflows import ( |
52 | 51 |
Workflow, |
53 | 52 |
WorkflowStatusItem, |
... | ... | |
426 | 425 |
user = pub.user_class(name='foo') |
427 | 426 |
user.store() |
428 | 427 | |
429 |
role = Role(name='bar1')
|
|
428 |
role = pub.role_class(name='bar1')
|
|
430 | 429 |
role.store() |
431 | 430 | |
432 | 431 |
formdef = FormDef() |
... | ... | |
475 | 474 |
formdef.name = 'baz' |
476 | 475 |
formdef.store() |
477 | 476 | |
478 |
Role.wipe()
|
|
479 |
role = Role(name='xxx')
|
|
477 |
pub.role_class.wipe()
|
|
478 |
role = pub.role_class(name='xxx')
|
|
480 | 479 |
role.store() |
481 | 480 | |
482 | 481 |
item = DispatchWorkflowStatusItem() |
... | ... | |
516 | 515 |
item.perform(formdata) |
517 | 516 |
assert not formdata.workflow_roles |
518 | 517 | |
519 |
Role.wipe()
|
|
520 |
role1 = Role('xxx1')
|
|
518 |
two_pubs.role_class.wipe()
|
|
519 |
role1 = two_pubs.role_class('xxx1')
|
|
521 | 520 |
role1.store() |
522 |
role2 = Role('xxx2')
|
|
521 |
role2 = two_pubs.role_class('xxx2')
|
|
523 | 522 |
role2.store() |
524 | 523 | |
525 | 524 |
for variable in ('form_var_foo', '{{form_var_foo}}'): |
... | ... | |
588 | 587 |
formdef.name = 'baz' |
589 | 588 |
formdef.store() |
590 | 589 | |
591 |
Role.wipe()
|
|
592 |
role = Role(name='xxx')
|
|
590 |
two_pubs.role_class.wipe()
|
|
591 |
role = two_pubs.role_class(name='xxx')
|
|
593 | 592 |
role.slug = 'yyy' |
594 | 593 |
role.store() |
595 | 594 | |
... | ... | |
679 | 678 |
formdata = formdef.data_class()() |
680 | 679 |
formdata.user_id = user.id |
681 | 680 | |
682 |
role = Role(name='plop')
|
|
681 |
role = pub.role_class(name='plop')
|
|
683 | 682 |
role.store() |
684 |
role2 = Role(name='xxx')
|
|
683 |
role2 = pub.role_class(name='xxx')
|
|
685 | 684 |
role2.store() |
686 | 685 | |
687 | 686 |
item = AddRoleWorkflowStatusItem() |
... | ... | |
723 | 722 |
user.name_identifiers = ['xxx'] |
724 | 723 |
user.store() |
725 | 724 | |
726 |
role = Role(name='bar1')
|
|
725 |
role = pub.role_class(name='bar1')
|
|
727 | 726 |
role.store() |
728 | 727 | |
729 | 728 |
formdef = FormDef() |
... | ... | |
1156 | 1155 |
workflow = Workflow(name='register comment to') |
1157 | 1156 |
st1 = workflow.add_status('Status1', 'st1') |
1158 | 1157 | |
1159 |
role = Role(name='foorole')
|
|
1158 |
role = pub.role_class(name='foorole')
|
|
1160 | 1159 |
role.store() |
1161 |
role2 = Role(name='no-one-role')
|
|
1160 |
role2 = pub.role_class(name='no-one-role')
|
|
1162 | 1161 |
role2.store() |
1163 | 1162 |
user = pub.user_class(name='baruser') |
1164 | 1163 |
user.roles = [] |
... | ... | |
1262 | 1261 |
user.email = 'zorg@localhost' |
1263 | 1262 |
user.store() |
1264 | 1263 | |
1265 |
Role.wipe()
|
|
1266 |
role1 = Role(name='foo')
|
|
1264 |
pub.role_class.wipe()
|
|
1265 |
role1 = pub.role_class(name='foo')
|
|
1267 | 1266 |
role1.emails = ['foo@localhost'] |
1268 | 1267 |
role1.store() |
1269 | 1268 | |
1270 |
role2 = Role(name='bar')
|
|
1269 |
role2 = pub.role_class(name='bar')
|
|
1271 | 1270 |
role2.emails = ['bar@localhost', 'baz@localhost'] |
1272 | 1271 |
role2.store() |
1273 | 1272 | |
... | ... | |
2790 | 2789 | |
2791 | 2790 | |
2792 | 2791 |
def test_display_form_and_comment(pub): |
2793 |
role = Role(name='bar1')
|
|
2792 |
role = pub.role_class(name='bar1')
|
|
2794 | 2793 |
role.store() |
2795 | 2794 | |
2796 | 2795 |
user = pub.user_class() |
... | ... | |
2854 | 2853 | |
2855 | 2854 | |
2856 | 2855 |
def test_choice_button_no_label(pub): |
2857 |
role = Role(name='bar1')
|
|
2856 |
role = pub.role_class(name='bar1')
|
|
2858 | 2857 |
role.store() |
2859 | 2858 | |
2860 | 2859 |
user = pub.user_class() |
... | ... | |
2957 | 2956 |
workflow = Workflow(name='display message to') |
2958 | 2957 |
st1 = workflow.add_status('Status1', 'st1') |
2959 | 2958 | |
2960 |
role = Role(name='foorole')
|
|
2959 |
role = pub.role_class(name='foorole')
|
|
2961 | 2960 |
role.store() |
2962 |
role2 = Role(name='no-one-role')
|
|
2961 |
role2 = pub.role_class(name='no-one-role')
|
|
2963 | 2962 |
role2.store() |
2964 | 2963 |
user = pub.user_class(name='baruser') |
2965 | 2964 |
user.roles = [] |
... | ... | |
3049 | 3048 |
display_message.position = 'actions' |
3050 | 3049 |
assert display_message.get_line_details() == 'with actions' |
3051 | 3050 | |
3052 |
role = Role(name='foorole')
|
|
3051 |
role = pub.role_class(name='foorole')
|
|
3053 | 3052 |
role.store() |
3054 | 3053 |
display_message.to = [role.id] |
3055 | 3054 |
assert display_message.get_line_details() == 'with actions, for foorole' |
... | ... | |
3062 | 3061 |
user.email = 'zorg@localhost' |
3063 | 3062 |
user.store() |
3064 | 3063 | |
3065 |
Role.wipe()
|
|
3066 |
role1 = Role(name='foo')
|
|
3064 |
pub.role_class.wipe()
|
|
3065 |
role1 = pub.role_class(name='foo')
|
|
3067 | 3066 |
role1.emails = ['foo@localhost'] |
3068 | 3067 |
role1.details = 'Hello World' |
3069 | 3068 |
role1.store() |
3070 | 3069 | |
3071 |
role2 = Role(name='bar')
|
|
3070 |
role2 = pub.role_class(name='bar')
|
|
3072 | 3071 |
role2.emails = ['bar@localhost', 'baz@localhost'] |
3073 | 3072 |
role2.store() |
3074 | 3073 | |
... | ... | |
5302 | 5301 |
st1 = workflow.add_status('Status1', 'st1') |
5303 | 5302 |
workflow.store() |
5304 | 5303 | |
5305 |
role = Role(name='bar1')
|
|
5304 |
role = two_pubs.role_class(name='bar1')
|
|
5306 | 5305 |
role.store() |
5307 | 5306 | |
5308 | 5307 |
user = two_pubs.user_class() |
... | ... | |
5424 | 5423 |
# roles (not exposed in current UI) |
5425 | 5424 |
http_requests.empty() |
5426 | 5425 | |
5427 |
role = Role(name='blah')
|
|
5426 |
role = pub.role_class(name='blah')
|
|
5428 | 5427 |
role.store() |
5429 | 5428 | |
5430 | 5429 |
user1 = pub.user_class() |
... | ... | |
5484 | 5483 | |
5485 | 5484 |
def test_aggregation_email(pub, emails): |
5486 | 5485 |
Workflow.wipe() |
5487 |
Role.wipe()
|
|
5486 |
pub.role_class.wipe()
|
|
5488 | 5487 |
AggregationEmail.wipe() |
5489 | 5488 | |
5490 |
role = Role(name='foobar')
|
|
5489 |
role = pub.role_class(name='foobar')
|
|
5491 | 5490 |
role.emails = ['foobar@localhost'] |
5492 | 5491 |
role.emails_to_members = False |
5493 | 5492 |
role.store() |
tests/utilities.py | ||
---|---|---|
20 | 20 |
import wcs |
21 | 21 |
import wcs.wsgi |
22 | 22 |
from wcs import compat |
23 |
from wcs.roles import Role |
|
23 | 24 |
from wcs.users import User |
24 | 25 |
from wcs.tracking_code import TrackingCode |
25 | 26 |
import wcs.qommon.emails |
... | ... | |
77 | 78 | |
78 | 79 |
if sql_mode: |
79 | 80 |
pub.user_class = sql.SqlUser |
81 |
pub.role_class = sql.Role |
|
80 | 82 |
pub.tracking_code_class = sql.TrackingCode |
81 | 83 |
pub.session_class = sql.Session |
82 | 84 |
pub.custom_view_class = sql.CustomView |
... | ... | |
85 | 87 |
pub.is_using_postgresql = lambda: True |
86 | 88 |
else: |
87 | 89 |
pub.user_class = User |
90 |
pub.role_class = Role |
|
88 | 91 |
pub.tracking_code_class = TrackingCode |
89 | 92 |
pub.session_class = sessions.BasicSession |
90 | 93 |
pub.custom_view_class = custom_views.CustomView |
... | ... | |
167 | 170 |
pub.write_cfg() |
168 | 171 | |
169 | 172 |
sql.do_user_table() |
173 |
sql.do_role_table() |
|
170 | 174 |
sql.do_tracking_code_table() |
171 | 175 |
sql.do_session_table() |
172 | 176 |
sql.do_custom_views_table() |
wcs/admin/forms.py | ||
---|---|---|
41 | 41 |
from wcs.formdef import FormDef, FormdefImportError, FormdefImportRecoverableError, DRAFTS_DEFAULT_LIFESPAN |
42 | 42 |
from wcs.carddef import CardDef |
43 | 43 |
from wcs.categories import Category |
44 |
from wcs.roles import Role, logged_users_role, get_user_roles
|
|
44 |
from wcs.roles import logged_users_role, get_user_roles |
|
45 | 45 |
from wcs.workflows import Workflow |
46 | 46 |
from wcs.forms.root import qrcode |
47 | 47 | |
... | ... | |
694 | 694 |
role_id = self.formdef.workflow_roles.get(wf_role_id) |
695 | 695 |
if role_id: |
696 | 696 |
try: |
697 |
role = Role.get(role_id)
|
|
697 |
role = get_publisher().role_class.get(role_id)
|
|
698 | 698 |
role_label = role.name |
699 | 699 |
except KeyError: |
700 | 700 |
# removed role ? |
... | ... | |
820 | 820 |
roles.append(logged_users_role().name) |
821 | 821 |
else: |
822 | 822 |
try: |
823 |
roles.append(Role.get(x).name)
|
|
823 |
roles.append(get_publisher().role_class.get(x).name)
|
|
824 | 824 |
except KeyError: |
825 | 825 |
# removed role ? |
826 | 826 |
roles.append(_('Unknown role (%s)') % x) |
... | ... | |
1784 | 1784 | |
1785 | 1785 |
def form_actions(self): |
1786 | 1786 |
r = TemplateIO(html=True) |
1787 |
has_roles = bool(Role.count())
|
|
1787 |
has_roles = bool(get_publisher().role_class.count())
|
|
1788 | 1788 |
r += htmltext('<div id="appbar">') |
1789 | 1789 |
r += htmltext('<h2>%s</h2>') % _('Forms') |
1790 | 1790 |
if has_roles: |
... | ... | |
1825 | 1825 | |
1826 | 1826 |
def new(self): |
1827 | 1827 |
get_response().breadcrumb.append(('new', _('New'))) |
1828 |
if Role.count() == 0:
|
|
1828 |
if get_publisher().role_class.count() == 0:
|
|
1829 | 1829 |
return template.error_page('forms', _('You first have to define roles.')) |
1830 | 1830 |
formdefui = self.formdef_ui_class(None) |
1831 | 1831 |
form = formdefui.new_form_ui() |
wcs/admin/roles.py | ||
---|---|---|
24 | 24 | |
25 | 25 |
from wcs.qommon.backoffice.menu import html_top |
26 | 26 | |
27 |
from wcs.roles import Role, get_user_roles
|
|
27 |
from wcs.roles import get_user_roles |
|
28 | 28 |
from wcs.formdef import FormDef |
29 | 29 | |
30 | 30 | |
... | ... | |
32 | 32 |
def __init__(self, role): |
33 | 33 |
self.role = role |
34 | 34 |
if self.role is None: |
35 |
self.role = Role()
|
|
35 |
self.role = get_publisher().role_class()
|
|
36 | 36 | |
37 | 37 |
def get_form(self): |
38 | 38 |
form = Form(enctype="multipart/form-data") |
... | ... | |
75 | 75 |
if self.role: |
76 | 76 |
role = self.role |
77 | 77 |
else: |
78 |
role = Role(name=form.get_widget('name').parse())
|
|
78 |
role = get_publisher().role_class(name=form.get_widget('name').parse())
|
|
79 | 79 | |
80 | 80 |
name = form.get_widget('name').parse() |
81 |
role_names = [x.name for x in Role.select() if x.id != role.id]
|
|
81 |
role_names = [x.name for x in get_publisher().role_class.select() if x.id != role.id]
|
|
82 | 82 |
if name in role_names: |
83 | 83 |
form.get_widget('name').set_error(_('This name is already used')) |
84 | 84 |
raise ValueError() |
... | ... | |
94 | 94 | |
95 | 95 |
def __init__(self, component): |
96 | 96 |
try: |
97 |
self.role = Role.get(component)
|
|
97 |
self.role = get_publisher().role_class.get(component)
|
|
98 | 98 |
except KeyError: |
99 | 99 |
raise errors.TraversalError() |
100 | 100 |
self.role_ui = RoleUI(self.role) |
wcs/admin/settings.py | ||
---|---|---|
58 | 58 |
from wcs.data_sources import NamedDataSource |
59 | 59 |
from wcs.formdef import FormDef |
60 | 60 |
from wcs.workflows import Workflow, WorkflowImportError |
61 |
from wcs.roles import Role |
|
62 | 61 | |
63 | 62 |
from wcs.backoffice.studio import StudioDirectory |
64 | 63 |
from .fields import FieldDefPage, FieldsDirectory |
... | ... | |
545 | 544 |
) |
546 | 545 | |
547 | 546 |
if enabled('permissions'): |
548 |
roles = list(Role.select())
|
|
547 |
roles = list(get_publisher().role_class.select())
|
|
549 | 548 |
if roles: |
550 | 549 |
r += htmltext('<dt><a href="admin-permissions">%s</a></dt> <dd>%s</dd>') % ( |
551 | 550 |
_('Admin Permissions'), |
... | ... | |
689 | 688 | |
690 | 689 |
rows = [] |
691 | 690 |
value = [] |
692 |
roles = [x for x in Role.select(order_by='name') if not x.is_internal()]
|
|
691 |
roles = [x for x in get_publisher().role_class.select(order_by='name') if not x.is_internal()]
|
|
693 | 692 |
for role in roles: |
694 | 693 |
rows.append(role.name) |
695 | 694 |
value.append([role.allows_backoffice_access]) |
... | ... | |
998 | 997 |
z = zipfile.ZipFile(c, 'w') |
999 | 998 |
for d in self.dirs: |
1000 | 999 |
if d not in ( |
1001 |
'roles', |
|
1002 | 1000 |
'categories', |
1003 | 1001 |
'carddef_categories', |
1004 | 1002 |
'wscalls', |
... | ... | |
1055 | 1053 |
os.path.join('blockdefs_xml', str(blockdef.id)), |
1056 | 1054 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
1057 | 1055 |
) |
1056 |
if 'roles' in self.dirs: |
|
1057 |
for role in get_publisher().role_class.select(): |
|
1058 |
node = role.export_to_xml(include_id=True) |
|
1059 |
misc.indent_xml(node) |
|
1060 |
z.writestr( |
|
1061 |
os.path.join('roles_xml', str(role.id)), |
|
1062 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1063 |
) |
|
1058 | 1064 | |
1059 | 1065 |
if self.settings: |
1060 | 1066 |
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck') |
wcs/admin/users.py | ||
---|---|---|
23 | 23 |
from wcs.qommon import errors |
24 | 24 |
from wcs.qommon import misc, get_cfg |
25 | 25 |
from wcs.qommon.backoffice.listing import pagination_links |
26 |
from wcs.roles import Role |
|
27 | 26 | |
28 | 27 |
from wcs.qommon import ident |
29 | 28 |
from wcs.qommon.ident.idp import is_idp_managing_user_attributes, is_idp_managing_user_roles |
... | ... | |
56 | 55 |
formdef.add_fields_to_form(form, form_data=self.user.form_data) |
57 | 56 |
form.add(CheckboxWidget, 'is_admin', title=_('Administrator Account'), value=self.user.is_admin) |
58 | 57 | |
59 |
roles = list(Role.select(order_by='name'))
|
|
58 |
roles = list(get_publisher().role_class.select(order_by='name'))
|
|
60 | 59 |
if len(roles) and not is_idp_managing_user_roles(): |
61 | 60 |
form.add( |
62 | 61 |
WidgetList, |
... | ... | |
187 | 186 |
r += htmltext('<li><strong>%s</strong></li>') % _('Site Administrator') |
188 | 187 |
for k in self.user.roles or []: |
189 | 188 |
try: |
190 |
r += htmltext('<li>%s</li>') % Role.get(k).name
|
|
189 |
r += htmltext('<li>%s</li>') % get_publisher().role_class.get(k).name
|
|
191 | 190 |
except KeyError: |
192 | 191 |
# removed role ? |
193 | 192 |
r += htmltext('<li><em>') |
... | ... | |
332 | 331 |
# optimize query by removing the roles criterias if they are all |
333 | 332 |
# checked |
334 | 333 |
possible_roles = ['admin', 'none'] |
335 |
possible_roles.extend(Role.keys())
|
|
334 |
possible_roles.extend(get_publisher().role_class.keys())
|
|
336 | 335 |
if set(possible_roles) == set(checked_roles): |
337 | 336 |
checked_roles = None |
338 | 337 | |
... | ... | |
464 | 463 |
r += htmltext('<input type="hidden" name="filter" value="true"/>') |
465 | 464 |
r += htmltext('<ul>') |
466 | 465 |
roles = [('admin', _('Site Administrator'))] |
467 |
for role in Role.select():
|
|
466 |
for role in get_publisher().role_class.select():
|
|
468 | 467 |
roles.append((role.id, role.name)) |
469 | 468 |
roles.append(('none', _('None'))) |
470 | 469 |
wcs/api.py | ||
---|---|---|
38 | 38 |
from wcs.carddef import CardDef |
39 | 39 |
from wcs.formdef import FormDef |
40 | 40 |
from wcs.data_sources import get_object as get_data_source_object |
41 |
from wcs.roles import Role, logged_users_role
|
|
41 |
from wcs.roles import logged_users_role |
|
42 | 42 |
from wcs.forms.common import FormStatusPage |
43 | 43 |
import wcs.qommon.storage as st |
44 | 44 |
from wcs.api_utils import sign_url_auto_orig, is_url_signed, get_user_from_api_query_string, get_query_flag |
... | ... | |
695 | 695 |
role_id = formdef_workflow_roles.get(wf_role_id) |
696 | 696 |
if role_id: |
697 | 697 |
try: |
698 |
workflow_function['role'] = Role.get(role_id).get_json_export_dict() |
|
698 |
workflow_function['role'] = ( |
|
699 |
get_publisher().role_class.get(role_id).get_json_export_dict() |
|
700 |
) |
|
699 | 701 |
except KeyError: |
700 | 702 |
pass |
701 | 703 |
formdict['functions'][wf_role_id] = workflow_function |
... | ... | |
814 | 816 |
user_info = user.get_substitution_variables(prefix='') |
815 | 817 |
del user_info['user'] |
816 | 818 |
user_info['id'] = user.id |
817 |
user_roles = [Role.get(x, ignore_errors=True) for x in user.roles or []]
|
|
819 |
user_roles = [get_publisher().role_class.get(x, ignore_errors=True) for x in user.roles or []]
|
|
818 | 820 |
user_info['user_roles'] = [x.get_json_export_dict() for x in user_roles if x] |
819 | 821 |
return json.dumps(user_info, cls=misc.JSONEncoder) |
820 | 822 | |
... | ... | |
953 | 955 |
user_info = user.get_substitution_variables(prefix='') |
954 | 956 |
del user_info['user'] |
955 | 957 |
user_info['user_id'] = user.id |
956 |
user_roles = [Role.get(x, ignore_errors=True) for x in user.roles or []]
|
|
958 |
user_roles = [get_publisher().role_class.get(x, ignore_errors=True) for x in user.roles or []]
|
|
957 | 959 |
user_info['user_roles'] = [x.get_json_export_dict() for x in user_roles if x] |
958 | 960 |
# add attributes to be usable as datasource |
959 | 961 |
user_info['id'] = user.id |
... | ... | |
1085 | 1087 |
if not (is_url_signed() or (get_request().user and get_request().user.can_go_in_admin())): |
1086 | 1088 |
raise AccessForbiddenError('unsigned request or user has no access to backoffice') |
1087 | 1089 |
list_roles = [] |
1088 |
charset = get_publisher().site_charset |
|
1089 |
for role in Role.select(): |
|
1090 |
for role in get_publisher().role_class.select(): |
|
1090 | 1091 |
if not role.is_internal(): |
1091 | 1092 |
list_roles.append(role.get_json_export_dict()) |
1092 | 1093 |
get_response().set_content_type('application/json') |
wcs/backoffice/cards.py | ||
---|---|---|
26 | 26 | |
27 | 27 |
from wcs.carddef import CardDef |
28 | 28 |
from wcs.categories import CardDefCategory |
29 |
from wcs.roles import Role |
|
30 | 29 |
from wcs.workflows import Workflow |
31 | 30 |
from wcs.admin.categories import CardDefCategoriesDirectory |
32 | 31 |
from wcs.admin.forms import FormsDirectory, FormDefPage, FormDefUI, html_top, OptionsDirectory |
... | ... | |
142 | 141 |
role_id = self.formdef.workflow_roles.get(wf_role_id) |
143 | 142 |
if role_id: |
144 | 143 |
try: |
145 |
role = Role.get(role_id)
|
|
144 |
role = get_publisher().role_class.get(role_id)
|
|
146 | 145 |
role_label = role.name |
147 | 146 |
except KeyError: |
148 | 147 |
# removed role ? |
wcs/backoffice/management.py | ||
---|---|---|
72 | 72 |
from wcs.categories import Category |
73 | 73 |
from wcs.formdata import FormData |
74 | 74 |
from wcs.formdef import FormDef |
75 |
from wcs.roles import logged_users_role, Role
|
|
75 |
from wcs.roles import logged_users_role |
|
76 | 76 |
from wcs.variables import LazyFieldVar |
77 | 77 |
from wcs.workflows import template_on_formdata, WorkflowStatusItem |
78 | 78 | |
... | ... | |
3296 | 3296 |
acting_role_id = self.filled.get_role_translation(key) |
3297 | 3297 |
if acting_role_id: |
3298 | 3298 |
try: |
3299 |
acting_role = Role.get(acting_role_id)
|
|
3299 |
acting_role = get_publisher().role_class.get(acting_role_id)
|
|
3300 | 3300 |
r += htmltext('<div class="value"><span>%s</span></div>') % acting_role.name |
3301 | 3301 |
except KeyError: |
3302 | 3302 |
r += htmltext('<div class="value"><span>%s %s</span></div>') % ( |
wcs/ctl/hobo_notify.py | ||
---|---|---|
19 | 19 |
import json |
20 | 20 | |
21 | 21 |
from quixote import get_publisher |
22 |
from wcs.roles import Role |
|
23 | 22 |
from ..qommon.ctl import Command |
24 | 23 |
from ..qommon.publisher import get_cfg |
25 | 24 |
from wcs.admin.settings import UserFieldsFormDef |
... | ... | |
116 | 115 |
emails = [force_str(email) for email in o['emails']] |
117 | 116 |
emails_to_members = o['emails_to_members'] |
118 | 117 |
# Find existing role |
119 |
role = Role.resolve(uuid, slug, name)
|
|
118 |
role = get_publisher().role_class.resolve(uuid, slug, name)
|
|
120 | 119 |
if not role: |
121 | 120 |
if action != 'provision': |
122 | 121 |
continue |
123 |
role = Role(id=uuid)
|
|
122 |
role = get_publisher().role_class(id=uuid)
|
|
124 | 123 |
if action == 'provision': |
125 | 124 |
# Provision/rename |
126 | 125 |
role.name = name |
... | ... | |
138 | 137 |
role.remove_self() |
139 | 138 |
# All roles have been sent |
140 | 139 |
if full and action == 'provision': |
141 |
for role in Role.select(ignore_errors=True):
|
|
140 |
for role in get_publisher().role_class.select(ignore_errors=True):
|
|
142 | 141 |
if role and role.uuid not in uuids: |
143 | 142 |
role.remove_self() |
144 | 143 | |
... | ... | |
186 | 185 |
user.is_admin = o.get('is_superuser', False) |
187 | 186 |
user.roles = [] |
188 | 187 |
for role_ref in o.get('roles', []): |
189 |
role = Role.resolve(role_ref['uuid'])
|
|
188 |
role = get_publisher().role_class.resolve(role_ref['uuid'])
|
|
190 | 189 |
if role and role.id not in user.roles: |
191 | 190 |
user.add_roles([role.id]) |
192 | 191 |
user.set_attributes_from_formdata(user.form_data) |
... | ... | |
203 | 202 |
users = User.get_users_with_name_identifier(o['uuid']) |
204 | 203 |
for user in users: |
205 | 204 |
user.set_deleted() |
206 |
except Exception as e:
|
|
205 |
except Exception: |
|
207 | 206 |
publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]') |
208 | 207 | |
209 | 208 |
wcs/formdata.py | ||
---|---|---|
33 | 33 |
from .qommon.substitution import Substitutions, invalidate_substitution_cache |
34 | 34 |
from .qommon.template import Template |
35 | 35 | |
36 |
from .roles import Role |
|
37 | 36 |
from .fields import FileField |
38 | 37 | |
39 | 38 | |
... | ... | |
694 | 693 | |
695 | 694 |
def get_handling_role(self): |
696 | 695 |
try: |
697 |
return Role.get(self.get_handling_role_id())
|
|
696 |
return get_publisher().role_class.get(self.get_handling_role_id())
|
|
698 | 697 |
except KeyError: |
699 | 698 |
return None |
700 | 699 | |
... | ... | |
840 | 839 |
for role_type, role_id in workflow_roles.items(): |
841 | 840 |
prefix = 'form_role_%s_' % role_type.replace('-', '_').strip('_') |
842 | 841 |
try: |
843 |
d.update(Role.get(role_id).get_substitution_variables(prefix))
|
|
842 |
d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
|
|
844 | 843 |
except KeyError: |
845 | 844 |
pass |
846 | 845 | |
... | ... | |
1202 | 1201 |
# exclude special _submitter value |
1203 | 1202 |
role_list = [x for x in data['roles'][role_key] if x != '_submitter'] |
1204 | 1203 |
# get role objects |
1205 |
role_list = [Role.get(x, ignore_errors=True) for x in role_list]
|
|
1204 |
role_list = [get_publisher().role_class.get(x, ignore_errors=True) for x in role_list]
|
|
1206 | 1205 |
# export as json dicts |
1207 | 1206 |
role_list = [x.get_json_export_dict() for x in role_list if x is not None] |
1208 | 1207 |
data['roles'][role_key] = role_list |
wcs/formdef.py | ||
---|---|---|
41 | 41 |
from .qommon.publisher import get_publisher_class |
42 | 42 | |
43 | 43 |
from .formdata import FormData |
44 |
from .roles import Role, logged_users_role
|
|
44 |
from .roles import logged_users_role |
|
45 | 45 |
from .categories import Category |
46 | 46 |
from . import fields |
47 | 47 |
from . import data_sources |
... | ... | |
984 | 984 |
role = force_text(role_id, charset) |
985 | 985 |
else: |
986 | 986 |
try: |
987 |
role = force_text(Role.get(role_id).name, charset)
|
|
987 |
role = force_text(get_publisher().role_class.get(role_id).name, charset)
|
|
988 | 988 |
except KeyError: |
989 | 989 |
role = force_text(role_id, charset) |
990 | 990 |
sub = ET.SubElement(roles, 'role') |
... | ... | |
1002 | 1002 |
role = force_text(role_id, charset) |
1003 | 1003 |
else: |
1004 | 1004 |
try: |
1005 |
role = force_text(Role.get(role_id).name, charset)
|
|
1005 |
role = force_text(get_publisher().role_class.get(role_id).name, charset)
|
|
1006 | 1006 |
except KeyError: |
1007 | 1007 |
role = force_text(role_id, charset) |
1008 | 1008 |
sub = ET.SubElement(roles, 'role') |
... | ... | |
1196 | 1196 |
role_id = value |
1197 | 1197 |
elif include_id: |
1198 | 1198 |
role_id = role_node.attrib.get('role_id') |
1199 |
if role_id and not Role.has_key(role_id):
|
|
1199 |
if role_id and not get_publisher().role_class.get(role_id, ignore_errors=True):
|
|
1200 | 1200 |
role_id = None |
1201 | 1201 | |
1202 | 1202 |
if not role_id: |
1203 |
for role in Role.select(ignore_errors=True):
|
|
1203 |
for role in get_publisher().role_class.select(ignore_errors=True):
|
|
1204 | 1204 |
if role.name == value: |
1205 | 1205 |
role_id = role.id |
1206 | 1206 |
break |
wcs/publisher.py | ||
---|---|---|
55 | 55 |
from . import sessions |
56 | 56 |
from .qommon.cron import CronJob |
57 | 57 | |
58 |
from .roles import Role |
|
58 | 59 |
from .users import User |
59 | 60 |
from .tracking_code import TrackingCode |
60 | 61 | |
... | ... | |
157 | 158 |
from . import sql |
158 | 159 | |
159 | 160 |
self.user_class = sql.SqlUser |
161 |
self.role_class = sql.Role |
|
160 | 162 |
self.tracking_code_class = sql.TrackingCode |
161 | 163 |
self.session_class = sql.Session |
162 | 164 |
self.custom_view_class = sql.CustomView |
... | ... | |
165 | 167 |
sql.get_connection(new=True) |
166 | 168 |
else: |
167 | 169 |
self.user_class = User |
170 |
self.role_class = Role |
|
168 | 171 |
self.tracking_code_class = TrackingCode |
169 | 172 |
self.session_class = sessions.BasicSession |
170 | 173 |
self.custom_view_class = custom_views.CustomView |
... | ... | |
218 | 221 |
for f in z.namelist(): |
219 | 222 |
if '.indexes' in f: |
220 | 223 |
continue |
221 |
if os.path.dirname(f) in ('formdefs_xml', 'carddefs_xml', 'workflows_xml', 'blockdefs_xml'): |
|
224 |
if os.path.dirname(f) in ( |
|
225 |
'formdefs_xml', |
|
226 |
'carddefs_xml', |
|
227 |
'workflows_xml', |
|
228 |
'blockdefs_xml', |
|
229 |
'roles_xml', |
|
230 |
): |
|
222 | 231 |
continue |
223 | 232 |
path = os.path.join(self.app_dir, f) |
224 | 233 |
if not os.path.exists(os.path.dirname(path)): |
... | ... | |
284 | 293 |
carddefs.append(carddef) |
285 | 294 |
results['carddefs'] += 1 |
286 | 295 | |
296 |
# sixth pass, roles |
|
297 |
roles = [] |
|
298 |
for f in z.namelist(): |
|
299 |
if os.path.dirname(f) == 'roles_xml' and os.path.basename(f): |
|
300 |
role = self.role_class.import_from_xml(z.open(f), include_id=True) |
|
301 |
role.store() |
|
302 |
roles.append(role) |
|
303 |
results['roles'] += 1 |
|
304 | ||
287 | 305 |
# rebuild indexes for imported objects |
288 | 306 |
for k, v in results.items(): |
289 | 307 |
if k == 'settings': |
... | ... | |
306 | 324 | |
307 | 325 |
klass = Category |
308 | 326 |
elif k == 'roles': |
309 |
from .roles import Role |
|
310 | ||
311 |
klass = Role |
|
327 |
klass = self.role_class |
|
312 | 328 |
elif k == 'workflows': |
313 | 329 |
from wcs.workflows import Workflow |
314 | 330 | |
... | ... | |
335 | 351 |
sql.get_connection(new=True) |
336 | 352 |
sql.do_session_table() |
337 | 353 |
sql.do_user_table() |
354 |
sql.do_role_table() |
|
338 | 355 |
sql.do_tracking_code_table() |
339 | 356 |
sql.do_custom_views_table() |
340 | 357 |
sql.do_snapshots_table() |
wcs/qommon/saml2.py | ||
---|---|---|
36 | 36 |
from .publisher import get_cfg, get_logger |
37 | 37 |
from . import _, force_str |
38 | 38 |
from .template import error_page |
39 |
from wcs.roles import Role |
|
40 | 39 | |
41 | 40 |
from . import errors |
42 | 41 | |
... | ... | |
495 | 494 |
# point roles in authentic where provisionned from w.c.s. and join |
496 | 495 |
# was done on the slug field. |
497 | 496 |
for uuid in m['role-slug']: |
498 |
role = Role.resolve(uuid)
|
|
497 |
role = get_publisher().role_class.resolve(uuid)
|
|
499 | 498 |
if not role: |
500 | 499 |
logger.warning('role uuid %s is unknown', uuid) |
501 | 500 |
continue |
wcs/qommon/sessions.py | ||
---|---|---|
175 | 175 | |
176 | 176 |
def create_form_token(self): |
177 | 177 |
token = super(Session, self).create_form_token() |
178 |
print('created token:', token) |
|
179 | 178 |
open(self.get_form_token_filepath(token), 'wb').close() |
180 | 179 |
return token |
181 | 180 |
wcs/qommon/templatetags/qommon.py | ||
---|---|---|
26 | 26 | |
27 | 27 |
import pyproj |
28 | 28 |
from pyproj import Geod |
29 |
from quixote import get_publisher |
|
29 | 30 | |
30 | 31 |
try: |
31 | 32 |
import langdetect |
... | ... | |
43 | 44 |
from wcs.qommon import evalutils |
44 | 45 |
from wcs.qommon import tokens |
45 | 46 |
from wcs.qommon.admin.texts import TextsDirectory |
46 |
from wcs.roles import Role |
|
47 | 47 | |
48 | 48 |
register = template.Library() |
49 | 49 | |
... | ... | |
698 | 698 |
return False |
699 | 699 |
for role_id in user.get_roles(): |
700 | 700 |
try: |
701 |
if role_name == Role.get(role_id).name:
|
|
701 |
if role_name == get_publisher().role_class.get(role_id).name:
|
|
702 | 702 |
return True |
703 | 703 |
except KeyError: # role has been deleted |
704 | 704 |
pass |
... | ... | |
711 | 711 |
# do not fail on non-user objects, just return empty list |
712 | 712 |
return [] |
713 | 713 |
role_ids = user.get_roles() |
714 |
roles = [Role.get(x, ignore_errors=True, ignore_migration=True) for x in role_ids]
|
|
714 |
roles = [get_publisher().role_class.get(x, ignore_errors=True, ignore_migration=True) for x in role_ids]
|
|
715 | 715 |
return [x.name for x in roles if x] |
716 | 716 | |
717 | 717 |
wcs/roles.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import xml.etree.ElementTree as ET |
|
18 | ||
17 | 19 |
from django.utils.encoding import force_text |
18 | 20 |
from quixote import get_publisher |
19 | 21 | |
... | ... | |
24 | 26 |
class Role(StorableObject): |
25 | 27 |
_names = 'roles' |
26 | 28 |
_indexes = ['uuid', 'slug'] |
29 |
xml_root_node = 'role' |
|
27 | 30 | |
28 | 31 |
name = None |
29 | 32 |
uuid = None |
... | ... | |
34 | 37 |
emails_to_members = False |
35 | 38 |
allows_backoffice_access = True |
36 | 39 | |
40 |
TEXT_ATTRIBUTES = ['name', 'uuid', 'slug', 'details', 'emails'] |
|
41 |
BOOLEAN_ATTRIBUTES = ['internal', 'emails_to_members', 'allows_backoffice_access'] |
|
42 | ||
37 | 43 |
def __init__(self, name=None, id=None): |
38 | 44 |
StorableObject.__init__(self, id=id) |
39 | 45 |
self.name = name |
... | ... | |
98 | 104 |
'id': self.id, |
99 | 105 |
} |
100 | 106 | |
107 |
def export_to_xml(self, include_id=False): |
|
108 |
charset = get_publisher().site_charset |
|
109 |
root = ET.Element(self.xml_root_node) |
|
110 |
if include_id and self.id: |
|
111 |
root.attrib['id'] = str(self.id) |
|
112 |
for text_attribute in list(self.TEXT_ATTRIBUTES): |
|
113 |
if not hasattr(self, text_attribute) or not getattr(self, text_attribute): |
|
114 |
continue |
|
115 |
ET.SubElement(root, text_attribute).text = force_text(getattr(self, text_attribute), charset) |
|
116 |
for boolean_attribute in self.BOOLEAN_ATTRIBUTES: |
|
117 |
if not hasattr(self, boolean_attribute): |
|
118 |
continue |
|
119 |
value = getattr(self, boolean_attribute) |
|
120 |
if value: |
|
121 |
value = 'true' |
|
122 |
else: |
|
123 |
value = 'false' |
|
124 |
ET.SubElement(root, boolean_attribute).text = value |
|
125 |
return root |
|
126 | ||
127 |
@classmethod |
|
128 |
def import_from_xml(cls, fd, charset=None, include_id=False): |
|
129 |
try: |
|
130 |
tree = ET.parse(fd) |
|
131 |
except Exception: |
|
132 |
raise ValueError() |
|
133 | ||
134 |
if charset is None: |
|
135 |
charset = get_publisher().site_charset |
|
136 |
assert charset == 'utf-8' |
|
137 |
role = cls() |
|
138 | ||
139 |
# if the tree we get is actually a ElementTree for real, we get its |
|
140 |
# root element and go on happily. |
|
141 |
if not ET.iselement(tree): |
|
142 |
tree = tree.getroot() |
|
143 | ||
144 |
if include_id and tree.attrib.get('id'): |
|
145 |
role.id = tree.attrib.get('id') |
|
146 |
for text_attribute in list(cls.TEXT_ATTRIBUTES): |
|
147 |
value = tree.find(text_attribute) |
|
148 |
if value is None or value.text is None: |
|
149 |
continue |
|
150 |
setattr(role, text_attribute, misc.xml_node_text(value)) |
|
151 | ||
152 |
for boolean_attribute in cls.BOOLEAN_ATTRIBUTES: |
|
153 |
value = tree.find(boolean_attribute) |
|
154 |
if value is None: |
|
155 |
continue |
|
156 |
setattr(role, boolean_attribute, value.text == 'true') |
|
157 | ||
158 |
return role |
|
159 | ||
101 | 160 |
@classmethod |
102 | 161 |
def resolve(cls, uuid, slug=None, name=None): |
103 | 162 |
try: |
... | ... | |
132 | 191 | |
133 | 192 | |
134 | 193 |
def get_user_roles(): |
135 |
t = sorted([(misc.simplify(x.name), x.id, x.name, x.id) for x in Role.select() if not x.is_internal()]) |
|
194 |
t = sorted( |
|
195 |
[ |
|
196 |
(misc.simplify(x.name), x.id, x.name, x.id) |
|
197 |
for x in get_publisher().role_class.select() |
|
198 |
if not x.is_internal() |
|
199 |
] |
|
200 |
) |
|
136 | 201 |
return [x[1:] for x in t] |
wcs/sql.py | ||
---|---|---|
46 | 46 |
import wcs.custom_views |
47 | 47 |
import wcs.formdata |
48 | 48 |
import wcs.logged_errors |
49 |
import wcs.roles |
|
49 | 50 |
import wcs.snapshots |
50 | 51 |
import wcs.tracking_code |
51 | 52 |
import wcs.users |
... | ... | |
802 | 803 |
cur.close() |
803 | 804 | |
804 | 805 | |
806 |
def do_role_table(concurrently=False): |
|
807 |
conn, cur = get_connection_and_cursor() |
|
808 |
table_name = 'roles' |
|
809 | ||
810 |
cur.execute( |
|
811 |
'''SELECT COUNT(*) FROM information_schema.tables |
|
812 |
WHERE table_schema = 'public' |
|
813 |
AND table_name = %s''', |
|
814 |
(table_name,), |
|
815 |
) |
|
816 |
if cur.fetchone()[0] == 0: |
|
817 |
cur.execute( |
|
818 |
'''CREATE TABLE %s (id VARCHAR PRIMARY KEY, |
|
819 |
name VARCHAR, |
|
820 |
uuid UUID, |
|
821 |
slug VARCHAR UNIQUE, |
|
822 |
internal BOOLEAN, |
|
823 |
details VARCHAR, |
|
824 |
emails VARCHAR[], |
|
825 |
emails_to_members BOOLEAN, |
|
826 |
allows_backoffice_access BOOLEAN)''' |
|
827 |
% table_name |
|
828 |
) |
|
829 |
cur.execute( |
|
830 |
'''SELECT column_name FROM information_schema.columns |
|
831 |
WHERE table_schema = 'public' |
|
832 |
AND table_name = %s''', |
|
833 |
(table_name,), |
|
834 |
) |
|
835 |
existing_fields = set([x[0] for x in cur.fetchall()]) |
|
836 | ||
837 |
needed_fields = set([x[0] for x in Role._table_static_fields]) |
|
838 | ||
839 |
# delete obsolete fields |
|
840 |
for field in existing_fields - needed_fields: |
|
841 |
cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field)) |
|
842 | ||
843 |
conn.commit() |
|
844 |
cur.close() |
|
845 | ||
846 | ||
847 |
def migrate_legacy_roles(): |
|
848 |
# store old pickle roles in SQL |
|
849 |
from wcs import roles |
|
850 | ||
851 |
for role_id in roles.Role.keys(): |
|
852 |
role = roles.Role.get(role_id) |
|
853 |
role.__class__ = Role |
|
854 |
role.store() |
|
855 | ||
856 | ||
805 | 857 |
def do_tracking_code_table(): |
806 | 858 |
conn, cur = get_connection_and_cursor() |
807 | 859 |
table_name = 'tracking_codes' |
... | ... | |
1401 | 1453 | |
1402 | 1454 |
@classmethod |
1403 | 1455 |
@guard_postgres |
1404 |
def get(cls, id, ignore_errors=False, ignore_migration=False): |
|
1405 |
if cls._numerical_id or id is None:
|
|
1456 |
def get(cls, id, ignore_errors=False, ignore_migration=False, column=None):
|
|
1457 |
if column is None and (cls._numerical_id or id is None):
|
|
1406 | 1458 |
try: |
1407 | 1459 |
int(id) |
1408 | 1460 |
except (TypeError, ValueError): |
... | ... | |
1414 | 1466 | |
1415 | 1467 |
sql_statement = '''SELECT %s |
1416 | 1468 |
FROM %s |
1417 |
WHERE id = %%(id)s''' % (
|
|
1469 |
WHERE %s = %%(value)s''' % (
|
|
1418 | 1470 |
', '.join([x[0] for x in cls._table_static_fields] + cls.get_data_fields()), |
1419 | 1471 |
cls._table_name, |
1472 |
column or 'id', |
|
1420 | 1473 |
) |
1421 |
cur.execute(sql_statement, {'id': str(id)})
|
|
1474 |
cur.execute(sql_statement, {'value': str(id)})
|
|
1422 | 1475 |
row = cur.fetchone() |
1423 | 1476 |
if row is None: |
1424 | 1477 |
cur.close() |
... | ... | |
1428 | 1481 |
cur.close() |
1429 | 1482 |
return cls._row2ob(row) |
1430 | 1483 | |
1484 |
@classmethod |
|
1485 |
@guard_postgres |
|
1486 |
def get_on_index(cls, value, index, ignore_errors=False, **kwargs): |
|
1487 |
return cls.get(value, ignore_errors=ignore_errors, column=index) |
|
1488 | ||
1431 | 1489 |
@classmethod |
1432 | 1490 |
@guard_postgres |
1433 | 1491 |
def get_ids(cls, ids, ignore_errors=False, keep_order=False, fields=None, order_by=None): |
... | ... | |
2447 | 2505 |
return objects |
2448 | 2506 | |
2449 | 2507 | |
2508 |
class Role(SqlMixin, wcs.roles.Role): |
|
2509 |
_table_name = 'roles' |
|
2510 |
_table_static_fields = [ |
|
2511 |
('id', 'varchar'), |
|
2512 |
('name', 'varchar'), |
|
2513 |
('uuid', 'uuid'), |
|
2514 |
('slug', 'varchar'), |
|
2515 |
('internal', 'boolean'), |
|
2516 |
('details', 'varchar'), |
|
2517 |
('emails', 'varchar[]'), |
|
2518 |
('emails_to_members', 'boolean'), |
|
2519 |
('allows_backoffice_access', 'boolean'), |
|
2520 |
] |
|
2521 | ||
2522 |
_numerical_id = False |
|
2523 | ||
2524 |
@guard_postgres |
|
2525 |
def store(self): |
|
2526 |
if self.slug is None: |
|
2527 |
# set slug if it's not yet there |
|
2528 |
self.slug = self.get_new_slug() |
|
2529 | ||
2530 |
sql_dict = { |
|
2531 |
'id': self.id, |
|
2532 |
'name': self.name, |
|
2533 |
'uuid': self.uuid, |
|
2534 |
'slug': self.slug, |
|
2535 |
'internal': self.internal, |
|
2536 |
'details': self.details, |
|
2537 |
'emails': self.emails, |
|
2538 |
'emails_to_members': self.emails_to_members, |
|
2539 |
'allows_backoffice_access': self.allows_backoffice_access, |
|
2540 |
} |
|
2541 | ||
2542 |
conn, cur = get_connection_and_cursor() |
|
2543 |
column_names = sql_dict.keys() |
|
2544 | ||
2545 |
if not self.id: |
|
2546 |
sql_dict['id'] = self.get_new_id() |
|
2547 |
sql_statement = '''INSERT INTO %s (%s) |
|
2548 |
VALUES (%s) |
|
2549 |
RETURNING id''' % ( |
|
2550 |
self._table_name, |
|
2551 |
', '.join(column_names), |
|
2552 |
', '.join(['%%(%s)s' % x for x in column_names]), |
|
2553 |
) |
|
2554 |
while True: |
|
2555 |
try: |
|
2556 |
cur.execute(sql_statement, sql_dict) |
|
2557 |
except psycopg2.IntegrityError: |
|
2558 |
conn.rollback() |
|
2559 |
sql_dict['id'] = self.get_new_id() |
|
2560 |
else: |
|
2561 |
break |
|
2562 |
self.id = str_encode(cur.fetchone()[0]) |
|
2563 |
else: |
|
2564 |
sql_statement = '''UPDATE %s SET %s WHERE id = %%(id)s RETURNING id''' % ( |
|
2565 |
self._table_name, |
|
2566 |
', '.join(['%s = %%(%s)s' % (x, x) for x in column_names]), |
|
2567 |
) |
|
2568 |
cur.execute(sql_statement, sql_dict) |
|
2569 |
if cur.fetchone() is None: |
|
2570 |
sql_statement = '''INSERT INTO %s (%s) VALUES (%s)''' % ( |
|
2571 |
self._table_name, |
|
2572 |
', '.join(column_names), |
|
2573 |
', '.join(['%%(%s)s' % x for x in column_names]), |
|
2574 |
) |
|
2575 |
cur.execute(sql_statement, sql_dict) |
|
2576 | ||
2577 |
conn.commit() |
|
2578 |
cur.close() |
|
2579 | ||
2580 |
@classmethod |
|
2581 |
def get_data_fields(cls): |
|
2582 |
return [] |
|
2583 | ||
2584 |
@classmethod |
|
2585 |
def _row2ob(cls, row, **kwargs): |
|
2586 |
o = cls() |
|
2587 |
for field, value in zip(cls._table_static_fields, tuple(row)): |
|
2588 |
if field[1] in ('varchar', 'varchar[]'): |
|
2589 |
setattr(o, field[0], str_encode(value)) |
|
2590 |
else: |
|
2591 |
setattr(o, field[0], value) |
|
2592 |
return o |
|
2593 | ||
2594 | ||
2450 | 2595 |
class Session(SqlMixin, wcs.sessions.BasicSession): |
2451 | 2596 |
_table_name = 'sessions' |
2452 | 2597 |
_table_static_fields = [ |
... | ... | |
3145 | 3290 |
# latest migration, number + description (description is not used |
3146 | 3291 |
# programmaticaly but will make sure git conflicts if two migrations are |
3147 | 3292 |
# separately added with the same number) |
3148 |
SQL_LEVEL = (48, 'remove acked attribute from LoggedError')
|
|
3293 |
SQL_LEVEL = (49, 'Role migration')
|
|
3149 | 3294 | |
3150 | 3295 | |
3151 | 3296 |
def migrate_global_views(conn, cur): |
... | ... | |
3316 | 3461 |
# 47: store LoggedErrors in SQL |
3317 | 3462 |
# 48: remove acked attribute from LoggedError |
3318 | 3463 |
do_loggederrors_table() |
3464 |
if sql_level < 49: |
|
3465 |
# 49: store Role in SQL |
|
3466 |
do_role_table() |
|
3467 |
migrate_legacy_roles() |
|
3319 | 3468 | |
3320 | 3469 |
cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(SQL_LEVEL[0]), 'sql_level')) |
3321 | 3470 |
wcs/users.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import datetime |
18 | 18 | |
19 |
from quixote import get_publisher |
|
20 | ||
19 | 21 |
from .qommon import _, N_ |
20 | 22 |
from .qommon.misc import simplify |
21 | 23 |
from .qommon.storage import StorableObject |
... | ... | |
134 | 136 |
return True |
135 | 137 |
if self.anonymous: |
136 | 138 |
return False |
137 |
from .roles import Role |
|
138 | 139 | |
139 | 140 |
for role_id in self.roles or []: |
140 | 141 |
try: |
141 |
role = Role.get(role_id)
|
|
142 |
role = get_publisher().role_class.get(role_id)
|
|
142 | 143 |
if role.allows_backoffice_access: |
143 | 144 |
return True |
144 | 145 |
except KeyError: # role has been deleted |
wcs/variables.py | ||
---|---|---|
493 | 493 | |
494 | 494 |
@property |
495 | 495 |
def role(self): |
496 |
from wcs.roles import Role |
|
497 | ||
498 | 496 |
workflow_roles = {} |
499 | 497 |
if self._formdef.workflow_roles: |
500 | 498 |
workflow_roles.update(self._formdef.workflow_roles) |
... | ... | |
505 | 503 |
for role_type, role_id in workflow_roles.items(): |
506 | 504 |
prefix = '%s_' % role_type.replace('-', '_').strip('_') |
507 | 505 |
try: |
508 |
d.update(Role.get(role_id).get_substitution_variables(prefix))
|
|
506 |
d.update(get_publisher().role_class.get(role_id).get_substitution_variables(prefix))
|
|
509 | 507 |
except KeyError: |
510 | 508 |
pass |
511 | 509 |
return d |
wcs/wf/aggregation_email.py | ||
---|---|---|
22 | 22 |
from ..qommon import emails |
23 | 23 | |
24 | 24 |
from wcs.workflows import WorkflowStatusItem, register_item_class |
25 |
from wcs.roles import Role |
|
26 | 25 | |
27 | 26 | |
28 | 27 |
class AggregationEmailWorkflowStatusItem(WorkflowStatusItem): |
... | ... | |
110 | 109 |
aggregate.remove_self() |
111 | 110 | |
112 | 111 |
try: |
113 |
role = Role.get(aggregate_id)
|
|
112 |
role = get_publisher().role_class.get(aggregate_id)
|
|
114 | 113 |
except KeyError: |
115 | 114 |
continue |
116 | 115 |
if not role.get_emails(): |
wcs/wf/dispatch.py | ||
---|---|---|
22 | 22 |
from ..qommon import _, N_ |
23 | 23 |
from ..qommon.form import * |
24 | 24 |
from ..qommon.template import Template |
25 |
from wcs.roles import Role, get_user_roles
|
|
25 |
from wcs.roles import get_user_roles |
|
26 | 26 |
from wcs.workflows import XmlSerialisable, WorkflowStatusItem, register_item_class, get_role_name |
27 | 27 | |
28 | 28 | |
... | ... | |
232 | 232 |
break |
233 | 233 | |
234 | 234 |
if new_role_id: |
235 |
if not Role.has_key(new_role_id):
|
|
235 |
if not get_publisher().role_class.get(new_role_id, ignore_errors=True):
|
|
236 | 236 |
get_publisher().record_error( |
237 | 237 |
_('error in dispatch, missing role (%s)') % new_role_id, formdata=formdata |
238 | 238 |
) |
wcs/wf/notification.py | ||
---|---|---|
23 | 23 |
from ..qommon.template import TemplateError |
24 | 24 |
from ..qommon import get_logger |
25 | 25 | |
26 |
from wcs.roles import Role |
|
27 | 26 |
from wcs.workflows import WorkflowStatusItem, register_item_class, template_on_formdata |
28 | 27 |
from .wscall import WebserviceCallStatusItem |
29 | 28 | |
... | ... | |
156 | 155 | |
157 | 156 |
dest = formdata.get_role_translation(dest) |
158 | 157 |
try: |
159 |
role = Role.get(dest)
|
|
158 |
role = get_publisher().role_class.get(dest)
|
|
160 | 159 |
except KeyError: |
161 | 160 |
continue |
162 | 161 |
users.extend(get_publisher().user_class.get_users_with_role(role.id)) |
wcs/wf/roles.py | ||
---|---|---|
22 | 22 |
from ..qommon import _, N_ |
23 | 23 |
from ..qommon.form import * |
24 | 24 |
from wcs.workflows import WorkflowStatusItem, register_item_class |
25 |
from wcs.roles import get_user_roles, Role
|
|
25 |
from wcs.roles import get_user_roles |
|
26 | 26 |
from ..qommon.ident.idp import is_idp_managing_user_attributes |
27 | 27 |
from ..qommon.misc import http_post_request, http_delete_request |
28 | 28 |
from ..qommon.publisher import get_cfg, get_logger |
... | ... | |
99 | 99 |
request._user = user |
100 | 100 | |
101 | 101 |
def perform_idp(self, user, formdata, role_id): |
102 |
role = Role.get(role_id)
|
|
102 |
role = get_publisher().role_class.get(role_id)
|
|
103 | 103 |
role_uuid = role.uuid or role.slug |
104 | 104 |
user_uuid = user.name_identifiers[0] |
105 | 105 |
try: |
... | ... | |
169 | 169 |
request._user = user |
170 | 170 | |
171 | 171 |
def perform_idp(self, user, formdata, role_id): |
172 |
role = Role.get(role_id)
|
|
172 |
role = get_publisher().role_class.get(role_id)
|
|
173 | 173 |
role_uuid = role.uuid or role.slug |
174 | 174 |
user_uuid = user.name_identifiers[0] |
175 | 175 |
try: |
wcs/workflows.py | ||
---|---|---|
27 | 27 | |
28 | 28 |
from django.utils.encoding import force_text |
29 | 29 | |
30 |
from quixote import get_request, get_response, redirect |
|
30 |
from quixote import get_request, get_response, redirect, get_publisher
|
|
31 | 31 | |
32 | 32 |
from .qommon import _, N_, force_str |
33 | 33 |
from .qommon.misc import C_, get_as_datetime, file_digest, get_foreground_colour, xml_node_text |
... | ... | |
41 | 41 |
from .qommon.upload_storage import PicklableUpload, get_storage_object |
42 | 42 | |
43 | 43 |
from .conditions import Condition |
44 |
from .roles import Role, logged_users_role, get_user_roles
|
|
44 |
from .roles import logged_users_role, get_user_roles |
|
45 | 45 |
from .fields import FileField |
46 | 46 |
from .formdef import FormDef, FormdefImportError |
47 | 47 |
from .carddef import CardDef |
... | ... | |
1022 | 1022 |
# if we import using id, look at the role_id attribute |
1023 | 1023 |
if include_id and 'role_id' in elem.attrib: |
1024 | 1024 |
role_id = force_str(elem.attrib['role_id']) |
1025 |
if Role.has_key(role_id):
|
|
1025 |
if get_publisher().role_class.get(role_id, ignore_errors=True):
|
|
1026 | 1026 |
return role_id |
1027 | 1027 |
if WorkflowStatusItem.get_expression(role_id)['type'] in ('python', 'template'): |
1028 | 1028 |
return role_id |
1029 | 1029 | |
1030 | 1030 |
# if not using id, look up on the name |
1031 |
for role in Role.select(ignore_errors=True):
|
|
1031 |
for role in get_publisher().role_class.select(ignore_errors=True):
|
|
1032 | 1032 |
if role.name == value: |
1033 | 1033 |
return role.id |
1034 | 1034 | |
... | ... | |
1042 | 1042 |
raise WorkflowImportError(N_('Unknown referenced role (%s)'), (value,)) |
1043 | 1043 | |
1044 | 1044 |
# and if there's no match, create a new role |
1045 |
role = Role()
|
|
1045 |
role = get_publisher().role_class()
|
|
1046 | 1046 |
role.name = value |
1047 | 1047 |
role.store() |
1048 | 1048 |
return role.id |
... | ... | |
2125 | 2125 | |
2126 | 2126 |
def get_computed_role_id(self, role_id): |
2127 | 2127 |
new_role_id = self.compute(str(role_id)) |
2128 |
if Role.has_key(new_role_id):
|
|
2128 |
if get_publisher().role_class.get(new_role_id, ignore_errors=True):
|
|
2129 | 2129 |
return new_role_id |
2130 | 2130 |
# computed value, not an id, try to get role by slug |
2131 |
new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True)
|
|
2131 |
new_role = get_publisher().role_class.get_on_index(new_role_id, 'slug', ignore_errors=True)
|
|
2132 | 2132 |
if new_role: |
2133 | 2133 |
return new_role.id |
2134 | 2134 |
# fallback to role label |
2135 |
for role in Role.select():
|
|
2135 |
for role in get_publisher().role_class.select():
|
|
2136 | 2136 |
if role.name == new_role_id: |
2137 | 2137 |
return role.id |
2138 | 2138 |
return None |
... | ... | |
2396 | 2396 |
return workflow.roles.get(role_id) |
2397 | 2397 |
else: |
2398 | 2398 |
try: |
2399 |
return Role.get(role_id).name
|
|
2399 |
return get_publisher().role_class.get(role_id).name
|
|
2400 | 2400 |
except KeyError: |
2401 | 2401 |
return |
2402 | 2402 | |
... | ... | |
2406 | 2406 |
if role_id.startswith('_') or role_id == 'logged-users': |
2407 | 2407 |
return force_text(role_id, charset) |
2408 | 2408 |
try: |
2409 |
return force_text(Role.get(role_id).name, charset)
|
|
2409 |
return force_text(get_publisher().role_class.get(role_id).name, charset)
|
|
2410 | 2410 |
except KeyError: |
2411 | 2411 |
return force_text(role_id, charset) |
2412 | 2412 | |
... | ... | |
2943 | 2943 |
dest = formdata.get_role_translation(dest) |
2944 | 2944 | |
2945 | 2945 |
try: |
2946 |
role = Role.get(dest)
|
|
2946 |
role = get_publisher().role_class.get(dest)
|
|
2947 | 2947 |
except KeyError: |
2948 | 2948 |
continue |
2949 | 2949 |
addresses.extend(role.get_emails()) |
2950 |
- |