0001-api-get-formdata-from-history-70271.patch
tests/test_content_snapshots.py | ||
---|---|---|
1 |
import datetime |
|
1 | 2 |
import json |
2 | 3 |
import os |
4 |
import time |
|
3 | 5 | |
4 | 6 |
import pytest |
5 | 7 |
from webtest import Upload |
... | ... | |
7 | 9 |
from wcs import fields |
8 | 10 |
from wcs.api_access import ApiAccess |
9 | 11 |
from wcs.carddef import CardDef |
12 |
from wcs.formdata import Evolution |
|
10 | 13 |
from wcs.formdef import FormDef |
11 | 14 |
from wcs.qommon.http_request import HTTPRequest |
12 | 15 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
... | ... | |
716 | 719 |
assert carddata.evolution[0].parts[0].formdef_id == str(carddef.id) |
717 | 720 |
assert carddata.evolution[0].parts[0].old_data == {} |
718 | 721 |
assert carddata.evolution[0].parts[0].new_data == {'1': 'xxx'} |
722 | ||
723 | ||
724 |
def test_api_formdata_at(pub, user, access, role): |
|
725 |
app = get_app(pub) |
|
726 |
app.set_authorization(('Basic', ('test', '12345'))) |
|
727 | ||
728 |
FormDef.wipe() |
|
729 |
formdef = FormDef() |
|
730 |
formdef.name = 'test' |
|
731 |
formdef.fields = [ |
|
732 |
fields.StringField(id='0', label='foobar', varname='foobar'), |
|
733 |
] |
|
734 |
workflow = Workflow(name='foo') |
|
735 |
workflow.possible_status = Workflow.get_default_workflow().possible_status[:] |
|
736 |
workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow) |
|
737 |
workflow.backoffice_fields_formdef.fields = [ |
|
738 |
fields.StringField(id='bo1', label='bo field 1', type='string', varname='plop'), |
|
739 |
] |
|
740 |
workflow.store() |
|
741 |
formdef.workflow_id = workflow.id |
|
742 |
formdef.workflow_roles = {'_receiver': role.id} |
|
743 |
formdef.store() |
|
744 | ||
745 |
formdef.data_class().wipe() |
|
746 |
formdata = formdef.data_class()() |
|
747 |
formdata.data = { |
|
748 |
'0': 'foo', |
|
749 |
'bo1': 'bar', |
|
750 |
} |
|
751 |
formdata.just_created() |
|
752 |
formdata.evolution[0].time = datetime.datetime(2022, 1, 2, 3, 4).timetuple() |
|
753 |
formdata.store() |
|
754 | ||
755 |
def get_evo_and_parts(formdata): |
|
756 |
for evo in formdata.evolution: |
|
757 |
for part in evo.parts or []: |
|
758 |
if isinstance(part, ContentSnapshotPart): |
|
759 |
yield time.strftime('%Y-%m-%d %H:%M', evo.time), part.old_data, part.new_data |
|
760 | ||
761 |
assert list(get_evo_and_parts(formdata)) == [('2022-01-02 03:04', {}, {'0': 'foo', 'bo1': 'bar'})] |
|
762 |
resp = app.get('/api/forms/test/%s/' % formdata.id) |
|
763 |
assert resp.json['fields'] == {'foobar': 'foo'} |
|
764 |
assert resp.json['workflow']['fields'] == {'plop': 'bar'} |
|
765 |
resp = app.get('/api/forms/test/list/?full=on') |
|
766 |
assert len(resp.json) == 1 |
|
767 |
assert resp.json[0]['fields'] == {'foobar': 'foo'} |
|
768 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'} |
|
769 | ||
770 |
# wrong format |
|
771 |
resp = app.get('/api/forms/test/%s/?at=bad-format' % formdata.id, status=400) |
|
772 |
assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"' |
|
773 |
resp = app.get('/api/forms/test/list/?full=on&at=bad-format', status=400) |
|
774 |
assert resp.json['err_desc'] == 'Invalid value "bad-format" for "at"' |
|
775 | ||
776 |
# before formdata creation |
|
777 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T00:00:00'}, status=400) |
|
778 |
assert resp.json['err_desc'] == 'No data found for this datetime' |
|
779 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T00:00:00'}) |
|
780 |
assert len(resp.json) == 0 |
|
781 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:04:00'}, status=400) |
|
782 |
assert resp.json['err_desc'] == 'No data found for this datetime' |
|
783 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:04:00'}) |
|
784 |
assert len(resp.json) == 0 |
|
785 | ||
786 |
# no ContentSnapshotPart (legacy formdata) |
|
787 |
formdata.evolution[0].parts = [] |
|
788 |
formdata.store() |
|
789 |
assert list(get_evo_and_parts(formdata)) == [] |
|
790 | ||
791 |
# add evolutions with ContentSnapshotPart |
|
792 |
evo = formdata.evolution[0] |
|
793 |
part = ContentSnapshotPart(formdata=formdata, old_data={}) |
|
794 |
part.new_data = {'0': 'bar', 'bo1': 'foo'} |
|
795 |
evo.add_part(part) |
|
796 | ||
797 |
evo = Evolution() |
|
798 |
evo.time = datetime.datetime(2022, 1, 2, 3, 5).timetuple() |
|
799 |
evo.status = formdata.status |
|
800 |
part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'bar', 'bo1': 'foo'}) |
|
801 |
part.new_data = {'0': 'baz', 'bo1': 'foo'} |
|
802 |
evo.add_part(part) |
|
803 |
formdata.evolution.append(evo) |
|
804 | ||
805 |
evo = Evolution() |
|
806 |
evo.time = datetime.datetime(2022, 1, 4, 5, 6).timetuple() |
|
807 |
evo.status = formdata.status |
|
808 |
part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'baz', 'bo1': 'foo'}) |
|
809 |
part.new_data = {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'} |
|
810 |
evo.add_part(part) |
|
811 |
formdata.evolution.append(evo) |
|
812 | ||
813 |
evo = Evolution() |
|
814 |
evo.time = datetime.datetime(2022, 1, 5, 6, 7).timetuple() |
|
815 |
evo.status = formdata.status |
|
816 |
part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}) |
|
817 |
part.new_data = {'0': 'fooo', 'bo1': 'foo'} |
|
818 |
evo.add_part(part) |
|
819 |
part = ContentSnapshotPart(formdata=formdata, old_data={'0': 'fooo', 'bo1': 'foo'}) |
|
820 |
part.new_data = {'0': 'foo', 'bo1': 'bar'} |
|
821 |
evo.add_part(part) |
|
822 |
formdata.evolution.append(evo) |
|
823 | ||
824 |
formdata._store_all_evolution = True |
|
825 |
formdata.store() |
|
826 |
assert list(get_evo_and_parts(formdata)) == [ |
|
827 |
('2022-01-02 03:04', {}, {'0': 'bar', 'bo1': 'foo'}), |
|
828 |
('2022-01-02 03:05', {'0': 'bar', 'bo1': 'foo'}, {'0': 'baz', 'bo1': 'foo'}), |
|
829 |
('2022-01-04 05:06', {'0': 'baz', 'bo1': 'foo'}, {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}), |
|
830 |
('2022-01-05 06:07', {'0': 'foooo', '1': 'unknown', 'bo1': 'foo'}, {'0': 'fooo', 'bo1': 'foo'}), |
|
831 |
('2022-01-05 06:07', {'0': 'fooo', 'bo1': 'foo'}, {'0': 'foo', 'bo1': 'bar'}), |
|
832 |
] |
|
833 | ||
834 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-03T00:00:00'}) |
|
835 |
assert resp.json['fields'] == {'foobar': 'baz'} |
|
836 |
assert resp.json['workflow']['fields'] == {'plop': 'foo'} |
|
837 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-03T00:00:00'}) |
|
838 |
assert len(resp.json) == 1 |
|
839 |
assert resp.json[0]['fields'] == {'foobar': 'baz'} |
|
840 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'} |
|
841 | ||
842 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:05:00'}) |
|
843 |
assert resp.json['fields'] == {'foobar': 'bar'} |
|
844 |
assert resp.json['workflow']['fields'] == {'plop': 'foo'} |
|
845 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:05:00'}) |
|
846 |
assert len(resp.json) == 1 |
|
847 |
assert resp.json[0]['fields'] == {'foobar': 'bar'} |
|
848 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'} |
|
849 | ||
850 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-02T03:06:00'}) |
|
851 |
assert resp.json['fields'] == {'foobar': 'baz'} |
|
852 |
assert resp.json['workflow']['fields'] == {'plop': 'foo'} |
|
853 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-02T03:06:00'}) |
|
854 |
assert len(resp.json) == 1 |
|
855 |
assert resp.json[0]['fields'] == {'foobar': 'baz'} |
|
856 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'} |
|
857 | ||
858 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-04T00:00:00'}) |
|
859 |
assert resp.json['fields'] == {'foobar': 'baz'} |
|
860 |
assert resp.json['workflow']['fields'] == {'plop': 'foo'} |
|
861 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-04T00:00:00'}) |
|
862 |
assert len(resp.json) == 1 |
|
863 |
assert resp.json[0]['fields'] == {'foobar': 'baz'} |
|
864 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'} |
|
865 | ||
866 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T00:00:00'}) |
|
867 |
assert resp.json['fields'] == {'foobar': 'foooo'} |
|
868 |
assert resp.json['workflow']['fields'] == {'plop': 'foo'} |
|
869 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T00:00:00'}) |
|
870 |
assert len(resp.json) == 1 |
|
871 |
assert resp.json[0]['fields'] == {'foobar': 'foooo'} |
|
872 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'} |
|
873 | ||
874 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T06:07:00'}) |
|
875 |
assert resp.json['fields'] == {'foobar': 'foooo'} |
|
876 |
assert resp.json['workflow']['fields'] == {'plop': 'foo'} |
|
877 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T06:07:00'}) |
|
878 |
assert len(resp.json) == 1 |
|
879 |
assert resp.json[0]['fields'] == {'foobar': 'foooo'} |
|
880 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'foo'} |
|
881 | ||
882 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-05T06:08:00'}) |
|
883 |
assert resp.json['fields'] == {'foobar': 'foo'} |
|
884 |
assert resp.json['workflow']['fields'] == {'plop': 'bar'} |
|
885 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-05T06:08:00'}) |
|
886 |
assert len(resp.json) == 1 |
|
887 |
assert resp.json[0]['fields'] == {'foobar': 'foo'} |
|
888 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'} |
|
889 | ||
890 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-06T00:00:00'}) |
|
891 |
assert resp.json['fields'] == {'foobar': 'foo'} |
|
892 |
assert resp.json['workflow']['fields'] == {'plop': 'bar'} |
|
893 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-06T00:00:00'}) |
|
894 |
assert len(resp.json) == 1 |
|
895 |
assert resp.json[0]['fields'] == {'foobar': 'foo'} |
|
896 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'} |
|
897 | ||
898 |
resp = app.get('/api/forms/test/%s/' % formdata.id, params={'at': '2022-01-07T00:00:00'}) |
|
899 |
assert resp.json['fields'] == {'foobar': 'foo'} |
|
900 |
assert resp.json['workflow']['fields'] == {'plop': 'bar'} |
|
901 |
resp = app.get('/api/forms/test/list/', params={'full': 'on', 'at': '2022-01-07T00:00:00'}) |
|
902 |
assert len(resp.json) == 1 |
|
903 |
assert resp.json[0]['fields'] == {'foobar': 'foo'} |
|
904 |
assert resp.json[0]['workflow']['fields'] == {'plop': 'bar'} |
wcs/backoffice/management.py | ||
---|---|---|
38 | 38 |
from wcs.carddef import CardDef |
39 | 39 |
from wcs.categories import Category |
40 | 40 |
from wcs.conditions import Condition |
41 |
from wcs.formdata import FormData |
|
41 |
from wcs.formdata import FormData, NoContentSnapshotAt
|
|
42 | 42 |
from wcs.formdef import FormDef |
43 | 43 |
from wcs.forms.backoffice import FormDefUI |
44 | 44 |
from wcs.forms.common import FormdefDirectoryBase, FormStatusPage |
... | ... | |
2471 | 2471 |
include_files=False, |
2472 | 2472 |
include_roles=True, |
2473 | 2473 |
include_unnamed_fields=False, |
2474 |
values_at=get_request().form.get('at'), |
|
2474 | 2475 |
) |
2475 | 2476 |
else: |
2476 | 2477 |
output = [ |
... | ... | |
4297 | 4298 |
include_files, |
4298 | 4299 |
include_roles, |
4299 | 4300 |
include_unnamed_fields, |
4301 |
values_at=None, |
|
4300 | 4302 |
): |
4303 |
if values_at: |
|
4304 |
try: |
|
4305 |
values_at = datetime.datetime.fromisoformat(values_at).timetuple() |
|
4306 |
except ValueError: |
|
4307 |
raise RequestError('Invalid value "%s" for "at"' % values_at) |
|
4308 | ||
4301 | 4309 |
formdef = self.kwargs['formdef_class'].get(self.kwargs['formdef_id']) |
4302 | 4310 |
prefetched_users = None |
4303 | 4311 |
prefetched_roles = None |
... | ... | |
4327 | 4335 |
} |
4328 | 4336 |
output = [] |
4329 | 4337 |
for formdata in items: |
4330 |
data = formdata.get_json_export_dict( |
|
4331 |
anonymise=anonymise, |
|
4332 |
user=user, |
|
4333 |
digest_key=digest_key, |
|
4334 |
prefetched_users=prefetched_users, |
|
4335 |
prefetched_roles=prefetched_roles, |
|
4336 |
include_evolution=include_evolution, |
|
4337 |
include_files=include_files, |
|
4338 |
include_roles=include_roles, |
|
4339 |
include_unnamed_fields=include_unnamed_fields, |
|
4340 |
) |
|
4338 |
try: |
|
4339 |
data = formdata.get_json_export_dict( |
|
4340 |
anonymise=anonymise, |
|
4341 |
user=user, |
|
4342 |
digest_key=digest_key, |
|
4343 |
prefetched_users=prefetched_users, |
|
4344 |
prefetched_roles=prefetched_roles, |
|
4345 |
include_evolution=include_evolution, |
|
4346 |
include_files=include_files, |
|
4347 |
include_roles=include_roles, |
|
4348 |
include_unnamed_fields=include_unnamed_fields, |
|
4349 |
values_at=values_at, |
|
4350 |
) |
|
4351 |
except NoContentSnapshotAt: |
|
4352 |
continue |
|
4341 | 4353 |
data.pop('digests') |
4342 | 4354 |
if digest_key: |
4343 | 4355 |
data['digest'] = (formdata.digests or {}).get(digest_key) |
wcs/formdata.py | ||
---|---|---|
24 | 24 |
import time |
25 | 25 | |
26 | 26 |
from quixote import get_publisher, get_request, get_session |
27 |
from quixote.errors import RequestError |
|
27 | 28 |
from quixote.html import htmltext |
28 | 29 |
from quixote.http_request import Upload |
29 | 30 | |
... | ... | |
35 | 36 |
from .qommon.template import Template |
36 | 37 | |
37 | 38 | |
39 |
class NoContentSnapshotAt(RequestError): |
|
40 |
pass |
|
41 | ||
42 | ||
38 | 43 |
def get_dict_with_varnames(fields, data, formdata=None, varnames_only=False): |
39 | 44 |
new_data = {} |
40 | 45 |
for field in fields: |
... | ... | |
1332 | 1337 |
store_dict[store_key + '_structured'] = data.get('%s_structured' % field.id) |
1333 | 1338 |
return new_data |
1334 | 1339 | |
1335 |
def get_json_dict(self, fields, include_files=True, anonymise=False, include_unnamed_fields=False): |
|
1340 |
def get_json_dict(self, data, fields, include_files=True, anonymise=False, include_unnamed_fields=False):
|
|
1336 | 1341 |
return self.get_json_data_dict( |
1337 |
self.data,
|
|
1342 |
data, |
|
1338 | 1343 |
fields, |
1339 | 1344 |
formdata=self, |
1340 | 1345 |
include_files=include_files, |
... | ... | |
1353 | 1358 |
include_evolution=True, |
1354 | 1359 |
include_roles=True, |
1355 | 1360 |
include_unnamed_fields=False, |
1361 |
values_at=None, |
|
1356 | 1362 |
): |
1363 |
# noqa pylint: disable=too-many-arguments |
|
1357 | 1364 |
data = {} |
1358 | 1365 |
data['id'] = str(self.id) |
1359 | 1366 |
data['digests'] = self.digests |
... | ... | |
1379 | 1386 | |
1380 | 1387 |
data['user'] = user.get_json_export_dict(full=isinstance(self.formdef, CardDef)) |
1381 | 1388 | |
1389 |
_data = self.data |
|
1390 |
if values_at: |
|
1391 |
from wcs.workflows import ContentSnapshotPart |
|
1392 | ||
1393 |
matching_part = None |
|
1394 |
for evo in reversed(self.evolution or []): |
|
1395 |
for part in reversed(evo.parts or []): |
|
1396 |
if isinstance(part, ContentSnapshotPart): |
|
1397 |
if evo.time < values_at: |
|
1398 |
matching_part = part |
|
1399 |
break |
|
1400 |
if matching_part: |
|
1401 |
break |
|
1402 |
if not matching_part: |
|
1403 |
raise NoContentSnapshotAt('No data found for this datetime') |
|
1404 |
_data = matching_part.new_data |
|
1405 | ||
1382 | 1406 |
data['fields'] = self.get_json_dict( |
1407 |
_data, |
|
1383 | 1408 |
self.formdef.fields, |
1384 | 1409 |
include_files=include_files, |
1385 | 1410 |
anonymise=anonymise, |
... | ... | |
1398 | 1423 |
data['workflow']['data'] = self.workflow_data |
1399 | 1424 |
if self.formdef.workflow.get_backoffice_fields(): |
1400 | 1425 |
data['workflow']['fields'] = self.get_json_dict( |
1426 |
_data, |
|
1401 | 1427 |
self.formdef.workflow.get_backoffice_fields(), |
1402 | 1428 |
include_files=include_files, |
1403 | 1429 |
anonymise=anonymise, |
... | ... | |
1472 | 1498 | |
1473 | 1499 |
return data |
1474 | 1500 | |
1475 |
def export_to_json(self, include_files=True, anonymise=False): |
|
1476 |
data = self.get_json_export_dict(include_files=include_files, anonymise=anonymise) |
|
1501 |
def export_to_json(self, include_files=True, anonymise=False, values_at=None): |
|
1502 |
data = self.get_json_export_dict( |
|
1503 |
include_files=include_files, anonymise=anonymise, values_at=values_at |
|
1504 |
) |
|
1477 | 1505 |
return json.dumps(data, cls=misc.JSONEncoder) |
1478 | 1506 | |
1479 | 1507 |
def get_object_key(self): |
wcs/forms/common.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import datetime |
|
17 | 18 |
import json |
18 | 19 |
import os |
19 | 20 |
import time |
... | ... | |
21 | 22 | |
22 | 23 |
from quixote import get_publisher, get_request, get_response, get_session, redirect |
23 | 24 |
from quixote.directory import Directory |
25 |
from quixote.errors import RequestError |
|
24 | 26 |
from quixote.html import TemplateIO, htmltext |
25 | 27 |
from quixote.util import randbytes |
26 | 28 | |
... | ... | |
210 | 212 |
if str(self.filled.id) not in [str(x) for x in item_ids]: |
211 | 213 |
raise errors.TraversalError(_('ID not available in filtered view')) |
212 | 214 | |
215 |
values_at = get_request().form.get('at') |
|
216 |
if values_at: |
|
217 |
try: |
|
218 |
values_at = datetime.datetime.fromisoformat(values_at).timetuple() |
|
219 |
except ValueError: |
|
220 |
raise RequestError('Invalid value "%s" for "at"' % values_at) |
|
213 | 221 |
return self.export_to_json( |
214 |
anonymise=anonymise, include_files=get_query_flag('include-files-content', default=True) |
|
222 |
anonymise=anonymise, |
|
223 |
include_files=get_query_flag('include-files-content', default=True), |
|
224 |
values_at=values_at, |
|
215 | 225 |
) |
216 | 226 | |
217 | 227 |
def tempfile(self): |
... | ... | |
421 | 431 |
response.content_type = 'text/plain' |
422 | 432 |
return "Your browser should redirect you" |
423 | 433 | |
424 |
def export_to_json(self, include_files=True, anonymise=False): |
|
434 |
def export_to_json(self, include_files=True, anonymise=False, values_at=None):
|
|
425 | 435 |
get_response().set_content_type('application/json') |
426 |
return self.filled.export_to_json(include_files=include_files, anonymise=anonymise) |
|
436 |
return self.filled.export_to_json( |
|
437 |
include_files=include_files, anonymise=anonymise, values_at=values_at |
|
438 |
) |
|
427 | 439 | |
428 | 440 |
def history(self): |
429 | 441 |
if not self.filled.evolution: |
430 |
- |