Projet

Général

Profil

0001-api-get-formdata-from-history-70271.patch

Lauréline Guérin, 18 octobre 2022 10:44

Télécharger (18,7 ko)

Voir les différences:

Subject: [PATCH] api: get formdata from history (#70271)

 tests/test_content_snapshots.py | 186 ++++++++++++++++++++++++++++++++
 wcs/backoffice/management.py    |  36 ++++---
 wcs/formdata.py                 |  36 ++++++-
 wcs/forms/common.py             |  18 +++-
 4 files changed, 257 insertions(+), 19 deletions(-)
tests/test_content_snapshots.py
1
import datetime
1 2
import json
2 3
import os
4
import time
3 5

  
4 6
import pytest
5 7
from webtest import Upload
......
7 9
from wcs import fields
8 10
from wcs.api_access import ApiAccess
9 11
from wcs.carddef import CardDef
12
from wcs.formdata import Evolution
10 13
from wcs.formdef import FormDef
11 14
from wcs.qommon.http_request import HTTPRequest
12 15
from wcs.qommon.ident.password_accounts import PasswordAccount
......
716 719
    assert carddata.evolution[0].parts[0].formdef_id == str(carddef.id)
717 720
    assert carddata.evolution[0].parts[0].old_data == {}
718 721
    assert carddata.evolution[0].parts[0].new_data == {'1': 'xxx'}
722

  
723

  
724
def test_api_formdata_at(pub, user, access, role):
725
    app = get_app(pub)
726
    app.set_authorization(('Basic', ('test', '12345')))
727

  
728
    FormDef.wipe()
729
    formdef = FormDef()
730
    formdef.name = 'test'
731
    formdef.fields = [
732
        fields.StringField(id='0', label='foobar', varname='foobar'),
733
    ]
734
    workflow = Workflow(name='foo')
735
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
736
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
737
    workflow.backoffice_fields_formdef.fields = [
738
        fields.StringField(id='bo1', label='bo field 1', type='string', varname='plop'),
739
    ]
740
    workflow.store()
741
    formdef.workflow_id = workflow.id
742
    formdef.workflow_roles = {'_receiver': role.id}
743
    formdef.store()
744

  
745
    formdef.data_class().wipe()
746
    formdata = formdef.data_class()()
747
    formdata.data = {
748
        '0': 'foo',
749
        'bo1': 'bar',
750
    }
751
    formdata.just_created()
752
    formdata.evolution[0].time = datetime.datetime(2022, 1, 2, 3, 4).timetuple()
753
    formdata.store()
754

  
755
    def get_evo_and_parts(formdata):
756
        for evo in formdata.evolution:
757
            for part in evo.parts or []:
758
                if isinstance(part, ContentSnapshotPart):
759
                    yield time.strftime('%Y-%m-%d %H:%M', evo.time), part.old_data, part.new_data
760

  
761
    assert list(get_evo_and_parts(formdata)) == [('2022-01-02 03:04', {}, {'0': 'foo', 'bo1': 'bar'})]
762
    resp = app.get('/api/forms/test/%s/' % formdata.id)
763
    assert resp.json['fields'] == {'foobar': 'foo'}
764
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
765
    resp = app.get('/api/forms/test/list/?full=on')
766
    assert len(resp.json) == 1
767
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
768
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
769

  
770
    # wrong format
771
    resp = app.get('/api/forms/test/%s/?at=bad-format' % formdata.id, status=400)
772
    assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"'
773
    resp = app.get('/api/forms/test/list/?full=on&at=bad-format', status=400)
774
    assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"'
775

  
776
    # before formdata creation
777
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T00:00:00'}, status=400)
778
    assert resp.json['err_desc'] == 'No data found for this datetime'
779
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T00:00:00'})
780
    assert len(resp.json) == 0
781
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:04:00'}, status=400)
782
    assert resp.json['err_desc'] == 'No data found for this datetime'
783
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:04:00'})
784
    assert len(resp.json) == 0
785

  
786
    # no ContentSnapshotPart (legacy formdata)
787
    formdata.evolution[0].parts = []
788
    formdata.store()
789
    assert list(get_evo_and_parts(formdata)) == []
790

  
791
    # add evolutions with ContentSnapshotPart
792
    evo = formdata.evolution[0]
793
    part = ContentSnapshotPart(formdata=formdata, old_data={})
794
    part.new_data = {'0': 'bar', 'bo1': 'foo'}
795
    evo.add_part(part)
796

  
797
    evo = Evolution()
798
    evo.time = datetime.datetime(2022, 1, 2, 3, 5).timetuple()
799
    evo.status = formdata.status
800
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'bar', 'bo1': 'foo'})
801
    part.new_data = {'0': 'baz', 'bo1': 'foo'}
802
    evo.add_part(part)
803
    formdata.evolution.append(evo)
804

  
805
    evo = Evolution()
806
    evo.time = datetime.datetime(2022, 1, 4, 5, 6).timetuple()
807
    evo.status = formdata.status
808
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'baz', 'bo1': 'foo'})
809
    part.new_data = {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}
810
    evo.add_part(part)
811
    formdata.evolution.append(evo)
812

  
813
    evo = Evolution()
814
    evo.time = datetime.datetime(2022, 1, 5, 6, 7).timetuple()
815
    evo.status = formdata.status
816
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'foooo', '1': 'unknown', 'bo1': 'foo'})
817
    part.new_data = {'0': 'fooo', 'bo1': 'foo'}
818
    evo.add_part(part)
819
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'fooo', 'bo1': 'foo'})
820
    part.new_data = {'0': 'foo', 'bo1': 'bar'}
821
    evo.add_part(part)
822
    formdata.evolution.append(evo)
823

  
824
    formdata._store_all_evolution = True
825
    formdata.store()
826
    assert list(get_evo_and_parts(formdata)) == [
827
        ('2022-01-02 03:04', {}, {'0': 'bar', 'bo1': 'foo'}),
828
        ('2022-01-02 03:05', {'0': 'bar', 'bo1': 'foo'}, {'0': 'baz', 'bo1': 'foo'}),
829
        ('2022-01-04 05:06', {'0': 'baz', 'bo1': 'foo'}, {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}),
830
        ('2022-01-05 06:07', {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}, {'0': 'fooo', 'bo1': 'foo'}),
831
        ('2022-01-05 06:07', {'0': 'fooo', 'bo1': 'foo'}, {'0': 'foo', 'bo1': 'bar'}),
832
    ]
833

  
834
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-03T00:00:00'})
835
    assert resp.json['fields'] == {'foobar': 'baz'}
836
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
837
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-03T00:00:00'})
838
    assert len(resp.json) == 1
839
    assert resp.json[0]['fields'] == {'foobar': 'baz'}
840
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
841

  
842
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:05:00'})
843
    assert resp.json['fields'] == {'foobar': 'bar'}
844
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
845
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:05:00'})
846
    assert len(resp.json) == 1
847
    assert resp.json[0]['fields'] == {'foobar': 'bar'}
848
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
849

  
850
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:06:00'})
851
    assert resp.json['fields'] == {'foobar': 'baz'}
852
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
853
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:06:00'})
854
    assert len(resp.json) == 1
855
    assert resp.json[0]['fields'] == {'foobar': 'baz'}
856
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
857

  
858
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-04T00:00:00'})
859
    assert resp.json['fields'] == {'foobar': 'baz'}
860
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
861
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-04T00:00:00'})
862
    assert len(resp.json) == 1
863
    assert resp.json[0]['fields'] == {'foobar': 'baz'}
864
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
865

  
866
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T00:00:00'})
867
    assert resp.json['fields'] == {'foobar': 'foooo'}
868
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
869
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T00:00:00'})
870
    assert len(resp.json) == 1
871
    assert resp.json[0]['fields'] == {'foobar': 'foooo'}
872
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
873

  
874
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T06:07:00'})
875
    assert resp.json['fields'] == {'foobar': 'foooo'}
876
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
877
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T06:07:00'})
878
    assert len(resp.json) == 1
879
    assert resp.json[0]['fields'] == {'foobar': 'foooo'}
880
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
881

  
882
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T06:08:00'})
883
    assert resp.json['fields'] == {'foobar': 'foo'}
884
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
885
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T06:08:00'})
886
    assert len(resp.json) == 1
887
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
888
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
889

  
890
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-06T00:00:00'})
891
    assert resp.json['fields'] == {'foobar': 'foo'}
892
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
893
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-06T00:00:00'})
894
    assert len(resp.json) == 1
895
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
896
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
897

  
898
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-07T00:00:00'})
899
    assert resp.json['fields'] == {'foobar': 'foo'}
900
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
901
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-07T00:00:00'})
902
    assert len(resp.json) == 1
903
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
904
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
wcs/backoffice/management.py
38 38
from wcs.carddef import CardDef
39 39
from wcs.categories import Category
40 40
from wcs.conditions import Condition
41
from wcs.formdata import FormData
41
from wcs.formdata import FormData, NoContentSnapshotAt
42 42
from wcs.formdef import FormDef
43 43
from wcs.forms.backoffice import FormDefUI
44 44
from wcs.forms.common import FormdefDirectoryBase, FormStatusPage
......
2471 2471
                include_files=False,
2472 2472
                include_roles=True,
2473 2473
                include_unnamed_fields=False,
2474
                values_at=get_request().form.get('at'),
2474 2475
            )
2475 2476
        else:
2476 2477
            output = [
......
4297 4298
        include_files,
4298 4299
        include_roles,
4299 4300
        include_unnamed_fields,
4301
        values_at=None,
4300 4302
    ):
4303
        if values_at:
4304
            try:
4305
                values_at = datetime.datetime.fromisoformat(values_at).timetuple()
4306
            except ValueError:
4307
                raise RequestError('Invalid value "%s" for "at"' % values_at)
4308

  
4301 4309
        formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])
4302 4310
        prefetched_users = None
4303 4311
        prefetched_roles = None
......
4327 4335
            }
4328 4336
        output = []
4329 4337
        for formdata in items:
4330
            data = formdata.get_json_export_dict(
4331
                anonymise=anonymise,
4332
                user=user,
4333
                digest_key=digest_key,
4334
                prefetched_users=prefetched_users,
4335
                prefetched_roles=prefetched_roles,
4336
                include_evolution=include_evolution,
4337
                include_files=include_files,
4338
                include_roles=include_roles,
4339
                include_unnamed_fields=include_unnamed_fields,
4340
            )
4338
            try:
4339
                data = formdata.get_json_export_dict(
4340
                    anonymise=anonymise,
4341
                    user=user,
4342
                    digest_key=digest_key,
4343
                    prefetched_users=prefetched_users,
4344
                    prefetched_roles=prefetched_roles,
4345
                    include_evolution=include_evolution,
4346
                    include_files=include_files,
4347
                    include_roles=include_roles,
4348
                    include_unnamed_fields=include_unnamed_fields,
4349
                    values_at=values_at,
4350
                )
4351
            except NoContentSnapshotAt:
4352
                continue
4341 4353
            data.pop('digests')
4342 4354
            if digest_key:
4343 4355
                data['digest'] = (formdata.digests or {}).get(digest_key)
wcs/formdata.py
24 24
import time
25 25

  
26 26
from quixote import get_publisher, get_request, get_session
27
from quixote.errors import RequestError
27 28
from quixote.html import htmltext
28 29
from quixote.http_request import Upload
29 30

  
......
35 36
from .qommon.template import Template
36 37

  
37 38

  
39
class NoContentSnapshotAt(RequestError):
40
    pass
41

  
42

  
38 43
def get_dict_with_varnames(fields, data, formdata=None, varnames_only=False):
39 44
    new_data = {}
40 45
    for field in fields:
......
1332 1337
                    store_dict[store_key + '_structured'] = data.get('%s_structured' % field.id)
1333 1338
        return new_data
1334 1339

  
1335
    def get_json_dict(self, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
1340
    def get_json_dict(self, data, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
1336 1341
        return self.get_json_data_dict(
1337
            self.data,
1342
            data,
1338 1343
            fields,
1339 1344
            formdata=self,
1340 1345
            include_files=include_files,
......
1353 1358
        include_evolution=True,
1354 1359
        include_roles=True,
1355 1360
        include_unnamed_fields=False,
1361
        values_at=None,
1356 1362
    ):
1363
        # noqa pylint: disable=too-many-arguments
1357 1364
        data = {}
1358 1365
        data['id'] = str(self.id)
1359 1366
        data['digests'] = self.digests
......
1379 1386

  
1380 1387
            data['user'] = user.get_json_export_dict(full=isinstance(self.formdef, CardDef))
1381 1388

  
1389
        _data = self.data
1390
        if values_at:
1391
            from wcs.workflows import ContentSnapshotPart
1392

  
1393
            matching_part = None
1394
            for evo in reversed(self.evolution or []):
1395
                for part in reversed(evo.parts or []):
1396
                    if isinstance(part, ContentSnapshotPart):
1397
                        if evo.time < values_at:
1398
                            matching_part = part
1399
                        break
1400
                if matching_part:
1401
                    break
1402
            if not matching_part:
1403
                raise NoContentSnapshotAt('No data found for this datetime')
1404
            _data = matching_part.new_data
1405

  
1382 1406
        data['fields'] = self.get_json_dict(
1407
            _data,
1383 1408
            self.formdef.fields,
1384 1409
            include_files=include_files,
1385 1410
            anonymise=anonymise,
......
1398 1423
            data['workflow']['data'] = self.workflow_data
1399 1424
        if self.formdef.workflow.get_backoffice_fields():
1400 1425
            data['workflow']['fields'] = self.get_json_dict(
1426
                _data,
1401 1427
                self.formdef.workflow.get_backoffice_fields(),
1402 1428
                include_files=include_files,
1403 1429
                anonymise=anonymise,
......
1472 1498

  
1473 1499
        return data
1474 1500

  
1475
    def export_to_json(self, include_files=True, anonymise=False):
1476
        data = self.get_json_export_dict(include_files=include_files, anonymise=anonymise)
1501
    def export_to_json(self, include_files=True, anonymise=False, values_at=None):
1502
        data = self.get_json_export_dict(
1503
            include_files=include_files, anonymise=anonymise, values_at=values_at
1504
        )
1477 1505
        return json.dumps(data, cls=misc.JSONEncoder)
1478 1506

  
1479 1507
    def get_object_key(self):
wcs/forms/common.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import datetime
17 18
import json
18 19
import os
19 20
import time
......
21 22

  
22 23
from quixote import get_publisher, get_request, get_response, get_session, redirect
23 24
from quixote.directory import Directory
25
from quixote.errors import RequestError
24 26
from quixote.html import TemplateIO, htmltext
25 27
from quixote.util import randbytes
26 28

  
......
210 212
            if str(self.filled.id) not in [str(x) for x in item_ids]:
211 213
                raise errors.TraversalError(_('ID not available in filtered view'))
212 214

  
215
        values_at = get_request().form.get('at')
216
        if values_at:
217
            try:
218
                values_at = datetime.datetime.fromisoformat(values_at).timetuple()
219
            except ValueError:
220
                raise RequestError('Invalid value "%s" for "at"' % values_at)
213 221
        return self.export_to_json(
214
            anonymise=anonymise, include_files=get_query_flag('include-files-content', default=True)
222
            anonymise=anonymise,
223
            include_files=get_query_flag('include-files-content', default=True),
224
            values_at=values_at,
215 225
        )
216 226

  
217 227
    def tempfile(self):
......
421 431
            response.content_type = 'text/plain'
422 432
            return "Your browser should redirect you"
423 433

  
424
    def export_to_json(self, include_files=True, anonymise=False):
434
    def export_to_json(self, include_files=True, anonymise=False, values_at=None):
425 435
        get_response().set_content_type('application/json')
426
        return self.filled.export_to_json(include_files=include_files, anonymise=anonymise)
436
        return self.filled.export_to_json(
437
            include_files=include_files, anonymise=anonymise, values_at=values_at
438
        )
427 439

  
428 440
    def history(self):
429 441
        if not self.filled.evolution:
430
-