20 |
20 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormEvolutionPart, WorkflowFormFieldsFormDef
|
21 |
21 |
from wcs.workflows import (
|
22 |
22 |
AttachmentEvolutionPart,
|
|
23 |
ContentSnapshotPart,
|
23 |
24 |
Workflow,
|
24 |
25 |
WorkflowBackofficeFieldsFormDef,
|
25 |
26 |
WorkflowVariablesFieldsFormDef,
|
... | ... | |
340 |
341 |
pub.site_options.set('options', 'unused-files-behaviour', behaviour)
|
341 |
342 |
|
342 |
343 |
formdata = formdef.data_class()()
|
343 |
|
formdata.just_created()
|
344 |
344 |
formdata.data = {
|
345 |
345 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
346 |
346 |
}
|
347 |
347 |
formdata.data['5'].receive([b'hello world'])
|
|
348 |
formdata.just_created()
|
348 |
349 |
formdata.store()
|
|
350 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
351 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
|
349 |
352 |
|
350 |
353 |
assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
|
351 |
354 |
clean_unused_files(pub)
|
... | ... | |
356 |
359 |
|
357 |
360 |
for _ in range(5):
|
358 |
361 |
formdata = formdef.data_class()()
|
359 |
|
formdata.just_created()
|
360 |
362 |
formdata.data = {
|
361 |
363 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
362 |
364 |
}
|
363 |
365 |
formdata.data['5'].receive([b'hello world'])
|
|
366 |
formdata.just_created()
|
364 |
367 |
formdata.store()
|
|
368 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
369 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
|
365 |
370 |
|
366 |
371 |
# same file, deduplicated
|
367 |
372 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [formdata.data['5'].qfilename]
|
... | ... | |
374 |
379 |
clean_unused_files(pub)
|
375 |
380 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
|
376 |
381 |
|
|
382 |
# file referenced in formdata history, but not in formdata's data
|
|
383 |
formdata = formdef.data_class()()
|
|
384 |
formdata.data = {
|
|
385 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
|
386 |
}
|
|
387 |
formdata.data['5'].receive([b'hello world'])
|
|
388 |
formdata.just_created()
|
|
389 |
formdata.store()
|
|
390 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
391 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
|
|
392 |
qfilename = formdata.data['5'].qfilename
|
|
393 |
formdata.data['5'] = None
|
|
394 |
formdata.store()
|
|
395 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == qfilename
|
|
396 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename]
|
|
397 |
clean_unused_files(pub)
|
|
398 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
399 |
formdata.anonymise()
|
|
400 |
assert len(formdata.evolution) == 1
|
|
401 |
assert formdata.evolution[0].parts is None
|
|
402 |
clean_unused_files(pub)
|
|
403 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
|
|
404 |
|
377 |
405 |
# file referenced in formdef option
|
378 |
406 |
workflow = Workflow(name='variables')
|
379 |
407 |
|
... | ... | |
387 |
415 |
formdef.store()
|
388 |
416 |
|
389 |
417 |
formdata = formdef.data_class()()
|
390 |
|
formdata.just_created()
|
391 |
418 |
formdata.data = {
|
392 |
419 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
393 |
420 |
}
|
394 |
421 |
formdata.data['5'].receive([b'hello world'])
|
|
422 |
formdata.just_created()
|
395 |
423 |
formdata.store()
|
396 |
424 |
|
397 |
425 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
... | ... | |
407 |
435 |
|
408 |
436 |
# file in block field
|
409 |
437 |
formdata = formdef.data_class()()
|
410 |
|
formdata.just_created()
|
411 |
438 |
formdata.data = {
|
412 |
439 |
'6': {
|
413 |
440 |
'data': [
|
... | ... | |
420 |
447 |
formdata.data['6']['data'][0]['234'].receive([b'hello world'])
|
421 |
448 |
formdata.data['6']['data'][1]['234'].receive([b'hello world block'])
|
422 |
449 |
formdata.workflow_data = {'wscall': {'data': ['not', 'a', 'block'], 'err': 0}}
|
|
450 |
formdata.just_created()
|
423 |
451 |
formdata.store()
|
|
452 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
453 |
assert (
|
|
454 |
formdata.evolution[0].parts[0].new_data['6']['data'][0]['234'].qfilename
|
|
455 |
== formdata.data['6']['data'][0]['234'].qfilename
|
|
456 |
)
|
|
457 |
assert (
|
|
458 |
formdata.evolution[0].parts[0].new_data['6']['data'][1]['234'].qfilename
|
|
459 |
== formdata.data['6']['data'][1]['234'].qfilename
|
|
460 |
)
|
424 |
461 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
|
425 |
462 |
clean_unused_files(pub)
|
426 |
463 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
|
... | ... | |
428 |
465 |
clean_unused_files(pub)
|
429 |
466 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
|
430 |
467 |
|
431 |
|
# non local storage: nothing happens
|
|
468 |
# block field: file referenced in formdata history, but not in formdata's data
|
432 |
469 |
formdata = formdef.data_class()()
|
|
470 |
formdata.data = {
|
|
471 |
'6': {
|
|
472 |
'data': [
|
|
473 |
{'234': PicklableUpload('test.txt', 'text/plain')},
|
|
474 |
{'234': PicklableUpload('test2.txt', 'text/plain')},
|
|
475 |
],
|
|
476 |
'schema': {'234': 'file'},
|
|
477 |
},
|
|
478 |
}
|
|
479 |
formdata.data['6']['data'][0]['234'].receive([b'hello world'])
|
|
480 |
formdata.data['6']['data'][1]['234'].receive([b'hello world block'])
|
433 |
481 |
formdata.just_created()
|
|
482 |
formdata.store()
|
|
483 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
484 |
assert (
|
|
485 |
formdata.evolution[0].parts[0].new_data['6']['data'][0]['234'].qfilename
|
|
486 |
== formdata.data['6']['data'][0]['234'].qfilename
|
|
487 |
)
|
|
488 |
assert (
|
|
489 |
formdata.evolution[0].parts[0].new_data['6']['data'][1]['234'].qfilename
|
|
490 |
== formdata.data['6']['data'][1]['234'].qfilename
|
|
491 |
)
|
|
492 |
qfilename0 = formdata.data['6']['data'][0]['234'].qfilename
|
|
493 |
qfilename1 = formdata.data['6']['data'][1]['234'].qfilename
|
|
494 |
formdata.data['6'] = {}
|
|
495 |
formdata.store()
|
|
496 |
assert formdata.evolution[0].parts[0].new_data['6']['data'][0]['234'].qfilename == qfilename0
|
|
497 |
assert formdata.evolution[0].parts[0].new_data['6']['data'][1]['234'].qfilename == qfilename1
|
|
498 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename0, qfilename1]
|
|
499 |
clean_unused_files(pub)
|
|
500 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
|
|
501 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename0, qfilename1]
|
|
502 |
formdata.anonymise()
|
|
503 |
assert len(formdata.evolution) == 1
|
|
504 |
assert formdata.evolution[0].parts is None
|
|
505 |
clean_unused_files(pub)
|
|
506 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
|
|
507 |
|
|
508 |
# non local storage: nothing happens
|
|
509 |
formdata = formdef.data_class()()
|
434 |
510 |
formdata.data = {
|
435 |
511 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
436 |
512 |
}
|
437 |
513 |
formdata.data['5'].receive([b'hello world'])
|
438 |
514 |
formdata.data['5'].storage = 'remote'
|
439 |
515 |
formdata.data['5'].storage_attrs = {'redirect_url': 'https://crypto.example.net/1234'}
|
|
516 |
formdata.just_created()
|
440 |
517 |
formdata.store()
|
441 |
518 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
|
442 |
519 |
clean_unused_files(pub)
|
... | ... | |
444 |
521 |
|
445 |
522 |
# workflow attachment
|
446 |
523 |
formdata = formdef.data_class()()
|
447 |
|
formdata.just_created()
|
448 |
524 |
formdata.data = {}
|
|
525 |
formdata.just_created()
|
449 |
526 |
formdata.store()
|
450 |
527 |
|
451 |
528 |
formdata.evolution[-1].parts = [
|
... | ... | |
480 |
557 |
|
481 |
558 |
# file from workflow form
|
482 |
559 |
formdata = formdef.data_class()()
|
483 |
|
formdata.just_created()
|
484 |
560 |
formdata.data = {}
|
|
561 |
formdata.just_created()
|
485 |
562 |
formdata.store()
|
486 |
563 |
|
487 |
564 |
display_form = FormWorkflowStatusItem()
|
... | ... | |
515 |
592 |
# unknown unused-files-behaviour: do nothing
|
516 |
593 |
pub.site_options.set('options', 'unused-files-behaviour', 'foo')
|
517 |
594 |
formdata = formdef.data_class()()
|
518 |
|
formdata.just_created()
|
519 |
595 |
formdata.data = {
|
520 |
596 |
'5': PicklableUpload('test-no-remove.txt', 'text/plain'),
|
521 |
597 |
}
|
522 |
598 |
formdata.data['5'].receive([b'hello world'])
|
|
599 |
formdata.just_created()
|
523 |
600 |
formdata.store()
|
524 |
601 |
|
525 |
602 |
assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
|