Projet

Général

Profil

0001-api-get-formdata-from-history-70271.patch

Lauréline Guérin, 14 octobre 2022 22:35

Télécharger (12 ko)

Voir les différences:

Subject: [PATCH] api: get formdata from history (#70271)

 tests/test_content_snapshots.py | 144 ++++++++++++++++++++++++++++++++
 wcs/formdata.py                 |  36 +++++++-
 wcs/forms/common.py             |  10 ++-
 3 files changed, 183 insertions(+), 7 deletions(-)
tests/test_content_snapshots.py
1
import datetime
1 2
import json
2 3
import os
4
import time
3 5

  
4 6
import pytest
5 7
from webtest import Upload
......
7 9
from wcs import fields
8 10
from wcs.api_access import ApiAccess
9 11
from wcs.carddef import CardDef
12
from wcs.formdata import Evolution
10 13
from wcs.formdef import FormDef
11 14
from wcs.qommon.http_request import HTTPRequest
12 15
from wcs.qommon.ident.password_accounts import PasswordAccount
......
716 719
    assert carddata.evolution[0].parts[0].formdef_id == str(carddef.id)
717 720
    assert carddata.evolution[0].parts[0].old_data == {}
718 721
    assert carddata.evolution[0].parts[0].new_data == {'1': 'xxx'}
722

  
723

  
724
def test_api_formdata_at(pub, user, access, role):
725
    app = get_app(pub)
726
    app.set_authorization(('Basic', ('test', '12345')))
727

  
728
    FormDef.wipe()
729
    formdef = FormDef()
730
    formdef.name = 'test'
731
    formdef.fields = [
732
        fields.StringField(id='0', label='foobar', varname='foobar'),
733
    ]
734
    workflow = Workflow(name='foo')
735
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
736
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
737
    workflow.backoffice_fields_formdef.fields = [
738
        fields.StringField(id='bo1', label='bo field 1', type='string', varname='plop'),
739
    ]
740
    workflow.store()
741
    formdef.workflow_id = workflow.id
742
    formdef.workflow_roles = {'_receiver': role.id}
743
    formdef.store()
744

  
745
    formdef.data_class().wipe()
746
    formdata = formdef.data_class()()
747
    formdata.data = {
748
        '0': 'foo',
749
        'bo1': 'bar',
750
    }
751
    formdata.just_created()
752
    formdata.evolution[0].time = datetime.datetime(2022, 1, 2, 3, 4).timetuple()
753
    formdata.store()
754

  
755
    def get_evo_and_parts(formdata):
756
        for evo in formdata.evolution:
757
            for part in evo.parts or []:
758
                if isinstance(part, ContentSnapshotPart):
759
                    yield time.strftime('%Y-%m-%d %H:%M', evo.time), part.old_data, part.new_data
760

  
761
    assert list(get_evo_and_parts(formdata)) == [('2022-01-02 03:04', {}, {'0': 'foo', 'bo1': 'bar'})]
762
    resp = app.get('/api/forms/test/%s/' % formdata.id)
763
    assert resp.json['fields'] == {'foobar': 'foo'}
764
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
765

  
766
    # wrong format
767
    resp = app.get('/api/forms/test/%s/?at=bad-format' % formdata.id, status=400)
768
    assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"'
769

  
770
    # before formdata creation, or exactly at creation time (a snapshot must be strictly older than "at")
771
    resp = app.get('/api/forms/test/%s/?at=2022-01-02' % formdata.id, status=400)
772
    assert resp.json['err_desc'] == 'No data found for "2022-01-02" date'
773
    resp = app.get('/api/forms/test/%s/?at=2022-01-02 03:04' % formdata.id, status=400)
774
    assert resp.json['err_desc'] == 'No data found for "2022-01-02 03:04" date'
775

  
776
    # no ContentSnapshotPart (legacy formdata)
777
    formdata.evolution[0].parts = []
778
    formdata.store()
779
    assert list(get_evo_and_parts(formdata)) == []
780

  
781
    # add evolutions with ContentSnapshotPart
782
    evo = formdata.evolution[0]
783
    part = ContentSnapshotPart(formdata=formdata, old_data={})
784
    part.new_data = {'0': 'bar', 'bo1': 'foo'}
785
    evo.add_part(part)
786

  
787
    evo = Evolution()
788
    evo.time = datetime.datetime(2022, 1, 2, 3, 5).timetuple()
789
    evo.status = formdata.status
790
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'bar', 'bo1': 'foo'})
791
    part.new_data = {'0': 'baz', 'bo1': 'foo'}
792
    evo.add_part(part)
793
    formdata.evolution.append(evo)
794

  
795
    evo = Evolution()
796
    evo.time = datetime.datetime(2022, 1, 4, 5, 6).timetuple()
797
    evo.status = formdata.status
798
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'baz', 'bo1': 'foo'})
799
    part.new_data = {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}
800
    evo.add_part(part)
801
    formdata.evolution.append(evo)
802

  
803
    evo = Evolution()
804
    evo.time = datetime.datetime(2022, 1, 5, 6, 7).timetuple()
805
    evo.status = formdata.status
806
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'foooo', '1': 'unknown', 'bo1': 'foo'})
807
    part.new_data = {'0': 'fooo', 'bo1': 'foo'}
808
    evo.add_part(part)
809
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'fooo', 'bo1': 'foo'})
810
    part.new_data = {'0': 'foo', 'bo1': 'bar'}
811
    evo.add_part(part)
812
    formdata.evolution.append(evo)
813

  
814
    formdata._store_all_evolution = True
815
    formdata.store()
816
    assert list(get_evo_and_parts(formdata)) == [
817
        ('2022-01-02 03:04', {}, {'0': 'bar', 'bo1': 'foo'}),
818
        ('2022-01-02 03:05', {'0': 'bar', 'bo1': 'foo'}, {'0': 'baz', 'bo1': 'foo'}),
819
        ('2022-01-04 05:06', {'0': 'baz', 'bo1': 'foo'}, {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}),
820
        ('2022-01-05 06:07', {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}, {'0': 'fooo', 'bo1': 'foo'}),
821
        ('2022-01-05 06:07', {'0': 'fooo', 'bo1': 'foo'}, {'0': 'foo', 'bo1': 'bar'}),
822
    ]
823

  
824
    resp = app.get('/api/forms/test/%s/?at=2022-01-03' % formdata.id)
825
    assert resp.json['fields'] == {'foobar': 'baz'}
826
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
827

  
828
    resp = app.get('/api/forms/test/%s/?at=03/01/2022' % formdata.id)
829
    assert resp.json['fields'] == {'foobar': 'baz'}
830
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
831

  
832
    resp = app.get('/api/forms/test/%s/?at=2022-01-02 03:05' % formdata.id)
833
    assert resp.json['fields'] == {'foobar': 'bar'}
834
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
835

  
836
    resp = app.get('/api/forms/test/%s/?at=2022-01-02 03:06' % formdata.id)
837
    assert resp.json['fields'] == {'foobar': 'baz'}
838
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
839

  
840
    resp = app.get('/api/forms/test/%s/?at=2022-01-04' % formdata.id)
841
    assert resp.json['fields'] == {'foobar': 'baz'}
842
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
843

  
844
    resp = app.get('/api/forms/test/%s/?at=2022-01-05' % formdata.id)
845
    assert resp.json['fields'] == {'foobar': 'foooo'}
846
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
847

  
848
    resp = app.get('/api/forms/test/%s/?at=2022-01-05 06:07' % formdata.id)
849
    assert resp.json['fields'] == {'foobar': 'foooo'}
850
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
851

  
852
    resp = app.get('/api/forms/test/%s/?at=2022-01-05 06:08' % formdata.id)
853
    assert resp.json['fields'] == {'foobar': 'foo'}
854
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
855

  
856
    resp = app.get('/api/forms/test/%s/?at=2022-01-06' % formdata.id)
857
    assert resp.json['fields'] == {'foobar': 'foo'}
858
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
859

  
860
    resp = app.get('/api/forms/test/%s/?at=2022-01-07' % formdata.id)
861
    assert resp.json['fields'] == {'foobar': 'foo'}
862
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
wcs/formdata.py
24 24
import time
25 25

  
26 26
from quixote import get_publisher, get_request, get_session
27
from quixote.errors import RequestError
27 28
from quixote.html import htmltext
28 29
from quixote.http_request import Upload
29 30

  
......
1332 1333
                    store_dict[store_key + '_structured'] = data.get('%s_structured' % field.id)
1333 1334
        return new_data
1334 1335

  
1335
    def get_json_dict(self, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
1336
    def get_json_dict(self, data, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
1336 1337
        return self.get_json_data_dict(
1337
            self.data,
1338
            data,
1338 1339
            fields,
1339 1340
            formdata=self,
1340 1341
            include_files=include_files,
......
1353 1354
        include_evolution=True,
1354 1355
        include_roles=True,
1355 1356
        include_unnamed_fields=False,
1357
        values_at=None,
1356 1358
    ):
1359
        # noqa pylint: disable=too-many-arguments
1357 1360
        data = {}
1358 1361
        data['id'] = str(self.id)
1359 1362
        data['digests'] = self.digests
......
1379 1382

  
1380 1383
            data['user'] = user.get_json_export_dict(full=isinstance(self.formdef, CardDef))
1381 1384

  
1385
        _data = self.data
1386
        if values_at:
1387
            from wcs.workflows import ContentSnapshotPart
1388

  
1389
            try:
1390
                values_at_tuple = misc.get_as_datetime(values_at).timetuple()
1391
            except ValueError:
1392
                raise RequestError('Invalid value "%s" for "at"' % values_at)
1393
            matching_part = None
1394
            for evo in reversed(self.evolution or []):
1395
                for part in reversed(evo.parts or []):
1396
                    if isinstance(part, ContentSnapshotPart):
1397
                        if evo.time < values_at_tuple:
1398
                            matching_part = part
1399
                        break
1400
                if matching_part:
1401
                    break
1402
            if not matching_part:
1403
                raise RequestError('No data found for "%s" date' % values_at)
1404
            _data = matching_part.new_data
1405

  
1382 1406
        data['fields'] = self.get_json_dict(
1407
            _data,
1383 1408
            self.formdef.fields,
1384 1409
            include_files=include_files,
1385 1410
            anonymise=anonymise,
......
1398 1423
            data['workflow']['data'] = self.workflow_data
1399 1424
        if self.formdef.workflow.get_backoffice_fields():
1400 1425
            data['workflow']['fields'] = self.get_json_dict(
1426
                _data,
1401 1427
                self.formdef.workflow.get_backoffice_fields(),
1402 1428
                include_files=include_files,
1403 1429
                anonymise=anonymise,
......
1472 1498

  
1473 1499
        return data
1474 1500

  
1475
    def export_to_json(self, include_files=True, anonymise=False):
1476
        data = self.get_json_export_dict(include_files=include_files, anonymise=anonymise)
1501
    def export_to_json(self, include_files=True, anonymise=False, values_at=None):
1502
        data = self.get_json_export_dict(
1503
            include_files=include_files, anonymise=anonymise, values_at=values_at
1504
        )
1477 1505
        return json.dumps(data, cls=misc.JSONEncoder)
1478 1506

  
1479 1507
    def get_object_key(self):
wcs/forms/common.py
211 211
                raise errors.TraversalError(_('ID not available in filtered view'))
212 212

  
213 213
        return self.export_to_json(
214
            anonymise=anonymise, include_files=get_query_flag('include-files-content', default=True)
214
            anonymise=anonymise,
215
            include_files=get_query_flag('include-files-content', default=True),
216
            values_at=get_request().form.get('at'),
215 217
        )
216 218

  
217 219
    def tempfile(self):
......
421 423
            response.content_type = 'text/plain'
422 424
            return "Your browser should redirect you"
423 425

  
424
    def export_to_json(self, include_files=True, anonymise=False):
426
    def export_to_json(self, include_files=True, anonymise=False, values_at=None):
425 427
        get_response().set_content_type('application/json')
426
        return self.filled.export_to_json(include_files=include_files, anonymise=anonymise)
428
        return self.filled.export_to_json(
429
            include_files=include_files, anonymise=anonymise, values_at=values_at
430
        )
427 431

  
428 432
    def history(self):
429 433
        if not self.filled.evolution:
430
-