0001-api-allow-wscall-output-as-formdata-submit-input-106.patch
tests/test_api.py | ||
---|---|---|
1 | 1 |
import pytest |
2 |
import json |
|
2 | 3 |
import shutil |
3 | 4 |
import os |
4 | 5 |
import hmac |
... | ... | |
19 | 20 |
from wcs.data_sources import NamedDataSource |
20 | 21 |
from wcs.workflows import Workflow |
21 | 22 |
from wcs.wf.jump import JumpWorkflowStatusItem |
22 |
from wcs import fields |
|
23 |
from wcs import fields, qommon
|
|
23 | 24 |
from wcs.api_utils import sign_url |
24 | 25 | |
25 | 26 |
from utilities import get_app, create_temporary_pub, clean_temporary_pub |
... | ... | |
363 | 364 | |
364 | 365 |
resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403) |
365 | 366 |
assert resp.json['err'] == 1 |
366 |
assert resp.json['err_desc'] == 'no user'
|
|
367 |
assert resp.json['err_desc'] == 'unsigned API call'
|
|
367 | 368 | |
368 | 369 |
signed_url = sign_url('http://example.net/api/formdefs/test/submit' + |
369 | 370 |
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') |
... | ... | |
448 | 449 | |
449 | 450 |
resp = get_app(pub).post_json('/api/formdefs/test/submit', {'data': {}}, status=403) |
450 | 451 |
assert resp.json['err'] == 1 |
451 |
assert resp.json['err_desc'] == 'no user'
|
|
452 |
assert resp.json['err_desc'] == 'unsigned API call'
|
|
452 | 453 | |
453 | 454 |
signed_url = sign_url('http://example.net/api/formdefs/test/submit' + |
454 | 455 |
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234') |
... | ... | |
503 | 504 | |
504 | 505 |
data_class.wipe() |
505 | 506 | |
507 | ||
508 |
def test_formdef_submit_from_wscall(pub, local_user): |
|
509 |
test_formdef_submit_with_varname(pub, local_user) |
|
510 |
formdef = FormDef.select()[0] |
|
511 | ||
512 |
formdata = formdef.data_class()() |
|
513 |
formdata.just_created() |
|
514 | ||
515 |
upload = PicklableUpload('test.txt', 'text/plain', 'ascii') |
|
516 |
upload.receive(['test']) |
|
517 | ||
518 |
formdata.data = { |
|
519 |
'0': 'xxx', |
|
520 |
'1': '1', |
|
521 |
'1_display': '1', |
|
522 |
'1_structured': { |
|
523 |
'id': '1', |
|
524 |
'text': 'foo', |
|
525 |
'more': 'XXX', |
|
526 |
}, |
|
527 |
'2': '10', |
|
528 |
'2_display': 'bar', |
|
529 |
'3': time.strptime('1970-01-01', '%Y-%m-%d'), |
|
530 |
'4': upload, |
|
531 |
'5': '1.5;2.25', |
|
532 |
} |
|
533 |
formdata.just_created() |
|
534 |
formdata.evolution[-1].status = 'wf-new' |
|
535 |
formdata.store() |
|
536 | ||
537 |
payload = json.loads( |
|
538 |
json.dumps(formdata.get_json_export_dict(), |
|
539 |
cls=qommon.misc.JSONEncoder, |
|
540 |
encoding=get_publisher().site_charset)) |
|
541 |
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234') |
|
542 |
url = signed_url[len('http://example.net'):] |
|
543 | ||
544 |
resp = get_app(pub).post_json(url, payload) |
|
545 |
assert resp.json['err'] == 0 |
|
546 |
new_formdata = formdef.data_class().get(resp.json['data']['id']) |
|
547 |
assert new_formdata.data['0'] == formdata.data['0'] |
|
548 |
assert new_formdata.data['1'] == formdata.data['1'] |
|
549 |
assert new_formdata.data['1_display'] == formdata.data['1_display'] |
|
550 |
assert new_formdata.data['1_structured'] == formdata.data['1_structured'] |
|
551 |
assert new_formdata.data['2'] == formdata.data['2'] |
|
552 |
assert new_formdata.data['2_display'] == formdata.data['2_display'] |
|
553 |
assert new_formdata.data['3'] == formdata.data['3'] |
|
554 |
assert new_formdata.data['4'].get_content() == formdata.data['4'].get_content() |
|
555 |
assert new_formdata.data['5'] == formdata.data['5'] |
|
556 |
assert new_formdata.user_id is None |
|
557 | ||
558 |
# add user |
|
559 |
formdata.user_id = local_user.id |
|
560 |
formdata.store() |
|
561 | ||
562 |
payload = json.loads( |
|
563 |
json.dumps(formdata.get_json_export_dict(), |
|
564 |
cls=qommon.misc.JSONEncoder, |
|
565 |
encoding=get_publisher().site_charset)) |
|
566 |
signed_url = sign_url('http://example.net/api/formdefs/test/submit?orig=coucou', '1234') |
|
567 |
url = signed_url[len('http://example.net'):] |
|
568 | ||
569 |
resp = get_app(pub).post_json(url, payload) |
|
570 |
assert resp.json['err'] == 0 |
|
571 |
new_formdata = formdef.data_class().get(resp.json['data']['id']) |
|
572 |
assert str(new_formdata.user_id) == str(local_user.id) |
|
573 | ||
574 | ||
506 | 575 |
def test_categories(pub): |
507 | 576 |
FormDef.wipe() |
508 | 577 |
Category.wipe() |
wcs/api.py | ||
---|---|---|
113 | 113 |
# } |
114 | 114 |
# } |
115 | 115 |
get_response().set_content_type('application/json') |
116 |
user = get_user_from_api_query_string() |
|
117 |
if not user: |
|
118 |
raise AccessForbiddenError('no user') |
|
119 | 116 |
if self.formdef.is_disabled(): |
120 | 117 |
raise AccessForbiddenError('disabled form') |
118 |
if not is_url_signed(): |
|
119 |
raise AccessForbiddenError('unsigned API call') |
|
120 |
user = get_user_from_api_query_string() |
|
121 | 121 |
json_input = get_request().json |
122 | 122 |
formdata = self.formdef.data_class()() |
123 | ||
124 |
if 'fields' in json_input and not 'data' in json_input: |
|
125 |
# this maps json output produced by wf/wscall.py to the json |
|
126 |
# formated expected as input. |
|
127 |
json_input['data'] = json_input['fields'] |
|
128 | ||
123 | 129 |
data = json_input['data'] |
124 | 130 |
# remap fields |
125 | 131 |
for field in self.formdef.fields: |
... | ... | |
146 | 152 |
formdata.data = data |
147 | 153 |
meta = json_input.get('meta') or {} |
148 | 154 |
if meta.get('backoffice-submission'): |
155 |
if not user: |
|
156 |
raise AccessForbiddenError('no user set for backoffice submission') |
|
149 | 157 |
if not self.formdef.backoffice_submission_roles: |
150 | 158 |
raise AccessForbiddenError('no backoffice submission roles') |
151 | 159 |
if not set(user.roles or []).intersection(self.formdef.backoffice_submission_roles): |
152 | 160 |
raise AccessForbiddenError('not cleared for backoffice submit') |
153 | 161 |
formdata.backoffice_submission = True |
154 |
else: |
|
162 |
elif 'user' in json_input: |
|
163 |
formdata_user = None |
|
164 |
for name_id in json_input['user'].get('NameID'): |
|
165 |
formdata_user = get_publisher().user_class.get_users_with_name_identifier(name_id) |
|
166 |
if formdata_user: |
|
167 |
break |
|
168 |
else: |
|
169 |
if json_input['user'].get('email'): |
|
170 |
formdata_user = get_publisher().user_class.get_users_with_email( |
|
171 |
json_input['user'].get('email')) |
|
172 |
if formdata_user: |
|
173 |
formdata.user_id = formdata_user[0].id |
|
174 |
elif user: |
|
155 | 175 |
formdata.user_id = user.id |
156 | 176 |
if json_input.get('context'): |
157 | 177 |
formdata.submission_context = json_input['context'] |
wcs/formdata.py | ||
---|---|---|
700 | 700 |
break |
701 | 701 |
if user: |
702 | 702 |
data['user'] = {'id': user.id, 'name': user.display_name} |
703 |
if user.email: |
|
704 |
data['user']['email'] = user.email |
|
705 |
if user.name_identifiers: |
|
706 |
data['user']['NameID'] = user.name_identifiers |
|
703 | 707 | |
704 | 708 |
data['fields'] = get_json_dict(self.formdef.fields, self.data, |
705 | 709 |
include_files=include_files) |
706 |
- |