0004-api-add-attachment-evolution-parts-to-json-53253.patch
tests/api/test_formdata.py | ||
---|---|---|
18 | 18 |
from wcs.blocks import BlockDef |
19 | 19 |
from wcs.data_sources import NamedDataSource |
20 | 20 |
from wcs.formdata import Evolution |
21 | 21 |
from wcs.formdef import FormDef |
22 | 22 |
from wcs.qommon import ods |
23 | 23 |
from wcs.qommon.http_request import HTTPRequest |
24 | 24 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
25 | 25 |
from wcs.qommon.upload_storage import PicklableUpload |
26 |
from wcs.workflows import EditableWorkflowStatusItem, Workflow, WorkflowBackofficeFieldsFormDef |
|
26 |
from wcs.workflows import ( |
|
27 |
AttachmentEvolutionPart, |
|
28 |
EditableWorkflowStatusItem, |
|
29 |
Workflow, |
|
30 |
WorkflowBackofficeFieldsFormDef, |
|
31 |
) |
|
27 | 32 | |
28 | 33 |
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login |
29 | 34 |
from .utils import sign_uri |
30 | 35 | |
31 | 36 | |
32 | 37 |
def pytest_generate_tests(metafunc): |
33 | 38 |
if 'pub' in metafunc.fixturenames: |
34 | 39 |
metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True) |
... | ... | |
457 | 462 |
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user)) |
458 | 463 |
assert resp.json['workflow']['data']['xxx'] == 23 |
459 | 464 |
assert resp.json['workflow']['data']['blah']['filename'] == 'test.txt' |
460 | 465 |
assert resp.json['workflow']['data']['blah']['content_type'] == 'text/plain' |
461 | 466 |
assert base64.decodebytes(force_bytes(resp.json['workflow']['data']['blah']['content'])) == b'test' |
462 | 467 |
assert base64.decodebytes(force_bytes(resp.json['workflow']['data']['blah2']['content'])) == b'test' |
463 | 468 | |
464 | 469 | |
470 |
def test_formdata_with_evolution_part_attachment(pub, local_user): |
|
471 |
pub.role_class.wipe() |
|
472 |
role = pub.role_class(name='test') |
|
473 |
role.id = '123' |
|
474 |
role.store() |
|
475 | ||
476 |
local_user.roles = [role.id] |
|
477 |
local_user.store() |
|
478 | ||
479 |
FormDef.wipe() |
|
480 |
formdef = FormDef() |
|
481 |
formdef.name = 'test' |
|
482 |
formdef.fields = [] |
|
483 |
workflow = Workflow.get_default_workflow() |
|
484 |
workflow.id = '2' |
|
485 |
workflow.store() |
|
486 |
formdef.workflow_id = workflow.id |
|
487 |
formdef.workflow_roles = {'_receiver': role.id} |
|
488 |
formdef.store() |
|
489 | ||
490 |
formdef.data_class().wipe() |
|
491 |
formdata = formdef.data_class()() |
|
492 |
formdata.just_created() |
|
493 |
formdata.status = 'wf-new' |
|
494 |
formdata.evolution[-1].status = 'wf-new' |
|
495 |
formdata.evolution[-1].parts = [ |
|
496 |
AttachmentEvolutionPart( |
|
497 |
'hello.txt', fp=io.BytesIO(b'test'), content_type='text/plain', varname='testfile' |
|
498 |
) |
|
499 |
] |
|
500 |
formdata.store() |
|
501 | ||
502 |
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user)) |
|
503 |
assert len(resp.json['evolution']) == 1 |
|
504 |
assert len(resp.json['evolution'][0]['parts']) == 1 |
|
505 |
part = resp.json['evolution'][0]['parts'][0] |
|
506 |
assert part['filename'] == 'hello.txt' |
|
507 |
assert part['content_type'] == 'text/plain' |
|
508 |
assert 'content' in part |
|
509 |
assert base64.decodebytes(force_bytes(part['content'])) == b'test' |
|
510 | ||
511 |
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise' % formdata.id, user=local_user)) |
|
512 |
assert len(resp.json['evolution']) == 1 |
|
513 |
assert len(resp.json['evolution'][0]['parts']) == 1 |
|
514 |
part = resp.json['evolution'][0]['parts'][0] |
|
515 |
assert part['content_type'] == 'text/plain' |
|
516 |
assert 'filename' not in part |
|
517 |
assert 'content' not in part |
|
518 | ||
519 |
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?full=on' % formdata.id, user=local_user)) |
|
520 |
assert len(resp.json['evolution']) == 1 |
|
521 |
assert 'parts' not in resp.json['evolution'] |
|
522 | ||
523 | ||
465 | 524 |
def test_api_list_formdata(pub, local_user): |
466 | 525 |
pub.role_class.wipe() |
467 | 526 |
role = pub.role_class(name='test') |
468 | 527 |
role.store() |
469 | 528 | |
470 | 529 |
FormDef.wipe() |
471 | 530 |
formdef = FormDef() |
472 | 531 |
formdef.name = 'test' |
wcs/formdata.py | ||
---|---|---|
175 | 175 |
data['who'] = user.get_json_export_dict() |
176 | 176 |
elif not anonymise and user: |
177 | 177 |
data['who'] = user.get_json_export_dict() |
178 | 178 |
if self.comment and not anonymise: |
179 | 179 |
data['comment'] = self.comment |
180 | 180 |
parts = [] |
181 | 181 |
for part in self.parts or []: |
182 | 182 |
if hasattr(part, 'get_json_export_dict'): |
183 |
parts.append(part.get_json_export_dict(anonymise=anonymise)) |
|
183 |
d = part.get_json_export_dict(include_files=include_files, anonymise=anonymise) |
|
184 |
if d: |
|
185 |
parts.append(d) |
|
184 | 186 |
if parts: |
185 | 187 |
data['parts'] = parts |
186 | 188 |
return data |
187 | 189 | |
188 | 190 |
# don't pickle _formata cache |
189 | 191 |
def __getstate__(self): |
190 | 192 |
odict = self.__dict__.copy() |
191 | 193 |
if '_formdata' in odict: |
... | ... | |
1209 | 1211 |
'backoffice': self.backoffice_submission, |
1210 | 1212 |
'channel': self.submission_channel or 'web', |
1211 | 1213 |
} |
1212 | 1214 | |
1213 | 1215 |
if self.evolution: |
1214 | 1216 |
evolution = data['evolution'] = [] |
1215 | 1217 |
for evo in self.evolution: |
1216 | 1218 |
evolution.append( |
1217 |
evo.get_json_export_dict(anonymise=anonymise, user=None if anonymise else user) |
|
1219 |
evo.get_json_export_dict( |
|
1220 |
include_files=include_files, |
|
1221 |
anonymise=anonymise, |
|
1222 |
user=None if anonymise else user, |
|
1223 |
) |
|
1218 | 1224 |
) |
1219 | 1225 | |
1220 | 1226 |
if self.geolocations: |
1221 | 1227 |
data['geolocations'] = {} |
1222 | 1228 |
for k, v in self.geolocations.items(): |
1223 | 1229 |
data['geolocations'][k] = v.copy() |
1224 | 1230 | |
1225 | 1231 |
return data |
wcs/forms/common.py | ||
---|---|---|
175 | 175 |
mine = True |
176 | 176 | |
177 | 177 |
self.check_receiver() |
178 | 178 |
return mine |
179 | 179 | |
180 | 180 |
def json(self): |
181 | 181 |
self.check_auth(api_call=True) |
182 | 182 |
anonymise = get_request().has_anonymised_data_api_restriction() |
183 |
return self.export_to_json(anonymise=anonymise) |
|
183 |
if get_request().form.get('full') == 'on': |
|
184 |
include_files = False |
|
185 |
else: |
|
186 |
include_files = True |
|
187 |
return self.export_to_json(include_files=include_files, anonymise=anonymise) |
|
184 | 188 | |
185 | 189 |
def workflow_messages(self, position='top'): |
186 | 190 |
if self.formdef.workflow: |
187 | 191 |
workflow_messages = self.filled.get_workflow_messages(position=position) |
188 | 192 |
if workflow_messages: |
189 | 193 |
r = TemplateIO(html=True) |
190 | 194 |
if position == 'top': |
191 | 195 |
r += htmltext('<div id="receipt-intro" class="workflow-messages %s">' % position) |
... | ... | |
364 | 368 |
response = get_response() |
365 | 369 |
response.set_status(303) |
366 | 370 |
response.headers[str('location')] = url |
367 | 371 |
response.content_type = 'text/plain' |
368 | 372 |
return "Your browser should redirect you" |
369 | 373 | |
370 | 374 |
def export_to_json(self, include_id=False, include_files=True, indent=None, anonymise=True): |
371 | 375 |
get_response().set_content_type('application/json') |
372 |
return self.filled.export_to_json(anonymise=anonymise) |
|
376 |
return self.filled.export_to_json(include_files=include_files, anonymise=anonymise)
|
|
373 | 377 | |
374 | 378 |
def history(self): |
375 | 379 |
if not self.filled.evolution: |
376 | 380 |
return |
377 | 381 |
if not self.formdef.is_user_allowed_read_status_and_history(get_request().user, self.filled): |
378 | 382 |
return |
379 | 383 | |
380 | 384 |
include_authors_in_form_history = ( |
wcs/workflows.py | ||
---|---|---|
265 | 265 |
if show_link: |
266 | 266 |
return htmltext( |
267 | 267 |
'<p class="wf-attachment"><a href="attachment?f=%s">%s</a></p>' |
268 | 268 |
% (os.path.basename(self.filename), self.orig_filename) |
269 | 269 |
) |
270 | 270 |
else: |
271 | 271 |
return htmltext('<p class="wf-attachment">%s</p>' % self.orig_filename) |
272 | 272 | |
273 |
def get_json_export_dict(self, include_id=False, include_files=True, anonymise=False, user=None): |
|
274 |
if not include_files: |
|
275 |
return None |
|
276 |
d = { |
|
277 |
'type': 'workflow-attachment', |
|
278 |
'content_type': self.content_type, |
|
279 |
} |
|
280 |
if not anonymise: |
|
281 |
d['filename'] = self.base_filename |
|
282 |
d['content'] = base64.encodebytes(open(self.filename, 'rb').read()) |
|
283 |
return d |
|
284 | ||
273 | 285 |
@classmethod |
274 | 286 |
def get_substitution_variables(cls, formdata): |
275 | 287 |
return { |
276 | 288 |
'attachments': AttachmentsSubstitutionProxy(formdata), |
277 | 289 |
'form_attachments': AttachmentsSubstitutionProxy(formdata), |
278 | 290 |
} |
279 | 291 | |
280 | 292 |
# mimic PicklableUpload methods: |
281 |
- |