Projet

Général

Profil

0001-api-get-formdata-from-history-70271.patch

Lauréline Guérin, 21 octobre 2022 12:13

Télécharger (21,2 ko)

Voir les différences:

Subject: [PATCH] api: get formdata from history (#70271)

 tests/test_content_snapshots.py | 224 ++++++++++++++++++++++++++++++++
 wcs/backoffice/management.py    |  39 ++++--
 wcs/formdata.py                 |  36 ++++-
 wcs/forms/common.py             |  21 ++-
 4 files changed, 301 insertions(+), 19 deletions(-)
tests/test_content_snapshots.py
1
import datetime
1 2
import json
2 3
import os
4
import time
3 5

  
4 6
import pytest
7
from django.utils.timezone import make_aware
5 8
from webtest import Upload
6 9

  
7 10
from wcs import fields
8 11
from wcs.api_access import ApiAccess
9 12
from wcs.carddef import CardDef
13
from wcs.formdata import Evolution
10 14
from wcs.formdef import FormDef
11 15
from wcs.qommon.http_request import HTTPRequest
12 16
from wcs.qommon.ident.password_accounts import PasswordAccount
......
739 743
    assert carddata.evolution[0].parts[0].formdef_id == str(carddef.id)
740 744
    assert carddata.evolution[0].parts[0].old_data == {}
741 745
    assert carddata.evolution[0].parts[0].new_data == {'1': 'xxx'}
746

  
747

  
748
def test_api_formdata_at(pub, user, access, role):
749
    app = get_app(pub)
750
    app.set_authorization(('Basic', ('test', '12345')))
751

  
752
    FormDef.wipe()
753
    formdef = FormDef()
754
    formdef.name = 'test'
755
    formdef.fields = [
756
        fields.StringField(id='0', label='foobar', varname='foobar'),
757
    ]
758
    workflow = Workflow(name='foo')
759
    workflow.possible_status = Workflow.get_default_workflow().possible_status[:]
760
    workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow)
761
    workflow.backoffice_fields_formdef.fields = [
762
        fields.StringField(id='bo1', label='bo field 1', type='string', varname='plop'),
763
    ]
764
    workflow.store()
765
    formdef.workflow_id = workflow.id
766
    formdef.workflow_roles = {'_receiver': role.id}
767
    formdef.store()
768

  
769
    formdef.data_class().wipe()
770
    formdata = formdef.data_class()()
771
    formdata.data = {
772
        '0': 'foo',
773
        'bo1': 'bar',
774
    }
775
    formdata.just_created()
776
    formdata.evolution[-1].parts[0].datetime = make_aware(datetime.datetime(2022, 1, 2, 3, 4))
777
    formdata.store()
778

  
779
    def get_evo_and_parts(formdata):
780
        for evo in formdata.evolution:
781
            for part in evo.parts or []:
782
                if isinstance(part, ContentSnapshotPart):
783
                    yield part.datetime.strftime('%Y-%m-%d %H:%M'), part.old_data, part.new_data
784

  
785
    assert list(get_evo_and_parts(formdata)) == [('2022-01-02 03:04', {}, {'0': 'foo', 'bo1': 'bar'})]
786
    resp = app.get('/api/forms/test/%s/' % formdata.id)
787
    assert resp.json['fields'] == {'foobar': 'foo'}
788
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
789
    resp = app.get('/api/forms/test/list/?full=on')
790
    assert len(resp.json) == 1
791
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
792
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
793

  
794
    # wrong format
795
    resp = app.get('/api/forms/test/%s/?at=bad-format' % formdata.id, status=400)
796
    assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"'
797
    resp = app.get('/api/forms/test/list/?full=on&at=bad-format', status=400)
798
    assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"'
799

  
800
    # before formdata creation
801
    resp = app.get(
802
        '/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T00:00:00+01:00'}, status=400
803
    )
804
    assert resp.json['err_desc'] == 'No data found for this datetime'
805
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T00:00:00+01:00'})
806
    assert len(resp.json) == 0
807
    resp = app.get(
808
        '/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:04:00+01:00'}, status=400
809
    )
810
    assert resp.json['err_desc'] == 'No data found for this datetime'
811
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:04:00+01:00'})
812
    assert len(resp.json) == 0
813

  
814
    # no ContentSnapshotPart (legacy formdata)
815
    formdata.evolution[0].parts = []
816
    formdata.store()
817
    assert list(get_evo_and_parts(formdata)) == []
818

  
819
    # add evolutions with ContentSnapshotPart
820
    evo = formdata.evolution[0]
821
    part = ContentSnapshotPart(formdata=formdata, old_data={})
822
    part.new_data = {'0': 'bar', 'bo1': 'foo'}
823
    part.datetime = make_aware(datetime.datetime(2022, 1, 2, 3, 4))
824
    evo.add_part(part)
825

  
826
    evo = Evolution()
827
    evo.time = time.localtime()
828
    evo.status = formdata.status
829
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'bar', 'bo1': 'foo'})
830
    part.new_data = {'0': 'baz', 'bo1': 'foo'}
831
    part.datetime = make_aware(datetime.datetime(2022, 1, 2, 3, 5))
832
    evo.add_part(part)
833
    formdata.evolution.append(evo)
834

  
835
    evo = Evolution()
836
    evo.time = time.localtime()
837
    evo.status = formdata.status
838
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'baz', 'bo1': 'foo'})
839
    part.new_data = {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}
840
    part.datetime = make_aware(datetime.datetime(2022, 1, 4, 5, 6))
841
    evo.add_part(part)
842
    formdata.evolution.append(evo)
843

  
844
    evo = Evolution()
845
    evo.time = time.localtime()
846
    evo.status = formdata.status
847
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'foooo', '1': 'unknown', 'bo1': 'foo'})
848
    part.new_data = {'0': 'fooo', 'bo1': 'foo'}
849
    part.datetime = make_aware(datetime.datetime(2022, 1, 5, 6, 7))
850
    evo.add_part(part)
851
    part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'fooo', 'bo1': 'foo'})
852
    part.new_data = {'0': 'foo', 'bo1': 'bar'}
853
    part.datetime = make_aware(datetime.datetime(2022, 1, 5, 6, 7))
854
    evo.add_part(part)
855
    formdata.evolution.append(evo)
856

  
857
    formdata._store_all_evolution = True
858
    formdata.store()
859
    assert list(get_evo_and_parts(formdata)) == [
860
        ('2022-01-02 03:04', {}, {'0': 'bar', 'bo1': 'foo'}),
861
        ('2022-01-02 03:05', {'0': 'bar', 'bo1': 'foo'}, {'0': 'baz', 'bo1': 'foo'}),
862
        ('2022-01-04 05:06', {'0': 'baz', 'bo1': 'foo'}, {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}),
863
        ('2022-01-05 06:07', {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}, {'0': 'fooo', 'bo1': 'foo'}),
864
        ('2022-01-05 06:07', {'0': 'fooo', 'bo1': 'foo'}, {'0': 'foo', 'bo1': 'bar'}),
865
    ]
866

  
867
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-03T00:00:00+01:00'})
868
    assert resp.json['fields'] == {'foobar': 'baz'}
869
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
870
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-03T00:00:00+01:00'})
871
    assert len(resp.json) == 1
872
    assert resp.json[0]['fields'] == {'foobar': 'baz'}
873
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
874

  
875
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:05:00+01:00'})
876
    assert resp.json['fields'] == {'foobar': 'bar'}
877
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
878
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:05:00+01:00'})
879
    assert len(resp.json) == 1
880
    assert resp.json[0]['fields'] == {'foobar': 'bar'}
881
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
882

  
883
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:06:00+01:00'})
884
    assert resp.json['fields'] == {'foobar': 'baz'}
885
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
886
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:06:00+01:00'})
887
    assert len(resp.json) == 1
888
    assert resp.json[0]['fields'] == {'foobar': 'baz'}
889
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
890

  
891
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-04T00:00:00+01:00'})
892
    assert resp.json['fields'] == {'foobar': 'baz'}
893
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
894
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-04T00:00:00+01:00'})
895
    assert len(resp.json) == 1
896
    assert resp.json[0]['fields'] == {'foobar': 'baz'}
897
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
898

  
899
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T00:00:00+01:00'})
900
    assert resp.json['fields'] == {'foobar': 'foooo'}
901
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
902
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T00:00:00+01:00'})
903
    assert len(resp.json) == 1
904
    assert resp.json[0]['fields'] == {'foobar': 'foooo'}
905
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
906

  
907
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T06:07:00+01:00'})
908
    assert resp.json['fields'] == {'foobar': 'foooo'}
909
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
910
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T06:07:00+01:00'})
911
    assert len(resp.json) == 1
912
    assert resp.json[0]['fields'] == {'foobar': 'foooo'}
913
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
914

  
915
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T06:08:00+01:00'})
916
    assert resp.json['fields'] == {'foobar': 'foo'}
917
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
918
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T06:08:00+01:00'})
919
    assert len(resp.json) == 1
920
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
921
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
922

  
923
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-06T00:00:00+01:00'})
924
    assert resp.json['fields'] == {'foobar': 'foo'}
925
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
926
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-06T00:00:00+01:00'})
927
    assert len(resp.json) == 1
928
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
929
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
930

  
931
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-07T00:00:00+01:00'})
932
    assert resp.json['fields'] == {'foobar': 'foo'}
933
    assert resp.json['workflow']['fields'] == {'plop': 'bar'}
934
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-07T00:00:00+01:00'})
935
    assert len(resp.json) == 1
936
    assert resp.json[0]['fields'] == {'foobar': 'foo'}
937
    assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'}
938

  
939
    # check with other TZ
940
    resp = app.get(
941
        '/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T04:04:00+02:00'}, status=400
942
    )
943
    assert resp.json['err_desc'] == 'No data found for this datetime'
944
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T04:04:00+02:00'})
945
    assert len(resp.json) == 0
946
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T04:05:00+02:00'})
947
    assert resp.json['fields'] == {'foobar': 'bar'}
948
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
949
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T04:05:00+02:00'})
950
    assert len(resp.json) == 1
951
    assert resp.json[0]['fields'] == {'foobar': 'bar'}
952
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
953

  
954
    # check without TZ
955
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:04:00'}, status=400)
956
    assert resp.json['err_desc'] == 'No data found for this datetime'
957
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:04:00'})
958
    assert len(resp.json) == 0
959
    resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:05:00'})
960
    assert resp.json['fields'] == {'foobar': 'bar'}
961
    assert resp.json['workflow']['fields'] == {'plop': 'foo'}
962
    resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:05:00'})
963
    assert len(resp.json) == 1
964
    assert resp.json[0]['fields'] == {'foobar': 'bar'}
965
    assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'}
wcs/backoffice/management.py
26 26

  
27 27
import vobject
28 28
from django.utils.encoding import force_text
29
from django.utils.timezone import is_naive, make_aware
29 30
from quixote import get_publisher, get_request, get_response, get_session, redirect
30 31
from quixote.directory import Directory
31 32
from quixote.errors import RequestError
......
38 39
from wcs.carddef import CardDef
39 40
from wcs.categories import Category
40 41
from wcs.conditions import Condition
41
from wcs.formdata import FormData
42
from wcs.formdata import FormData, NoContentSnapshotAt
42 43
from wcs.formdef import FormDef
43 44
from wcs.forms.backoffice import FormDefUI
44 45
from wcs.forms.common import FormdefDirectoryBase, FormStatusPage
......
2471 2472
                include_files=False,
2472 2473
                include_roles=True,
2473 2474
                include_unnamed_fields=False,
2475
                values_at=get_request().form.get('at'),
2474 2476
            )
2475 2477
        else:
2476 2478
            output = [
......
4297 4299
        include_files,
4298 4300
        include_roles,
4299 4301
        include_unnamed_fields,
4302
        values_at=None,
4300 4303
    ):
4304
        if values_at:
4305
            try:
4306
                values_at = datetime.datetime.fromisoformat(values_at)
4307
                if is_naive(values_at):
4308
                    values_at = make_aware(values_at)
4309
            except ValueError:
4310
                raise RequestError('Invalid value "%s" for "at"' % values_at)
4311

  
4301 4312
        formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id'])
4302 4313
        prefetched_users = None
4303 4314
        prefetched_roles = None
......
4327 4338
            }
4328 4339
        output = []
4329 4340
        for formdata in items:
4330
            data = formdata.get_json_export_dict(
4331
                anonymise=anonymise,
4332
                user=user,
4333
                digest_key=digest_key,
4334
                prefetched_users=prefetched_users,
4335
                prefetched_roles=prefetched_roles,
4336
                include_evolution=include_evolution,
4337
                include_files=include_files,
4338
                include_roles=include_roles,
4339
                include_unnamed_fields=include_unnamed_fields,
4340
            )
4341
            try:
4342
                data = formdata.get_json_export_dict(
4343
                    anonymise=anonymise,
4344
                    user=user,
4345
                    digest_key=digest_key,
4346
                    prefetched_users=prefetched_users,
4347
                    prefetched_roles=prefetched_roles,
4348
                    include_evolution=include_evolution,
4349
                    include_files=include_files,
4350
                    include_roles=include_roles,
4351
                    include_unnamed_fields=include_unnamed_fields,
4352
                    values_at=values_at,
4353
                )
4354
            except NoContentSnapshotAt:
4355
                continue
4341 4356
            data.pop('digests')
4342 4357
            if digest_key:
4343 4358
                data['digest'] = (formdata.digests or {}).get(digest_key)
wcs/formdata.py
24 24
import time
25 25

  
26 26
from quixote import get_publisher, get_request, get_session
27
from quixote.errors import RequestError
27 28
from quixote.html import htmltext
28 29
from quixote.http_request import Upload
29 30

  
......
35 36
from .qommon.template import Template
36 37

  
37 38

  
39
class NoContentSnapshotAt(RequestError):
40
    pass
41

  
42

  
38 43
def get_dict_with_varnames(fields, data, formdata=None, varnames_only=False):
39 44
    new_data = {}
40 45
    for field in fields:
......
1334 1339
                    store_dict[store_key + '_structured'] = data.get('%s_structured' % field.id)
1335 1340
        return new_data
1336 1341

  
1337
    def get_json_dict(self, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
1342
    def get_json_dict(self, data, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
1338 1343
        return self.get_json_data_dict(
1339
            self.data,
1344
            data,
1340 1345
            fields,
1341 1346
            formdata=self,
1342 1347
            include_files=include_files,
......
1355 1360
        include_evolution=True,
1356 1361
        include_roles=True,
1357 1362
        include_unnamed_fields=False,
1363
        values_at=None,
1358 1364
    ):
1365
        # noqa pylint: disable=too-many-arguments
1359 1366
        data = {}
1360 1367
        data['id'] = str(self.id)
1361 1368
        data['digests'] = self.digests
......
1381 1388

  
1382 1389
            data['user'] = user.get_json_export_dict(full=isinstance(self.formdef, CardDef))
1383 1390

  
1391
        _data = self.data
1392
        if values_at:
1393
            from wcs.workflows import ContentSnapshotPart
1394

  
1395
            matching_part = None
1396
            for evo in reversed(self.evolution or []):
1397
                for part in reversed(evo.parts or []):
1398
                    if isinstance(part, ContentSnapshotPart):
1399
                        if part.datetime < values_at:
1400
                            matching_part = part
1401
                        break
1402
                if matching_part:
1403
                    break
1404
            if not matching_part:
1405
                raise NoContentSnapshotAt('No data found for this datetime')
1406
            _data = matching_part.new_data
1407

  
1384 1408
        data['fields'] = self.get_json_dict(
1409
            _data,
1385 1410
            self.formdef.fields,
1386 1411
            include_files=include_files,
1387 1412
            anonymise=anonymise,
......
1400 1425
            data['workflow']['data'] = self.workflow_data
1401 1426
        if self.formdef.workflow.get_backoffice_fields():
1402 1427
            data['workflow']['fields'] = self.get_json_dict(
1428
                _data,
1403 1429
                self.formdef.workflow.get_backoffice_fields(),
1404 1430
                include_files=include_files,
1405 1431
                anonymise=anonymise,
......
1474 1500

  
1475 1501
        return data
1476 1502

  
1477
    def export_to_json(self, include_files=True, anonymise=False):
1478
        data = self.get_json_export_dict(include_files=include_files, anonymise=anonymise)
1503
    def export_to_json(self, include_files=True, anonymise=False, values_at=None):
1504
        data = self.get_json_export_dict(
1505
            include_files=include_files, anonymise=anonymise, values_at=values_at
1506
        )
1479 1507
        return json.dumps(data, cls=misc.JSONEncoder)
1480 1508

  
1481 1509
    def get_object_key(self):
wcs/forms/common.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import datetime
17 18
import json
18 19
import os
19 20
import time
20 21
import urllib.parse
21 22

  
23
from django.utils.timezone import is_naive, make_aware
22 24
from quixote import get_publisher, get_request, get_response, get_session, redirect
23 25
from quixote.directory import Directory
26
from quixote.errors import RequestError
24 27
from quixote.html import TemplateIO, htmltext
25 28
from quixote.util import randbytes
26 29

  
......
210 213
            if str(self.filled.id) not in [str(x) for x in item_ids]:
211 214
                raise errors.TraversalError(_('ID not available in filtered view'))
212 215

  
216
        values_at = get_request().form.get('at')
217
        if values_at:
218
            try:
219
                values_at = datetime.datetime.fromisoformat(values_at)
220
                if is_naive(values_at):
221
                    values_at = make_aware(values_at)
222
            except ValueError:
223
                raise RequestError('Invalid value "%s" for "at"' % values_at)
213 224
        return self.export_to_json(
214
            anonymise=anonymise, include_files=get_query_flag('include-files-content', default=True)
225
            anonymise=anonymise,
226
            include_files=get_query_flag('include-files-content', default=True),
227
            values_at=values_at,
215 228
        )
216 229

  
217 230
    def tempfile(self):
......
421 434
            response.content_type = 'text/plain'
422 435
            return "Your browser should redirect you"
423 436

  
424
    def export_to_json(self, include_files=True, anonymise=False):
437
    def export_to_json(self, include_files=True, anonymise=False, values_at=None):
425 438
        get_response().set_content_type('application/json')
426
        return self.filled.export_to_json(include_files=include_files, anonymise=anonymise)
439
        return self.filled.export_to_json(
440
            include_files=include_files, anonymise=anonymise, values_at=values_at
441
        )
427 442

  
428 443
    def history(self):
429 444
        if not self.filled.evolution:
430
-