Projet

Général

Profil

0003-formdata-look-at-files-in-history-when-deleting-unus.patch

Lauréline Guérin, 20 octobre 2022 11:01

Télécharger (11,8 ko)

Voir les différences:

Subject: [PATCH 3/3] formdata: look at files in history when deleting unused
 files (#62800)

 tests/test_formdef.py | 93 +++++++++++++++++++++++++++++++++++++++----
 wcs/formdata.py       | 12 +++++-
 wcs/formdef.py        |  2 +-
 3 files changed, 96 insertions(+), 11 deletions(-)
tests/test_formdef.py
20 20
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormEvolutionPart, WorkflowFormFieldsFormDef
21 21
from wcs.workflows import (
22 22
    AttachmentEvolutionPart,
23
    ContentSnapshotPart,
23 24
    Workflow,
24 25
    WorkflowBackofficeFieldsFormDef,
25 26
    WorkflowVariablesFieldsFormDef,
......
340 341
            pub.site_options.set('options', 'unused-files-behaviour', behaviour)
341 342

  
342 343
        formdata = formdef.data_class()()
343
        formdata.just_created()
344 344
        formdata.data = {
345 345
            '5': PicklableUpload('test.txt', 'text/plain'),
346 346
        }
347 347
        formdata.data['5'].receive([b'hello world'])
348
        formdata.just_created()
348 349
        formdata.store()
350
        assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
351
        assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
349 352

  
350 353
        assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
351 354
        clean_unused_files(pub)
......
356 359

  
357 360
        for _ in range(5):
358 361
            formdata = formdef.data_class()()
359
            formdata.just_created()
360 362
            formdata.data = {
361 363
                '5': PicklableUpload('test.txt', 'text/plain'),
362 364
            }
363 365
            formdata.data['5'].receive([b'hello world'])
366
            formdata.just_created()
364 367
            formdata.store()
368
            assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
369
            assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
365 370

  
366 371
        # same file, deduplicated
367 372
        assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [formdata.data['5'].qfilename]
......
374 379
        clean_unused_files(pub)
375 380
        assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
376 381

  
382
        # file referenced in formdata history, but not in formdata's data
383
        formdata = formdef.data_class()()
384
        formdata.data = {
385
            '5': PicklableUpload('test.txt', 'text/plain'),
386
        }
387
        formdata.data['5'].receive([b'hello world'])
388
        formdata.just_created()
389
        formdata.store()
390
        assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
391
        assert formdata.evolution[0].parts[0].new_data['5'].qfilename == formdata.data['5'].qfilename
392
        qfilename = formdata.data['5'].qfilename
393
        formdata.data['5'] = None
394
        formdata.store()
395
        assert formdata.evolution[0].parts[0].new_data['5'].qfilename == qfilename
396
        assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename]
397
        clean_unused_files(pub)
398
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
399
        formdata.anonymise()
400
        assert len(formdata.evolution) == 1
401
        assert formdata.evolution[0].parts is None
402
        clean_unused_files(pub)
403
        assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == []
404

  
377 405
        # file referenced in formdef option
378 406
        workflow = Workflow(name='variables')
379 407

  
......
387 415
        formdef.store()
388 416

  
389 417
        formdata = formdef.data_class()()
390
        formdata.just_created()
391 418
        formdata.data = {
392 419
            '5': PicklableUpload('test.txt', 'text/plain'),
393 420
        }
394 421
        formdata.data['5'].receive([b'hello world'])
422
        formdata.just_created()
395 423
        formdata.store()
396 424

  
397 425
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 1
......
407 435

  
408 436
        # file in block field
409 437
        formdata = formdef.data_class()()
410
        formdata.just_created()
411 438
        formdata.data = {
412 439
            '6': {
413 440
                'data': [
......
420 447
        formdata.data['6']['data'][0]['234'].receive([b'hello world'])
421 448
        formdata.data['6']['data'][1]['234'].receive([b'hello world block'])
422 449
        formdata.workflow_data = {'wscall': {'data': ['not', 'a', 'block'], 'err': 0}}
450
        formdata.just_created()
423 451
        formdata.store()
452
        assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
453
        assert (
454
            formdata.evolution[0].parts[0].new_data['6']['data'][0]['234'].qfilename
455
            == formdata.data['6']['data'][0]['234'].qfilename
456
        )
457
        assert (
458
            formdata.evolution[0].parts[0].new_data['6']['data'][1]['234'].qfilename
459
            == formdata.data['6']['data'][1]['234'].qfilename
460
        )
424 461
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
425 462
        clean_unused_files(pub)
426 463
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
......
428 465
        clean_unused_files(pub)
429 466
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
430 467

  
431
        # non local storage: nothing happens
468
        # block field: file referenced in formdata history, but not in formdata's data
432 469
        formdata = formdef.data_class()()
470
        formdata.data = {
471
            '6': {
472
                'data': [
473
                    {'234': PicklableUpload('test.txt', 'text/plain')},
474
                    {'234': PicklableUpload('test2.txt', 'text/plain')},
475
                ],
476
                'schema': {'234': 'file'},
477
            },
478
        }
479
        formdata.data['6']['data'][0]['234'].receive([b'hello world'])
480
        formdata.data['6']['data'][1]['234'].receive([b'hello world block'])
433 481
        formdata.just_created()
482
        formdata.store()
483
        assert isinstance(formdata.evolution[0].parts[0], ContentSnapshotPart)
484
        assert (
485
            formdata.evolution[0].parts[0].new_data['6']['data'][0]['234'].qfilename
486
            == formdata.data['6']['data'][0]['234'].qfilename
487
        )
488
        assert (
489
            formdata.evolution[0].parts[0].new_data['6']['data'][1]['234'].qfilename
490
            == formdata.data['6']['data'][1]['234'].qfilename
491
        )
492
        qfilename0 = formdata.data['6']['data'][0]['234'].qfilename
493
        qfilename1 = formdata.data['6']['data'][1]['234'].qfilename
494
        formdata.data['6'] = {}
495
        formdata.store()
496
        assert formdata.evolution[0].parts[0].new_data['6']['data'][0]['234'].qfilename == qfilename0
497
        assert formdata.evolution[0].parts[0].new_data['6']['data'][1]['234'].qfilename == qfilename1
498
        assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename0, qfilename1]
499
        clean_unused_files(pub)
500
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 2
501
        assert os.listdir(os.path.join(pub.app_dir, 'uploads')) == [qfilename0, qfilename1]
502
        formdata.anonymise()
503
        assert len(formdata.evolution) == 1
504
        assert formdata.evolution[0].parts is None
505
        clean_unused_files(pub)
506
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
507

  
508
        # non local storage: nothing happens
509
        formdata = formdef.data_class()()
434 510
        formdata.data = {
435 511
            '5': PicklableUpload('test.txt', 'text/plain'),
436 512
        }
437 513
        formdata.data['5'].receive([b'hello world'])
438 514
        formdata.data['5'].storage = 'remote'
439 515
        formdata.data['5'].storage_attrs = {'redirect_url': 'https://crypto.example.net/1234'}
516
        formdata.just_created()
440 517
        formdata.store()
441 518
        assert len(os.listdir(os.path.join(pub.app_dir, 'uploads'))) == 0
442 519
        clean_unused_files(pub)
......
444 521

  
445 522
        # workflow attachment
446 523
        formdata = formdef.data_class()()
447
        formdata.just_created()
448 524
        formdata.data = {}
525
        formdata.just_created()
449 526
        formdata.store()
450 527

  
451 528
        formdata.evolution[-1].parts = [
......
480 557

  
481 558
        # file from workflow form
482 559
        formdata = formdef.data_class()()
483
        formdata.just_created()
484 560
        formdata.data = {}
561
        formdata.just_created()
485 562
        formdata.store()
486 563

  
487 564
        display_form = FormWorkflowStatusItem()
......
515 592
    # unknown unused-files-behaviour: do nothing
516 593
    pub.site_options.set('options', 'unused-files-behaviour', 'foo')
517 594
    formdata = formdef.data_class()()
518
    formdata.just_created()
519 595
    formdata.data = {
520 596
        '5': PicklableUpload('test-no-remove.txt', 'text/plain'),
521 597
    }
522 598
    formdata.data['5'].receive([b'hello world'])
599
    formdata.just_created()
523 600
    formdata.store()
524 601

  
525 602
    assert formdata.data['5'].qfilename in os.listdir(os.path.join(pub.app_dir, 'uploads'))
wcs/formdata.py
414 414
            empty &= self.data.get(key) is None
415 415
        return empty
416 416

  
417
    def get_all_file_data(self):
417
    def get_all_file_data(self, with_history=False):
418 418
        from wcs.wf.form import WorkflowFormEvolutionPart
419
        from wcs.workflows import ContentSnapshotPart
419 420

  
420
        for field_data in itertools.chain((self.data or {}).values(), (self.workflow_data or {}).values()):
421
        def check_field_data(field_data):
421 422
            if misc.is_upload(field_data):
422 423
                yield field_data
423 424
            elif isinstance(field_data, dict) and isinstance(field_data.get('data'), list):
......
426 427
                        for block_field_data in subfield_rowdata.values():
427 428
                            if misc.is_upload(block_field_data):
428 429
                                yield block_field_data
430

  
431
        for field_data in itertools.chain((self.data or {}).values(), (self.workflow_data or {}).values()):
432
            yield from check_field_data(field_data)
429 433
        for part in self.iter_evolution_parts():
430 434
            if misc.is_attachment(part):
431 435
                yield part
......
433 437
                for field_data in (part.data or {}).values():
434 438
                    if misc.is_upload(field_data):
435 439
                        yield field_data
440
            elif isinstance(part, ContentSnapshotPart):
441
                # look into old and new values (belt and suspenders)
442
                for field_data in list((part.old_data or {}).values()) + list((part.new_data or {}).values()):
443
                    yield from check_field_data(field_data)
436 444

  
437 445
    @classmethod
438 446
    def get_actionable_count(cls, user_roles):
wcs/formdef.py
2143 2143
                if is_upload(option_data):
2144 2144
                    yield option_data.get_fs_filename()
2145 2145
            for formdata in formdef.data_class().select_iterator(ignore_errors=True, itersize=200):
2146
                for field_data in formdata.get_all_file_data():
2146
                for field_data in formdata.get_all_file_data(with_history=True):
2147 2147
                    if is_upload(field_data):
2148 2148
                        yield field_data.get_fs_filename()
2149 2149
                    elif is_attachment(field_data):
2150
-