Projet

Général

Profil

0010-utils-add-conversion-from-XMLSchema-to-JSON-schema-3.patch

Benjamin Dauvergne, 05 novembre 2019 19:22

Télécharger (15,9 ko)

Voir les différences:

Subject: [PATCH 10/11] utils: add conversion from XMLSchema to JSON schema
 (#35818)

We target the Draft 7 jsonschema specification.
 passerelle/utils/xml.py | 205 ++++++++++++++++++++++++++++++++++++++++
 setup.py                |   1 +
 tests/data/pacs-doc.xml | 101 ++++++++++++++++++++
 tests/test_utils_xml.py |  52 +++++++++-
 4 files changed, 358 insertions(+), 1 deletion(-)
 create mode 100644 tests/data/pacs-doc.xml
passerelle/utils/xml.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
from collections import OrderedDict
18
import copy
19

  
20
import jsonschema
21
import xmlschema
22

  
17 23

  
18 24
def text_content(node):
19 25
    '''Extract text content from node and all its children. Equivalent to
......
81 87
                if child_content:
82 88
                    d[child.tag].append(child_content)
83 89
    return d
90

  
91

  
92
class JSONSchemaFromXMLSchema(object):
    '''Build a JSON schema from one global element of an XML Schema.

    The result is stored in the ``json_schema`` attribute: an object schema
    whose single, required property is the root element name. Unsupported
    XML Schema constructs raise NotImplementedError.
    '''

    def __init__(self, xml_schema, root_element):
        '''Convert the global element named ``root_element``.

        ``xml_schema`` is either an ``xmlschema.XMLSchema`` instance or any
        value accepted by the ``xmlschema.XMLSchema`` constructor.
        '''
        if not isinstance(xml_schema, xmlschema.XMLSchema):
            xml_schema = xmlschema.XMLSchema(xml_schema)
        self.xml_schema = xml_schema
        self.json_schema = {
            'type': 'object',
            'properties': {
                root_element: self.element_to_jsonschema(
                    xml_schema.elements[root_element]),
            },
            'required': [root_element],
            'additionalProperties': False,
        }

    @classmethod
    def simpletype_to_jsonschema(cls, simple_type):
        '''Return the JSON schema of an XSD simple type.

        Supported types are the string, integer, boolean and double builtins
        and atomic restrictions built on them using enumeration facets, or
        length facets when the base type is string. Anything else raises
        NotImplementedError.
        '''
        assert isinstance(simple_type, xmlschema.validators.XsdSimpleType)

        if isinstance(simple_type, xmlschema.validators.XsdAtomicBuiltin):
            if (simple_type.min_length
                    or simple_type.max_length
                    or simple_type.white_space not in ('collapse', 'preserve')
                    or simple_type.patterns):
                raise NotImplementedError(simple_type)

            json_types = {
                xmlschema.qnames.XSD_STRING: 'string',
                xmlschema.qnames.XSD_INTEGER: 'integer',
                xmlschema.qnames.XSD_BOOLEAN: 'boolean',
                xmlschema.qnames.XSD_DOUBLE: 'number',
            }
            json_type = json_types.get(simple_type.name)
            if json_type is None:
                raise NotImplementedError(simple_type)
            return {'type': json_type}
        elif isinstance(simple_type, xmlschema.validators.XsdAtomicRestriction):
            if (simple_type.white_space not in ('collapse', 'preserve')
                    or simple_type.patterns):
                raise NotImplementedError(simple_type)
            # start from a copy of the base type schema and refine it with
            # the facets of the restriction
            schema = OrderedDict(cls.simpletype_to_jsonschema(simple_type.base_type))
            # length facets are only translated for direct restrictions of string
            is_string = simple_type.base_type.name == xmlschema.qnames.XSD_STRING
            for validator in simple_type.validators:
                if isinstance(validator, xmlschema.validators.XsdEnumerationFacets):
                    schema['enum'] = validator.enumeration
                elif is_string and isinstance(validator, xmlschema.validators.XsdMinLengthFacet):
                    schema['minLength'] = validator.value
                elif is_string and isinstance(validator, xmlschema.validators.XsdMaxLengthFacet):
                    schema['maxLength'] = validator.value
                elif is_string and isinstance(validator, xmlschema.validators.XsdLengthFacet):
                    # an exact length is expressed as identical bounds
                    schema['minLength'] = validator.value
                    schema['maxLength'] = validator.value
                else:
                    raise NotImplementedError(validator)
            return schema
        raise NotImplementedError(simple_type)

    @classmethod
    def attributegroup_to_jsonschema(cls, attributegroup, schema, required=None):
        '''Add the attributes of ``attributegroup`` to ``schema['properties']``.

        ``schema`` is modified in place. Prohibited attributes are skipped.
        Non-optional attributes are appended to ``schema['required']`` only
        when ``required`` is not None. Attributes declared by reference raise
        NotImplementedError.
        '''
        assert isinstance(attributegroup, xmlschema.validators.XsdAttributeGroup)

        properties = schema.setdefault('properties', OrderedDict())
        for component in attributegroup.iter_component():
            if component.use == 'prohibited':
                continue
            if required is not None and component.use != 'optional':
                if component.name not in schema.get('required', []):
                    schema.setdefault('required', []).append(component.name)
            if component.ref:
                raise NotImplementedError(component)
            properties[component.name] = cls.simpletype_to_jsonschema(component.type)

    @classmethod
    def group_to_alternatives(cls, group, alternatives=None):
        '''Return the possible lists of elements allowed by a model group.

        Each alternative is a list of elements. ``alternatives``, when given
        and not empty, is extended in place; otherwise the computation starts
        from a single empty alternative.
        '''
        alternatives = alternatives or [[]]

        if group.model == 'choice':
            cls.choice_to_alternatives(group, alternatives=alternatives)
        elif group.model == 'sequence' or group.model == 'all':
            cls.sequence_to_alternatives(group, alternatives=alternatives)
        else:
            raise NotImplementedError(group)

        return alternatives

    @classmethod
    def choice_to_alternatives(cls, group, alternatives):
        '''Replace, in place, ``alternatives`` by one variant per choice member.

        Each member of the choice group is combined with every alternative
        present when the method is entered.
        '''
        # keep the caller's list object as the output and work on a snapshot
        # of its previous content
        new_alternatives = alternatives
        alternatives = list(alternatives)
        new_alternatives[:] = []

        for component in group:
            if isinstance(component, xmlschema.validators.XsdElement):
                for alternative in alternatives:
                    new_alternatives.append(alternative + [component])
            elif isinstance(component, xmlschema.validators.XsdGroup):
                # copy the alternatives as nested groups extend them in place
                sub_alternatives = [list(alternative) for alternative in alternatives]
                cls.group_to_alternatives(component, alternatives=sub_alternatives)
                new_alternatives.extend(sub_alternatives)
            else:
                raise NotImplementedError(component)

    @classmethod
    def sequence_to_alternatives(cls, group, alternatives):
        '''Append, in place, every member of the group to every alternative.'''
        for component in group:
            if isinstance(component, xmlschema.validators.XsdElement):
                for alternative in alternatives:
                    alternative.append(component)
            elif isinstance(component, xmlschema.validators.XsdGroup):
                cls.group_to_alternatives(component, alternatives=alternatives)
            else:
                raise NotImplementedError(component)

    @classmethod
    def group_to_jsonschema(cls, group, schema, base_schema=None):
        '''Describe the content of a model group inside ``schema``, in place.

        With a single alternative its elements become properties of
        ``schema``. With several alternatives ``schema`` is emptied and
        rewritten as a ``oneOf`` whose members are copies of the previous
        content of ``schema``, each completed with one alternative.

        ``base_schema`` is kept for backward compatibility; its value is
        ignored.
        '''
        assert isinstance(group, xmlschema.validators.XsdGroup)

        alternatives = cls.group_to_alternatives(group)

        assert len(alternatives) >= 1 and all(len(alternative) >= 1 for alternative in alternatives), alternatives

        def fill_schema_with_alternative(schema, alternative):
            for component in alternative:
                properties = schema.setdefault('properties', OrderedDict())
                properties[component.name] = cls.element_to_jsonschema(component)
                if (component.min_occurs > 0
                        and component.name not in schema.get('required', [])):
                    schema.setdefault('required', []).append(component.name)

        if len(alternatives) == 1:
            fill_schema_with_alternative(schema, alternatives[0])
        elif len(alternatives) > 1:
            common_schema = copy.deepcopy(schema)
            schema.clear()
            one_of = []
            schema['oneOf'] = one_of
            for alternative in alternatives:
                new_schema = copy.deepcopy(common_schema)
                fill_schema_with_alternative(new_schema, alternative)
                one_of.append(new_schema)

    @classmethod
    def type_to_jsonschema(cls, xmltype, depth=0):
        '''Return the JSON schema of an XSD type.

        Simple types and complex types with simple content also accept
        ``null`` when ``depth`` is 0, i.e. when the type is not visited as
        the base type of another type. Mixed content raises
        NotImplementedError.
        '''
        assert isinstance(xmltype, xmlschema.validators.XsdType)

        if xmltype.is_simple():
            schema = cls.simpletype_to_jsonschema(xmltype)
        elif xmltype.has_simple_content():
            base_schema = cls.type_to_jsonschema(xmltype.base_type, depth=depth + 1)
            if not xmltype.attributes:
                schema = base_schema
            else:
                # simple content with attributes is an object: attributes are
                # properties and the text content is stored under '$'
                schema = OrderedDict({'type': 'object'})
                cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
                schema['properties']['$'] = base_schema
        else:
            if xmltype.has_mixed_content() or not xmltype.is_element_only():
                raise NotImplementedError(xmltype)

            schema = OrderedDict({'type': 'object'})
            schema['additionalProperties'] = False
            if xmltype.attributes:
                cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
            cls.group_to_jsonschema(xmltype.content_type, schema)
            # element-only content is never wrapped with the null alternative
            return schema

        if depth == 0:
            schema = {'oneOf': [schema, {'type': 'null'}]}
        return schema

    @classmethod
    def element_to_jsonschema(cls, element):
        '''Return the JSON schema of an element.

        Elements which can occur more than once are described as arrays of
        their type, bounded by ``min_occurs`` and ``max_occurs``.
        '''
        assert isinstance(element, xmlschema.validators.XsdElement)

        # test None first: a max_occurs of None cannot be ordered against an
        # integer on Python 3
        is_array = element.max_occurs is None or element.max_occurs > 1
        type_schema = cls.type_to_jsonschema(element.type)
        if not is_array:
            return type_schema

        d = {
            'type': 'array',
            'items': type_schema,
            'minItems': element.min_occurs,
        }
        if element.max_occurs is not None:
            d['maxItems'] = element.max_occurs
        return d

    def validate(self, instance):
        '''Validate ``instance`` against ``json_schema`` with jsonschema.'''
        return jsonschema.validate(instance=instance, schema=self.json_schema)
setup.py
110 110
            'paramiko',
111 111
            'pdfrw',
112 112
            'httplib2',
113
            'xmlschema',
113 114
        ],
114 115
        cmdclass={
115 116
            'build': build,
tests/data/pacs-doc.xml
1
<?xml version="1.0" encoding="UTF-8" ?>
2
<PACS>
3
	<partenaire1>
4
		<civilite>MME</civilite>
5
        <nomNaissance>Doe</nomNaissance>
6
   		<prenoms>Jane</prenoms>
7
   		<codeNationalite>FRA</codeNationalite>
8
   		<codeNationalite>BHS</codeNationalite>
9
   		<codeNationalite>BEL</codeNationalite>
10
   		<jourNaissance>28</jourNaissance>
11
   		<moisNaissance>01</moisNaissance>
12
   		<anneeNaissance>1950</anneeNaissance>
13
   		<LieuNaissance>
14
			<localite>ST ETIENNE</localite>
15
			<codePostal>42000</codePostal>
16
			<codeInsee>42218</codeInsee>
17
			<departement>Loire</departement>
18
			<codePays>FRA</codePays>
19
		</LieuNaissance>
20
   		<ofpra>false</ofpra>
21
   		<mesureJuridique>true</mesureJuridique>
22
   		<adressePostale>
23
   					<NumeroLibelleVoie>1 rue du test</NumeroLibelleVoie>
24
		<Complement1>Appartement, étage, escalier</Complement1>
25
		<Complement2>Résidence, bâtiment ou immeuble</Complement2>
26
		<LieuDitBpCommuneDeleguee>BP1</LieuDitBpCommuneDeleguee>
27
		<CodePostal>05100</CodePostal>
28
		<Localite>VILLAR ST PANCRACE</Localite>
29
		<Pays>FRA</Pays>
30
   		</adressePostale>
31
   		<adresseElectronique>mates@entrouvert.com</adresseElectronique>
32
   		<telephone>+33123456789</telephone>
33
   				<titreIdentiteVerifie>true</titreIdentiteVerifie>
34
	</partenaire1>
35
	<partenaire2>
36
		<civilite>MME</civilite>
37
        <nomNaissance>Doe</nomNaissance>
38
   		<prenoms>Jane</prenoms>
39
   		<codeNationalite>BEL</codeNationalite>
40
   		   		   		<jourNaissance>28</jourNaissance>
41
   		<moisNaissance>01</moisNaissance>
42
   		<anneeNaissance>1982</anneeNaissance>
43
   		<LieuNaissance>
44
			<localite>CLERMONT FERRAND</localite>
45
			<codePostal>63000</codePostal>
46
			<codeInsee>63113</codeInsee>
47
			<departement>Puy-de-dôme</departement>
48
			<codePays>FRA</codePays>
49
		</LieuNaissance>
50
   		<ofpra>false</ofpra>
51
   		<mesureJuridique>true</mesureJuridique>
52
   		<adressePostale>
53
   					<NumeroLibelleVoie>2 rue du test</NumeroLibelleVoie>
54
								<CodePostal>05100</CodePostal>
55
		<Localite>VILLAR ST PANCRACE</Localite>
56
		<Pays>FRA</Pays>
57
   		</adressePostale>
58
   		<adresseElectronique>mates@entrouvert.com</adresseElectronique>
59
   		<telephone>+33123456789</telephone>
60
   				<titreIdentiteVerifie>false</titreIdentiteVerifie>
61
	</partenaire2>
62
	<convention>
63
						<conventionType>
64
			<aideMaterielMontant>100000</aideMaterielMontant>
65
			<regimePacs>legal</regimePacs>
66
			<aideMateriel>
67
				<typeAideMateriel>aideFixe</typeAideMateriel>
68
			</aideMateriel>
69
		</conventionType>
70
			</convention>
71
	<residenceCommune>
72
								<NumeroLibelleVoie>3 place du test</NumeroLibelleVoie>
73
								<CodePostal>05100</CodePostal>
74
		<Localite>VILLAR ST PANCRACE</Localite>
75
		<Pays></Pays>
76
	</residenceCommune>
77
	<attestationHonneur>
78
		<nonParente>true</nonParente>
79
		<residenceCommune>true</residenceCommune>
80
	</attestationHonneur>
81

  
82
</PACS>
83

  
84

  
85

  
86

  
87

  
88

  
89

  
90

  
91

  
92

  
93

  
94

  
95

  
96

  
97

  
98

  
99

  
100

  
101

  
tests/test_utils_xml.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
1 17
import xml.etree.ElementTree as ET
2 18

  
3
from passerelle.utils.xml import to_json, text_content
19
import xmlschema
20

  
21
import jsonschema
22

  
23
from passerelle.utils.xml import to_json, text_content, JSONSchemaFromXMLSchema
24
from passerelle.utils.json import flatten_json_schema, flatten, unflatten
4 25

  
5 26

  
6 27
def test_text_content():
......
31 52
            {'text3': '4'},
32 53
        ]
33 54
    }
55

  
56

  
57
def test_xmlschema_to_jsonschema():
    '''Round-trip a PACS document through the XML / JSON schema helpers.'''
    schema_path = 'passerelle/apps/sp_fr/depotDossierPACS.XSD'

    # go from XML to JSON,
    # convert XMLSchema to JSONSchema
    # validate jsonschema, on converted data,
    # flatten the JSON schema,
    # flatten the data,
    # validate flattened data with flattened JSON schema
    # unflatten data
    # convert unflattened data to XML
    # convert XML to JSON
    # then compare to initially converted JSON data
    schema = xmlschema.XMLSchema(schema_path, converter=xmlschema.UnorderedConverter)
    json_schema = JSONSchemaFromXMLSchema(schema, 'PACS')
    d = schema.elements['PACS'].decode(ET.parse('tests/data/pacs-doc.xml').getroot())
    d = {'PACS': d}
    json_schema.validate(d)
    flattened_json_schema = flatten_json_schema(json_schema.json_schema)
    flattened_d = flatten(d)
    jsonschema.validate(instance=flattened_d, schema=flattened_json_schema)
    # unflatten the flattened data, not the original one, otherwise the
    # flatten / unflatten round trip is not exercised
    d2 = unflatten(flattened_d)
    json_schema.validate(d2)

    tree = schema.elements['PACS'].encode(d2['PACS'])
    d3 = schema.elements['PACS'].decode(tree)
    assert d == {'PACS': d3}
34
-