Projet

Général

Profil

0010-add-utils-for-parsing-XSD-files-31595.patch

Benjamin Dauvergne, 19 avril 2019 14:58

Télécharger (12,5 ko)

Voir les différences:

Subject: [PATCH 10/11] add utils for parsing XSD files (#31595)

 passerelle/utils/xsd.py | 318 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 318 insertions(+)
 create mode 100644 passerelle/utils/xsd.py
passerelle/utils/xsd.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18

  
19
from django.utils import six
20

  
21
import isodate
22
from lxml import etree as ET
23
from zeep.utils import qname_attr
24

  
25

  
26
def parse_bool(boolean):
    """Convert the lexical form of an xsd:boolean to a Python bool.

    XML Schema allows ``true``/``false`` as well as ``1``/``0`` for booleans,
    possibly surrounded by whitespace; the comparison is case-insensitive.
    Any other value is reported as ``False``.
    """
    return boolean.strip().lower() in ('true', '1')
28

  
29

  
30
def parse_date(date):
    """Convert a ``YYYY-MM-DD`` string to a ``datetime.date``.

    Values that already are ``datetime.date`` instances (which includes
    ``datetime.datetime`` objects, a subclass) are returned unchanged.

    Raises ``ValueError`` if the string does not match ``%Y-%m-%d``.
    """
    if isinstance(date, datetime.date):
        return date
    # strptime() takes the string to parse first and the format second
    return datetime.datetime.strptime(date, '%Y-%m-%d').date()
34

  
35

  
36
# Namespace URI of the XML Schema vocabulary, and a prefix map referring to it.
XSD = 'http://www.w3.org/2001/XMLSchema'
ns = dict(xsd=XSD)

# Qualified names of the schema constructs recognised by the Schema visitor.
SCHEMA = ET.QName(XSD, 'schema')
ANNOTATION = ET.QName(XSD, 'annotation')
ELEMENT = ET.QName(XSD, 'element')
ATTRIBUTE = ET.QName(XSD, 'attribute')
COMPLEX_TYPE = ET.QName(XSD, 'complexType')
SIMPLE_TYPE = ET.QName(XSD, 'simpleType')
COMPLEX_CONTENT = ET.QName(XSD, 'complexContent')
EXTENSION = ET.QName(XSD, 'extension')
RESTRICTION = ET.QName(XSD, 'restriction')
SEQUENCE = ET.QName(XSD, 'sequence')
CHOICE = ET.QName(XSD, 'choice')
ALL = ET.QName(XSD, 'all')

# Qualified names of the built-in data types that have a caster below.
BOOLEAN = ET.QName(XSD, 'boolean')
STRING = ET.QName(XSD, 'string')
DATE = ET.QName(XSD, 'date')
INT = ET.QName(XSD, 'int')
INTEGER = ET.QName(XSD, 'integer')
DATE_TIME = ET.QName(XSD, 'dateTime')
ANY_TYPE = ET.QName(XSD, 'anyType')

# Maps a built-in type name to the callable converting element text into a
# Python value; anyType values are passed through untouched.
TYPE_CASTER = {
    BOOLEAN: parse_bool,
    STRING: six.text_type,
    DATE: parse_date,
    INT: int,
    INTEGER: int,
    DATE_TIME: isodate.parse_datetime,
    ANY_TYPE: lambda value: value,
}
68

  
69

  
70
class Schema(object):
    """Simplified in-memory model of an XSD document.

    ``visit()`` walks an ``xsd:schema`` element and stores, keyed by qualified
    name, one plain dictionary per top-level type (``self.types``) and per
    top-level element (``self.elements``).  Only a subset of XML Schema is
    supported; unsupported constructs trigger an ``AssertionError`` or a
    ``NotImplementedError``.

    The dictionaries may hold the keys ``name``, ``type``, ``min_occurs``,
    ``max_occurs``, ``attributes``, ``sequence`` and ``choice`` depending on
    the construct they describe.
    """

    def __init__(self):
        self.types = {}
        self.elements = {}
        self.target_namespace = None
        self.element_form_default = 'qualified'
        self.attribute_form_default = 'unqualified'
        self.nsmap = {}
        # namespace URI -> prefix; rebuilt by visit(), initialised here so
        # that qname_display() also works on a schema never visited
        self.reverse_nsmap = {}

    @staticmethod
    def _parse_occurs(value):
        """Convert a maxOccurs attribute value to an int or 'unbounded'.

        A missing or empty value means 1.
        """
        value = value or 1
        if value == 'unbounded':
            return value
        return int(value)

    def visit(self, root):
        """Load the top-level declarations found under the schema `root`.

        `root` must be an ``xsd:schema`` element whose children are all
        ``complexType``, ``simpleType`` or ``element`` nodes; anything else
        raises ``NotImplementedError``.
        """
        assert root.tag == SCHEMA
        assert set(root.attrib) <= set(['targetNamespace', 'elementFormDefault', 'attributeFormDefault']), (
            'unsupported schema attributes %s' % root.attrib)
        self.target_namespace = root.get('targetNamespace')
        self.element_form_default = root.get('elementFormDefault', self.element_form_default)
        self.attribute_form_default = root.get('attributeFormDefault', self.attribute_form_default)
        self.nsmap = root.nsmap
        self.reverse_nsmap = {value: key for key, value in self.nsmap.items()}

        # first pass: check every top-level node is supported and named, and
        # register an empty placeholder under its name
        for node in root:
            if node.tag == COMPLEX_TYPE:
                name = qname_attr(node, 'name')
                assert name, 'unsupported top complexType without name'
                self.types[name] = {}
            elif node.tag == ELEMENT:
                name = qname_attr(node, 'name')
                assert name, 'unsupported top element without name'
                self.elements[name] = {}
            elif node.tag == SIMPLE_TYPE:
                name = qname_attr(node, 'name')
                assert name, 'unsupported top simpleType without name'
                self.types[name] = {}
            else:
                raise NotImplementedError('unsupported top element %s' % node)

        # second pass: build the real descriptions
        for node in root:
            if node.tag == COMPLEX_TYPE:
                d = self.visit_complex_type(node)
                target = self.types
            elif node.tag == SIMPLE_TYPE:
                d = self.visit_simple_type(node)
                target = self.types
            elif node.tag == ELEMENT:
                d = self.visit_element(node)
                target = self.elements
            else:
                raise NotImplementedError
            # names without a namespace are moved to the target namespace
            if not d['name'].namespace:
                d['name'] = ET.QName(self.target_namespace, d['name'].localname)
            target[d['name']] = d

    def visit_simple_type(self, node):
        """Describe a simpleType node.

        Only a simpleType made of a single restriction of ``xsd:string`` is
        accepted (annotations are ignored); the result always has the type
        ``STRING``, plus the name of the node if it has one.
        """
        # ignore annotations
        children = [child for child in node if child.tag != ANNOTATION]
        d = {}
        name = qname_attr(node, 'name')
        if name:
            d['name'] = name
        assert len(children) == 1, list(node)
        assert children[0].tag == RESTRICTION
        xsd_type = qname_attr(children[0], 'base')
        assert xsd_type == STRING
        d['type'] = STRING
        return d

    def visit_complex_content(self, node):
        """Describe a complexContent node holding exactly one extension.

        The resulting type is the base type of the extension.
        """
        d = {}
        name = qname_attr(node, 'name')
        if name:
            d['name'] = name
        assert len(node) == 1
        assert node[0].tag == EXTENSION
        xsd_type = qname_attr(node[0], 'base')
        d['type'] = xsd_type
        return d

    def visit_complex_type(self, node):
        """Describe a complexType node.

        An optional leading sequence, choice, all or complexContent child
        provides the base description; every remaining child must be an
        attribute declaration and is collected under the ``attributes`` key.
        """
        # ignore annotations
        children = [child for child in node if child.tag != ANNOTATION]
        if children and children[0].tag in (SEQUENCE, CHOICE, ALL, COMPLEX_CONTENT):
            if children[0].tag == SEQUENCE:
                d = self.visit_sequence(children[0])
            elif children[0].tag == CHOICE:
                d = self.visit_choice(children[0])
            elif children[0].tag == ALL:
                d = self.visit_all(children[0])
            elif children[0].tag == COMPLEX_CONTENT:
                d = self.visit_complex_content(children[0])
            children = children[1:]
        else:
            d = {}
        for child in children:
            assert child.tag == ATTRIBUTE, 'unsupported complexType with child %s' % child
            name = qname_attr(child, 'name')
            assert name, 'attribute without a name %s' % ET.tostring(child)
            assert set(child.attrib) <= set(['use', 'type', 'name']), child.attrib
            attributes = d.setdefault('attributes', {})
            xsd_type = qname_attr(child, 'type')
            attributes[name] = {
                'name': name,
                'use': child.get('use', 'optional'),
                'type': xsd_type,
            }

        name = qname_attr(node, 'name')
        if name:
            d['name'] = name
        return d

    def visit_element(self, node, top=False):
        """Describe an element node.

        Supported elements have a name and either one anonymous complexType
        child, or no child and an optional ``type`` attribute (``xsd:string``
        when absent).  Annotations are ignored.  `top` is currently unused.

        Raises ``NotImplementedError`` for elements with several children.
        """
        assert set(node.attrib.keys()) <= set(['name', 'type', 'minOccurs', 'maxOccurs']), node.attrib
        # ignore annotations
        children = [child for child in node if child.tag != ANNOTATION]
        # we handle elements with a name and one child, an anonymous complex type
        # or element without children referencing a complex type
        name = qname_attr(node, 'name')
        assert name is not None
        d = {
            'name': name,
            'min_occurs': int(node.attrib.get('minOccurs') or 1),
            'max_occurs': self._parse_occurs(node.attrib.get('maxOccurs')),
        }
        if len(children) == 1:
            ctype_node = children[0]
            assert ctype_node.tag == COMPLEX_TYPE
            assert ctype_node.attrib == {}
            d.update(self.visit_complex_type(ctype_node))
            return d
        elif len(children) == 0:
            xsd_type = qname_attr(node, 'type')
            if xsd_type is None:
                xsd_type = STRING
            d['type'] = xsd_type
            return d
        else:
            raise NotImplementedError('unsupported element with more than one children %s' % list(node))

    def visit_sequence(self, node):
        """Describe a sequence node made of elements and choices.

        When the node carries a ``maxOccurs`` attribute it is stored under
        ``max_occurs`` as an int, or as ``'unbounded'``, the same
        representation visit_element() uses and paths() expects.
        """
        assert set(node.attrib) <= set(['maxOccurs']), node.attrib
        sequence = []

        for element_node in node:
            assert element_node.tag in (ELEMENT, CHOICE), (
                'unsupported sequence with child not an element or a choice %s' % ET.tostring(element_node))
            if element_node.tag == ELEMENT:
                sequence.append(self.visit_element(element_node))
            elif element_node.tag == CHOICE:
                sequence.append(self.visit_choice(element_node))

        d = {
            'sequence': sequence,
        }
        if 'maxOccurs' in node.attrib:
            # keeping the raw attribute string here would break the integer
            # arithmetic and comparisons done in paths()
            d['max_occurs'] = self._parse_occurs(node.get('maxOccurs'))
        return d

    def visit_all(self, node):
        """Describe an ``all`` node; it is handled exactly like a sequence."""
        return self.visit_sequence(node)

    def visit_choice(self, node):
        """Describe a choice node, which must only contain elements and carry
        no attribute."""
        assert node.attrib == {}, 'unsupported choice with attributes %s' % node.attrib
        choice = []

        for element_node in node:
            assert element_node.tag == ELEMENT, 'unsupported sequence with child not an element %s' % node
            choice.append(self.visit_element(element_node))

        return {'choice': choice}

    def qname_display(self, name):
        """Return a readable text form of the qualified name `name`.

        Names whose namespace has a prefix in the visited schema are shown as
        ``prefix:localname``, names in the default (prefix-less) namespace as
        their bare local name, and any other name as its plain text form.
        """
        if name.namespace in self.reverse_nsmap:
            prefix = self.reverse_nsmap[name.namespace]
            if prefix is None:
                # default namespace: there is no prefix to display
                name = name.localname
            else:
                name = '%s:%s' % (prefix, name.localname)
        return six.text_type(name)

    def paths(self):
        """Yield ``(path, xsd_type)`` pairs for the leaves of every element.

        `path` is the list of names leading from a top-level element to a
        leaf, and `xsd_type` the type of that leaf.  Top-level elements are
        walked in sorted name order.  For repeated constructs, extra paths
        with names suffixed by ``_1``, ``_2``... are produced; an unbounded
        repetition is limited to two such occurrences.
        """
        roots = sorted(self.elements.keys())

        def helper(path, ctype, is_type=False):
            name = None
            if 'name' in ctype:
                name = ctype['name']
            max_occurs = ctype.get('max_occurs', 1)
            max_occurs = 2 if max_occurs == 'unbounded' else max_occurs
            if 'type' in ctype:
                # the name of a referenced type is not part of the path, only
                # the name of the element referencing it
                if name and not is_type:
                    path = path + [name]
                xsd_type = ctype['type']
                if xsd_type in self.types:
                    sub_type = self.types[xsd_type]
                    for subpath in helper(path, sub_type, is_type=True):
                        yield subpath
                else:
                    if max_occurs > 1:
                        for i in range(max_occurs):
                            yield path[:-1] + [ET.QName(name.namespace, name.localname + '_%d' % (i + 1))], xsd_type
                    yield path, xsd_type
            else:
                for extension in (['']
                                  if max_occurs == 1
                                  else [''] + ['_%s' % i for i in list(range(1, max_occurs + 1))]):
                    new_path = path
                    if name and not is_type:
                        new_path = new_path + [ET.QName(name.namespace, name.localname + extension)]
                    if 'sequence' in ctype:
                        for sub_ctype in ctype['sequence']:
                            for subpath in helper(new_path, sub_ctype):
                                yield subpath
                    elif 'choice' in ctype:
                        for sub_ctype in ctype['choice']:
                            for subpath in helper(new_path, sub_ctype):
                                yield subpath

        for root in roots:
            for path in helper([], self.elements[root]):
                yield path
290

  
291

  
292
@six.python_2_unicode_compatible
class Path(object):
    """A list of names locating a leaf value in an XML tree, together with
    the type used to convert the text of that leaf."""

    def __init__(self, path, xsd_type):
        """Keep `path` and `xsd_type` and look up the matching caster.

        Raises ``KeyError`` (carrying the text form of the type) when no
        caster is registered for `xsd_type`.
        """
        assert path
        self.path = path
        self.xsd_type = xsd_type
        try:
            caster = TYPE_CASTER[xsd_type]
        except KeyError:
            raise KeyError(six.text_type(xsd_type))
        self.caster = caster

    def resolve(self, root):
        """Return the converted text of the node this path designates.

        The walk starts at `root`, whose tag must be the first name of the
        path, and at each step follows the first child carrying the next
        name.  ``None`` is returned when a step has no matching child, or
        when the node reached has no text or has children of its own.
        """
        if root.tag != self.path[0]:
            return None
        current = root
        for tag in self.path[1:]:
            for candidate in current:
                if candidate.tag == tag:
                    current = candidate
                    break
            else:
                # no child bears the expected name
                return None
        if current.text and not list(current):
            return self.caster(current.text)
        return None

    def __str__(self):
        """Return the names of the path joined with dots."""
        return '.'.join(map(six.text_type, self.path))
0
-