20 |
20 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormEvolutionPart, WorkflowFormFieldsFormDef
|
21 |
21 |
from wcs.workflows import (
|
22 |
22 |
AttachmentEvolutionPart,
|
|
23 |
ContentSnapshotPart,
|
23 |
24 |
Workflow,
|
24 |
25 |
WorkflowBackofficeFieldsFormDef,
|
25 |
26 |
WorkflowVariablesFieldsFormDef,
|
... | ... | |
340 |
341 |
pub.site_options.set('options', 'unused-files-behaviour', behaviour)
|
341 |
342 |
|
342 |
343 |
formdata = formdef.data_class()()
|
343 |
|
formdata.just_created()
|
344 |
344 |
formdata.data = {
|
345 |
345 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
346 |
346 |
}
|
347 |
347 |
formdata.data['5'].receive([b'hello world'])
|
|
348 |
formdata.just_created()
|
348 |
349 |
formdata.store()
|
|
350 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
351 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
|
349 |
352 |
|
350 |
353 |
assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
|
351 |
354 |
clean_unused_files(pub)
|
... | ... | |
356 |
359 |
|
357 |
360 |
for _ in range(5):
|
358 |
361 |
formdata = formdef.data_class()()
|
359 |
|
formdata.just_created()
|
360 |
362 |
formdata.data = {
|
361 |
363 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
362 |
364 |
}
|
363 |
365 |
formdata.data['5'].receive([b'hello world'])
|
|
366 |
formdata.just_created()
|
364 |
367 |
formdata.store()
|
|
368 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
369 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
|
365 |
370 |
|
366 |
371 |
# same file, deduplicated
|
367 |
372 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [formdata.data['5'].qfilename]
|
... | ... | |
374 |
379 |
clean_unused_files(pub)
|
375 |
380 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
|
376 |
381 |
|
|
382 |
# file referenced in formdata history, but not in formdata's data
|
|
383 |
formdata = formdef.data_class()()
|
|
384 |
formdata.data = {
|
|
385 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
|
386 |
}
|
|
387 |
formdata.data['5'].receive([b'hello world'])
|
|
388 |
formdata.just_created()
|
|
389 |
formdata.store()
|
|
390 |
assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
|
|
391 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
|
|
392 |
qfilename = formdata.data['5'].qfilename
|
|
393 |
formdata.data['5'] = None
|
|
394 |
formdata.store()
|
|
395 |
assert formdata.evolution[0].parts[0].new_data['5'].qfilename == qfilename
|
|
396 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename]
|
|
397 |
clean_unused_files(pub)
|
|
398 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
|
399 |
formdata.anonymise()
|
|
400 |
assert len(formdata.evolution) == 1
|
|
401 |
assert formdata.evolution[0].parts is None
|
|
402 |
clean_unused_files(pub)
|
|
403 |
assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
|
|
404 |
|
377 |
405 |
# file referenced in formdef option
|
378 |
406 |
workflow = Workflow(name='variables')
|
379 |
407 |
|
... | ... | |
387 |
415 |
formdef.store()
|
388 |
416 |
|
389 |
417 |
formdata = formdef.data_class()()
|
390 |
|
formdata.just_created()
|
391 |
418 |
formdata.data = {
|
392 |
419 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
393 |
420 |
}
|
394 |
421 |
formdata.data['5'].receive([b'hello world'])
|
|
422 |
formdata.just_created()
|
395 |
423 |
formdata.store()
|
396 |
424 |
|
397 |
425 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
|
... | ... | |
407 |
435 |
|
408 |
436 |
# file in block field
|
409 |
437 |
formdata = formdef.data_class()()
|
410 |
|
formdata.just_created()
|
411 |
438 |
formdata.data = {
|
412 |
439 |
'6': {
|
413 |
440 |
'data': [
|
... | ... | |
420 |
447 |
formdata.data['6']['data'][0]['234'].receive([b'hello world'])
|
421 |
448 |
formdata.data['6']['data'][1]['234'].receive([b'hello world block'])
|
422 |
449 |
formdata.workflow_data = {'wscall': {'data': ['not', 'a', 'block'], 'err': 0}}
|
|
450 |
formdata.just_created()
|
423 |
451 |
formdata.store()
|
424 |
452 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
|
425 |
453 |
clean_unused_files(pub)
|
... | ... | |
430 |
458 |
|
431 |
459 |
# non local storage: nothing happens
|
432 |
460 |
formdata = formdef.data_class()()
|
433 |
|
formdata.just_created()
|
434 |
461 |
formdata.data = {
|
435 |
462 |
'5': PicklableUpload('test.txt', 'text/plain'),
|
436 |
463 |
}
|
437 |
464 |
formdata.data['5'].receive([b'hello world'])
|
438 |
465 |
formdata.data['5'].storage = 'remote'
|
439 |
466 |
formdata.data['5'].storage_attrs = {'redirect_url': 'https://crypto.example.net/1234'}
|
|
467 |
formdata.just_created()
|
440 |
468 |
formdata.store()
|
441 |
469 |
assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
|
442 |
470 |
clean_unused_files(pub)
|
... | ... | |
444 |
472 |
|
445 |
473 |
# workflow attachment
|
446 |
474 |
formdata = formdef.data_class()()
|
447 |
|
formdata.just_created()
|
448 |
475 |
formdata.data = {}
|
|
476 |
formdata.just_created()
|
449 |
477 |
formdata.store()
|
450 |
478 |
|
451 |
479 |
formdata.evolution[-1].parts = [
|
... | ... | |
480 |
508 |
|
481 |
509 |
# file from workflow form
|
482 |
510 |
formdata = formdef.data_class()()
|
483 |
|
formdata.just_created()
|
484 |
511 |
formdata.data = {}
|
|
512 |
formdata.just_created()
|
485 |
513 |
formdata.store()
|
486 |
514 |
|
487 |
515 |
display_form = FormWorkflowStatusItem()
|
... | ... | |
515 |
543 |
# unknown unused-files-behaviour: do nothing
|
516 |
544 |
pub.site_options.set('options', 'unused-files-behaviour', 'foo')
|
517 |
545 |
formdata = formdef.data_class()()
|
518 |
|
formdata.just_created()
|
519 |
546 |
formdata.data = {
|
520 |
547 |
'5': PicklableUpload('test-no-remove.txt', 'text/plain'),
|
521 |
548 |
}
|
522 |
549 |
formdata.data['5'].receive([b'hello world'])
|
|
550 |
formdata.just_created()
|
523 |
551 |
formdata.store()
|
524 |
552 |
|
525 |
553 |
assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
|