Projet

Général

Profil

0001-workflow-check-datasource-existence-on-import-48164.patch

Lauréline Guérin, 09 novembre 2020 17:42

Télécharger (22,7 ko)

Voir les différences:

Subject: [PATCH] workflow: check datasource existence on import (#48164)

 tests/test_workflow_import.py | 103 +++++++++++++++++++++++++++++-----
 wcs/admin/forms.py            |   4 +-
 wcs/admin/settings.py         |   5 +-
 wcs/admin/workflows.py        |   4 +-
 wcs/formdef.py                |  95 ++++++++++++++++---------------
 wcs/publisher.py              |   3 +-
 wcs/wf/form.py                |   4 +-
 wcs/workflows.py              |  37 ++++++++----
 8 files changed, 177 insertions(+), 78 deletions(-)
tests/test_workflow_import.py
3 3
import pytest
4 4
import xml.etree.ElementTree as ET
5 5

  
6
from quixote.http_request import Upload
7

  
6 8
from django.utils.six import BytesIO
7 9

  
10
from wcs.carddef import CardDef
8 11
from wcs.formdef import FormDef
9 12
from wcs.mail_templates import MailTemplate
10 13
from wcs.workflows import (
......
21 24
from wcs.wf.redirect_to_url import RedirectToUrlWorkflowStatusItem
22 25
from wcs.wf.create_formdata import CreateFormdataWorkflowStatusItem, Mapping
23 26
from wcs.wf.external_workflow import ExternalWorkflowGlobalAction
24
from wcs.roles import Role
27
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
28
from wcs.wf.jump import JumpWorkflowStatusItem
25 29
from wcs.fields import StringField, FileField
30
from wcs.qommon.form import UploadedFile
31
from wcs.roles import Role
32
from wcs.workflows import ExportToModel, WorkflowVariablesFieldsFormDef, DisplayMessageWorkflowStatusItem
33

  
26 34

  
27 35
from wcs.qommon.misc import indent_xml as indent
28 36

  
......
110 118
    role.name = 'Test Role'
111 119
    role.store()
112 120

  
113

  
114
    from wcs.wf.dispatch import DispatchWorkflowStatusItem
115

  
116 121
    dispatch = DispatchWorkflowStatusItem()
117 122
    dispatch.id = '_x'
118 123
    dispatch.role_id = 5
......
250 255
    wf = Workflow(name='status')
251 256
    st1 = wf.add_status('Status1', 'st1')
252 257

  
253
    from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
254

  
255 258
    display_form = FormWorkflowStatusItem()
256 259
    display_form.id = '_x'
257 260
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
......
270 273
    wf.store()
271 274
    st1 = wf.add_status('Status1', 'st1')
272 275

  
273
    from quixote.http_request import Upload
274
    from wcs.qommon.form import UploadedFile
275
    from wcs.workflows import ExportToModel
276

  
277 276
    export_to = ExportToModel()
278 277
    export_to.label = 'test'
279 278
    upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text')
......
326 325
    st1 = wf.add_status('Status1', 'st1')
327 326
    st2 = wf.add_status('Status2', 'st2')
328 327

  
329
    from wcs.wf.jump import JumpWorkflowStatusItem
330 328
    jump = JumpWorkflowStatusItem()
331 329
    jump.id = '_jump'
332 330
    jump.by = ['_submitter', '_receiver']
......
391 389

  
392 390
def test_variables_formdef(pub):
393 391
    wf = Workflow(name='variables')
394
    from wcs.workflows import WorkflowVariablesFieldsFormDef
395 392
    wf.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf)
396 393
    wf.variables_formdef.fields.append(StringField(label='Test', type='string'))
397 394
    wf2 = assert_import_export_works(wf)
......
535 532
    wf = Workflow(name='status')
536 533
    st1 = wf.add_status('Status1', 'st1')
537 534

  
538
    from wcs.workflows import DisplayMessageWorkflowStatusItem
539

  
540 535
    display = DisplayMessageWorkflowStatusItem()
541 536
    display.message = 'hey'
542 537
    display.to = ['_submitter', '1']
......
848 843
    export = ET.tostring(wf.export_to_xml(include_id=True))
849 844
    with pytest.raises(WorkflowImportError, match='Unknown referenced mail template'):
850 845
        Workflow.import_from_xml_tree(ET.fromstring(export), include_id=True)
846

  
847

  
848
def test_unknown_data_source(pub):
849
    wf1 = Workflow(name='status')
850
    st1 = wf1.add_status('Status1', 'st1')
851
    display_form = FormWorkflowStatusItem()
852
    display_form.id = '_x'
853
    display_form.formdef = WorkflowFormFieldsFormDef(item=display_form)
854
    display_form.formdef.fields = [StringField(label='Test', type='string', data_source={'type': 'foobar'})]
855
    st1.items.append(display_form)
856
    display_form.parent = st1
857

  
858
    wf2 = Workflow(name='variables')
859
    wf2.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=wf2)
860
    wf2.variables_formdef.fields = [StringField(label='Test', type='string', data_source={'type': 'foobar'})]
861

  
862
    wf3 = Workflow(name='bo fields')
863
    wf3.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf3)
864
    wf3.backoffice_fields_formdef.fields = [
865
        StringField(
866
            id='bo1', label='1st backoffice field',
867
            type='string', varname='backoffice_blah',
868
            data_source={'type': 'foobar'})
869
    ]
870

  
871
    for wf in [wf1, wf2, wf3]:
872
        export = ET.tostring(export_to_indented_xml(wf))
873
        with pytest.raises(WorkflowImportError, match='Unknown datasources'):
874
            Workflow.import_from_xml(BytesIO(export))
875

  
876
    # carddef as datasource
877
    CardDef.wipe()
878
    carddef = CardDef()
879
    carddef.name = 'foo'
880
    carddef.fields = [StringField(id='1', label='Test', type='string', varname='foo')]
881
    carddef.store()
882

  
883
    display_form.formdef.fields[0].data_source = {'type': 'carddef:foo'}
884
    wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo'}
885
    wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:foo'}
886

  
887
    for wf in [wf1, wf2, wf3]:
888
        export = ET.tostring(export_to_indented_xml(wf))
889
        Workflow.import_from_xml(BytesIO(export))
890

  
891
    display_form.formdef.fields[0].data_source = {'type': 'carddef:unknown'}
892
    wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:unknown'}
893
    wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:unknown'}
894

  
895
    for wf in [wf1, wf2, wf3]:
896
        export = ET.tostring(export_to_indented_xml(wf))
897
        with pytest.raises(WorkflowImportError, match='Unknown datasources'):
898
            Workflow.import_from_xml(BytesIO(export))
899

  
900
    # carddef custom view as datasource
901
    pub.custom_view_class.wipe()
902
    custom_view = pub.custom_view_class()
903
    custom_view.title = 'card view'
904
    custom_view.formdef = carddef
905
    custom_view.columns = {'list': [{'id': 'id'}]}
906
    custom_view.filters = {}
907
    custom_view.visibility = 'datasource'
908
    custom_view.store()
909

  
910
    display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:card-view'}
911
    wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:card-view'}
912
    wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:foo:card-view'}
913

  
914
    for wf in [wf1, wf2, wf3]:
915
        export = ET.tostring(export_to_indented_xml(wf))
916
        Workflow.import_from_xml(BytesIO(export))
917

  
918
    display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
919
    wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
920
    wf3.backoffice_fields_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'}
921

  
922
    for wf in [wf1, wf2, wf3]:
923
        export = ET.tostring(export_to_indented_xml(wf))
924
        with pytest.raises(WorkflowImportError, match='Unknown datasources'):
925
            Workflow.import_from_xml(BytesIO(export))
wcs/admin/forms.py
1070 1070
            new_formdef = self.formdef_class.import_from_xml(fp, include_id=True)
1071 1071
        except FormdefImportError as e:
1072 1072
            error = True
1073
            reason = _(e.msg)
1073
            reason = _(e.msg) % e.msg_args
1074 1074
            if e.details:
1075 1075
                reason += ' [%s]' % e.details
1076 1076
        except ValueError:
......
1738 1738
                get_session().message = ('info', _(self.import_error_message))
1739 1739
        except FormdefImportError as e:
1740 1740
            error = True
1741
            reason = _(e.msg)
1741
            reason = _(e.msg) % e.msg_args
1742 1742
            if e.details:
1743 1743
                reason += ' [%s]' % e.details
1744 1744
        except ValueError:
wcs/admin/settings.py
1015 1015
                reason = _('Not a valid export file')
1016 1016
            except WorkflowImportError as e:
1017 1017
                results = None
1018
                reason = _('Failed to import a workflow (%s); site import did not complete.') % (_(e) % e.msg_args)
1018
                msg = _(e.msg) % e.msg_args
1019
                if e.details:
1020
                    msg += ' [%s]' % e.details
1021
                reason = _('Failed to import a workflow (%s); site import did not complete.') % (msg)
1019 1022
            html_top('settings', title = _('Import'))
1020 1023
            r = TemplateIO(html=True)
1021 1024
            r += htmltext('<h2>%s</h2>') % _('Import')
wcs/admin/workflows.py
1973 1973
            workflow = Workflow.import_from_xml(fp)
1974 1974
        except WorkflowImportError as e:
1975 1975
            error = True
1976
            reason = _(e) % e.msg_args
1976
            reason = _(e.msg) % e.msg_args
1977
            if e.details:
1978
                reason += ' [%s]' % e.details
1977 1979
        except ValueError:
1978 1980
            error = True
1979 1981

  
wcs/formdef.py
54 54

  
55 55

  
56 56
class FormdefImportError(Exception):
57
    def __init__(self, msg, details=None):
57
    def __init__(self, msg, msg_args=None, details=None):
58 58
        self.msg = msg
59
        self.msg_args = msg_args or ()
59 60
        self.details = details
60 61

  
61 62

  
......
1030 1031
    @classmethod
1031 1032
    def import_from_xml(cls, fd, charset=None, include_id=False,
1032 1033
            fix_on_error=False, check_datasources=True):
1033
        from wcs.carddef import CardDef
1034 1034
        try:
1035 1035
            tree = ET.parse(fd)
1036 1036
        except:
1037 1037
            raise ValueError()
1038
        formdef = cls.import_from_xml_tree(tree, charset=charset,
1039
                include_id=include_id, fix_on_error=fix_on_error)
1038
        formdef = cls.import_from_xml_tree(
1039
            tree, charset=charset,
1040
            include_id=include_id, fix_on_error=fix_on_error,
1041
            check_datasources=check_datasources)
1040 1042

  
1041 1043
        if formdef.url_name:
1042 1044
            try:
......
1052 1054
            if formdef.max_field_id < max_field_id:
1053 1055
                formdef.max_field_id = max_field_id
1054 1056

  
1055
        if check_datasources:
1056
            # check if datasources are defined
1057
            unknown_datasources = set()
1058
            for field in formdef.fields:
1059
                data_source = getattr(field, 'data_source', None)
1060
                if data_source:
1061
                    data_source_id = data_source.get('type')
1062
                    if isinstance(data_sources.get_object(data_source),
1063
                                  data_sources.StubNamedDataSource):
1064
                        unknown_datasources.add(data_source_id)
1065
                    elif data_source_id and data_source_id.startswith('carddef:'):
1066
                        parts = data_source_id.split(':')
1067
                        # check if carddef exists
1068
                        url_name = parts[1]
1069
                        if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
1070
                            # reference to itself, it's ok
1071
                            continue
1072
                        try:
1073
                            CardDef.get_by_urlname(url_name)
1074
                        except KeyError:
1075
                            unknown_datasources.add(data_source_id)
1076
                            continue
1077

  
1078
                        if len(parts) == 2:
1079
                            continue
1080

  
1081
                        lookup_criterias = [
1082
                            Equal('formdef_type', 'carddef'),
1083
                            Equal('visibility', 'datasource'),
1084
                            Equal('slug', parts[2]),
1085
                        ]
1086
                        try:
1087
                            get_publisher().custom_view_class.select(lookup_criterias)[0]
1088
                        except IndexError:
1089
                            unknown_datasources.add(data_source_id)
1090

  
1091
            if unknown_datasources:
1092
                raise FormdefImportError(N_('Unknown datasources'),
1093
                        details=', '.join(sorted(unknown_datasources)))
1094

  
1095 1057
        # check if all field id are unique
1096 1058
        known_field_ids = set()
1097 1059
        for field in formdef.fields:
......
1103 1065

  
1104 1066
    @classmethod
1105 1067
    def import_from_xml_tree(cls, tree, include_id=False, charset=None,
1106
            fix_on_error=False, snapshot=False):
1068
            fix_on_error=False, snapshot=False, check_datasources=True):
1069
        from wcs.carddef import CardDef
1070

  
1107 1071
        if charset is None:
1108 1072
            charset = get_publisher().site_charset
1109 1073
        assert charset == 'utf-8'
......
1262 1226
            for child in node:
1263 1227
                formdef.required_authentication_contexts.append(str(child.text))
1264 1228

  
1229
        if check_datasources:
1230
            # check if datasources are defined
1231
            unknown_datasources = set()
1232
            for field in formdef.fields:
1233
                data_source = getattr(field, 'data_source', None)
1234
                if data_source:
1235
                    data_source_id = data_source.get('type')
1236
                    if isinstance(data_sources.get_object(data_source),
1237
                                  data_sources.StubNamedDataSource):
1238
                        unknown_datasources.add(data_source_id)
1239
                    elif data_source_id and data_source_id.startswith('carddef:'):
1240
                        parts = data_source_id.split(':')
1241
                        # check if carddef exists
1242
                        url_name = parts[1]
1243
                        if formdef.xml_root_node == 'carddef' and formdef.url_name == url_name:
1244
                            # reference to itself, it's ok
1245
                            continue
1246
                        try:
1247
                            CardDef.get_by_urlname(url_name)
1248
                        except KeyError:
1249
                            unknown_datasources.add(data_source_id)
1250
                            continue
1251

  
1252
                        if len(parts) == 2:
1253
                            continue
1254

  
1255
                        lookup_criterias = [
1256
                            Equal('formdef_type', 'carddef'),
1257
                            Equal('visibility', 'datasource'),
1258
                            Equal('slug', parts[2]),
1259
                        ]
1260
                        try:
1261
                            get_publisher().custom_view_class.select(lookup_criterias)[0]
1262
                        except IndexError:
1263
                            unknown_datasources.add(data_source_id)
1264

  
1265
            if unknown_datasources:
1266
                raise FormdefImportError(
1267
                    N_('Unknown datasources'),
1268
                    details=', '.join(sorted(unknown_datasources)))
1269

  
1265 1270
        return formdef
1266 1271

  
1267 1272
    def get_detailed_email_form(self, formdata, url):
wcs/publisher.py
240 240
        from wcs.workflows import Workflow
241 241
        for f in z.namelist():
242 242
            if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
243
                workflow = Workflow.import_from_xml(z.open(f), include_id=True)
243
                workflow = Workflow.import_from_xml(
244
                    z.open(f), include_id=True, check_datasources=False)
244 245
                workflow.store()
245 246
                results['workflows'] += 1
246 247

  
wcs/wf/form.py
139 139
            fields.append(field.export_to_xml(charset=charset, include_id=include_id))
140 140
        return item
141 141

  
142
    def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
142
    def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
143 143
        WorkflowStatusItem.init_with_xml(self, elem, charset)
144 144
        el = elem.find('formdef')
145 145
        if el is None:
146 146
            return
147 147
        # we can always include id in the formdef export as it lives in
148 148
        # a different space, isolated from other formdefs.
149
        imported_formdef = FormDef.import_from_xml_tree(el, include_id=True, snapshot=snapshot)
149
        imported_formdef = FormDef.import_from_xml_tree(el, include_id=True, snapshot=snapshot, check_datasources=check_datasources)
150 150
        self.formdef = WorkflowFormFieldsFormDef(item=self)
151 151
        self.formdef.fields = imported_formdef.fields
152 152
        if self.formdef.max_field_id is None and self.formdef.fields:
wcs/workflows.py
44 44
from .conditions import Condition
45 45
from .roles import Role, logged_users_role, get_user_roles
46 46
from .fields import FileField
47
from .formdef import FormDef
47
from .formdef import FormDef, FormdefImportError
48 48
from .carddef import CardDef
49 49
from .formdata import Evolution
50 50
from .mail_templates import MailTemplate
......
94 94

  
95 95

  
96 96
class WorkflowImportError(Exception):
97
    def __init__(self, msg, msg_args=None):
98
        super(WorkflowImportError, self).__init__(msg)
97
    def __init__(self, msg, msg_args=None, details=None):
98
        self.msg = msg
99 99
        self.msg_args = msg_args or ()
100
        self.details = details
100 101

  
101 102

  
102 103
class AbortActionException(Exception):
......
624 625
        return root
625 626

  
626 627
    @classmethod
627
    def import_from_xml(cls, fd, include_id=False):
628
    def import_from_xml(cls, fd, include_id=False, check_datasources=True):
628 629
        try:
629 630
            tree = ET.parse(fd)
630 631
        except:
631 632
            raise ValueError()
632
        return cls.import_from_xml_tree(tree, include_id=include_id)
633
        return cls.import_from_xml_tree(tree, include_id=include_id, check_datasources=check_datasources)
633 634

  
634 635
    @classmethod
635
    def import_from_xml_tree(cls, tree, include_id=False, snapshot=False):
636
    def import_from_xml_tree(cls, tree, include_id=False, snapshot=False, check_datasources=True):
636 637
        charset = get_publisher().site_charset
637 638
        workflow = cls()
638 639
        if tree.find('name') is None or not tree.find('name').text:
......
666 667
        for status in tree.find('possible_status'):
667 668
            status_o = WorkflowStatus()
668 669
            status_o.parent = workflow
669
            status_o.init_with_xml(status, charset, include_id=include_id, snapshot=snapshot)
670
            try:
671
                status_o.init_with_xml(
672
                    status, charset, include_id=include_id, snapshot=snapshot, check_datasources=check_datasources)
673
            except FormdefImportError as e:
674
                raise WorkflowImportError(e.msg, details=e.details)
670 675
            workflow.possible_status.append(status_o)
671 676

  
672 677
        workflow.global_actions = []
......
689 694
        variables = tree.find('variables')
690 695
        if variables is not None:
691 696
            formdef = variables.find('formdef')
692
            imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True, snapshot=snapshot)
697
            try:
698
                imported_formdef = FormDef.import_from_xml_tree(
699
                    formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources)
700
            except FormdefImportError as e:
701
                raise WorkflowImportError(e.msg, details=e.details)
693 702
            workflow.variables_formdef = WorkflowVariablesFieldsFormDef(workflow=workflow)
694 703
            workflow.variables_formdef.fields = imported_formdef.fields
695 704

  
696 705
        variables = tree.find('backoffice-fields')
697 706
        if variables is not None:
698 707
            formdef = variables.find('formdef')
699
            imported_formdef = FormDef.import_from_xml_tree(formdef, include_id=True, snapshot=snapshot)
708
            try:
709
                imported_formdef = FormDef.import_from_xml_tree(
710
                    formdef, include_id=True, snapshot=snapshot, check_datasources=check_datasources)
711
            except FormdefImportError as e:
712
                raise WorkflowImportError(e.msg, details=e.details)
700 713
            workflow.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(workflow=workflow)
701 714
            workflow.backoffice_fields_formdef.fields = imported_formdef.fields
702 715

  
......
930 943
                    el.text = str(val)
931 944
        return node
932 945

  
933
    def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
946
    def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
934 947
        if include_id and elem.attrib.get('id'):
935 948
            self.id = elem.attrib.get('id')
936 949
        for attribute in self.get_parameters():
......
1757 1770
                include_id=include_id))
1758 1771
        return status
1759 1772

  
1760
    def init_with_xml(self, elem, charset, include_id=False, snapshot=False):
1773
    def init_with_xml(self, elem, charset, include_id=False, snapshot=False, check_datasources=True):
1761 1774
        self.id = xml_node_text(elem.find('id'))
1762 1775
        self.name = xml_node_text(elem.find('name'))
1763 1776
        if elem.find('colour') is not None:
......
1779 1792
            self.append_item(item_type)
1780 1793
            item_o = self.items[-1]
1781 1794
            item_o.parent = self
1782
            item_o.init_with_xml(item, charset, include_id=include_id, snapshot=snapshot)
1795
            item_o.init_with_xml(item, charset, include_id=include_id, snapshot=snapshot, check_datasources=check_datasources)
1783 1796

  
1784 1797
    def __repr__(self):
1785 1798
        return '<%s %s %r>' % (self.__class__.__name__, self.id, self.name)
1786
-