From 512f048fc2bdcace1a7bdd740a1a601191af6dfc Mon Sep 17 00:00:00 2001
From: Benjamin Dauvergne
Date: Tue, 15 Oct 2019 11:16:42 +0200
Subject: [PATCH 10/11] utils: add conversion from XMLSchema to JSON schema
 (#35818)

We target the Draft 7 jsonschema specification.
---
 passerelle/utils/xml.py | 266 ++++++++++++++++++++++++++++++++++++++++
 setup.py                |   1 +
 tests/data/pacs-doc.xml | 101 ++++++++++++++++++++
 tests/test_utils_xml.py |  52 +++++++++-
 4 files changed, 419 insertions(+), 1 deletion(-)
 create mode 100644 tests/data/pacs-doc.xml

diff --git a/passerelle/utils/xml.py b/passerelle/utils/xml.py
index ebe9213a..64843436 100644
--- a/passerelle/utils/xml.py
+++ b/passerelle/utils/xml.py
@@ -14,6 +14,12 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+from collections import OrderedDict
+import copy
+
+import jsonschema
+import xmlschema
+
 
 def text_content(node):
     '''Extract text content from node and all its children. Equivalent to
@@ -81,3 +87,263 @@ def to_json(root):
             if child_content:
                 d[child.tag].append(child_content)
     return d
+
+
+class JSONSchemaFromXMLSchema(object):
+    '''Build a JSON schema from an XML Schema; Draft 7 is targeted.
+
+    The result is stored in the json_schema attribute, it describes an
+    object whose only, and required, key is the name of the root element.
+    Unsupported XML Schema constructs raise NotImplementedError.
+    '''
+
+    def __init__(self, xml_schema, root_element):
+        '''Convert the global element named root_element of xml_schema.
+
+        xml_schema is an xmlschema.XMLSchema instance, any other value is
+        passed to the xmlschema.XMLSchema constructor.
+        '''
+        if not isinstance(xml_schema, xmlschema.XMLSchema):
+            xml_schema = xmlschema.XMLSchema(xml_schema)
+        self.xml_schema = xml_schema
+        self.json_schema = {
+            'type': 'object',
+            'properties': {
+                root_element: self.element_to_jsonschema(
+                    xml_schema.elements[root_element]),
+            },
+            'required': [root_element],
+            'additionalProperties': False,
+        }
+
+    @classmethod
+    def simpletype_to_jsonschema(cls, simple_type):
+        '''Return the JSON schema of an XSD simple type.
+
+        Supported types are the string, integer, boolean and double
+        builtins and their restrictions by enumeration or, for strings
+        only, by length facets; anything else raises NotImplementedError.
+        '''
+        assert isinstance(simple_type, xmlschema.validators.XsdSimpleType)
+
+        if isinstance(simple_type, xmlschema.validators.XsdAtomicBuiltin):
+            if (simple_type.min_length
+                    or simple_type.max_length
+                    or simple_type.white_space not in ('collapse', 'preserve')
+                    or simple_type.patterns):
+                raise NotImplementedError(simple_type)
+
+            if simple_type.name == xmlschema.qnames.XSD_STRING:
+                schema = {'type': 'string'}
+            elif simple_type.name == xmlschema.qnames.XSD_INTEGER:
+                schema = {'type': 'integer'}
+            elif simple_type.name == xmlschema.qnames.XSD_BOOLEAN:
+                schema = {'type': 'boolean'}
+            elif simple_type.name == xmlschema.qnames.XSD_DOUBLE:
+                schema = {'type': 'number'}
+            else:
+                raise NotImplementedError(simple_type)
+            return schema
+        elif isinstance(simple_type, xmlschema.validators.XsdAtomicRestriction):
+            if (simple_type.white_space not in ('collapse', 'preserve')
+                    or simple_type.patterns):
+                raise NotImplementedError(simple_type)
+            schema = OrderedDict(cls.simpletype_to_jsonschema(simple_type.base_type))
+            for validator in simple_type.validators:
+                if isinstance(validator, xmlschema.validators.XsdEnumerationFacets):
+                    schema['enum'] = validator.enumeration
+                elif (isinstance(validator, xmlschema.validators.XsdMinLengthFacet)
+                        and simple_type.base_type.name == xmlschema.qnames.XSD_STRING):
+                    schema['minLength'] = validator.value
+                elif (isinstance(validator, xmlschema.validators.XsdMaxLengthFacet)
+                        and simple_type.base_type.name == xmlschema.qnames.XSD_STRING):
+                    schema['maxLength'] = validator.value
+                elif (isinstance(validator, xmlschema.validators.XsdLengthFacet)
+                        and simple_type.base_type.name == xmlschema.qnames.XSD_STRING):
+                    schema['minLength'] = validator.value
+                    schema['maxLength'] = validator.value
+                else:
+                    raise NotImplementedError(validator)
+            return schema
+        raise NotImplementedError(simple_type)
+
+    @classmethod
+    def attributegroup_to_jsonschema(cls, attributegroup, schema, required=None):
+        '''Add the attributes of attributegroup to schema['properties'].
+
+        Prohibited attributes are skipped, references to attributes raise
+        NotImplementedError.
+        '''
+        assert isinstance(attributegroup, xmlschema.validators.XsdAttributeGroup)
+
+        properties = schema.setdefault('properties', OrderedDict())
+        for component in attributegroup.iter_component():
+            if component.use == 'prohibited':
+                continue
+            # NOTE(review): attributes are only added to schema['required']
+            # when the caller passes a required value, confirm it is wanted
+            if required is not None and component.use != 'optional':
+                if component.name not in schema.get('required', []):
+                    schema.setdefault('required', []).append(component.name)
+            if component.ref:
+                raise NotImplementedError(component)
+            else:
+                properties[component.name] = cls.simpletype_to_jsonschema(component.type)
+
+    @classmethod
+    def group_to_alternatives(cls, group, alternatives=None):
+        '''Expand group into the lists of elements it accepts.
+
+        alternatives is a list of lists of elements, it is updated in place
+        and returned; when omitted a single empty alternative is used.
+        '''
+        # do not use "alternatives or [[]]": an empty list given by the
+        # caller must be updated in place, not replaced by a new list
+        if alternatives is None:
+            alternatives = [[]]
+
+        if group.model == 'choice':
+            cls.choice_to_alternatives(group, alternatives=alternatives)
+        elif group.model == 'sequence' or group.model == 'all':
+            cls.sequence_to_alternatives(group, alternatives=alternatives)
+        else:
+            raise NotImplementedError(group)
+
+        return alternatives
+
+    @classmethod
+    def choice_to_alternatives(cls, group, alternatives):
+        '''Replace, in place, alternatives by one copy per member of group.
+
+        Each existing alternative is extended once with each element of the
+        choice, or with each expansion of each of its sub-groups.
+        '''
+        new_alternatives = alternatives
+        alternatives = list(alternatives)
+        new_alternatives[:] = []
+
+        for component in group:
+            if isinstance(component, xmlschema.validators.XsdElement):
+                for alternative in alternatives:
+                    alternative = alternative + [component]
+                    new_alternatives.append(alternative)
+            elif isinstance(component, xmlschema.validators.XsdGroup):
+                sub_alternatives = [list(alternative) for alternative in alternatives]
+                cls.group_to_alternatives(component, alternatives=sub_alternatives)
+                for alternative in sub_alternatives:
+                    new_alternatives.append(alternative)
+            else:
+                raise NotImplementedError(component)
+
+    @classmethod
+    def sequence_to_alternatives(cls, group, alternatives):
+        '''Extend, in place, each alternative with the members of group.'''
+        for component in group:
+            if isinstance(component, xmlschema.validators.XsdElement):
+                for alternative in alternatives:
+                    alternative.append(component)
+            elif isinstance(component, xmlschema.validators.XsdGroup):
+                cls.group_to_alternatives(component, alternatives=alternatives)
+            else:
+                raise NotImplementedError(component)
+
+    @classmethod
+    def group_to_jsonschema(cls, group, schema, base_schema=None):
+        '''Describe in schema, in place, the elements accepted by group.
+
+        With a single alternative its elements are added as properties of
+        schema; otherwise schema is emptied and becomes a oneOf of copies of
+        its previous content, one per alternative. base_schema is ignored.
+        '''
+        assert isinstance(group, xmlschema.validators.XsdGroup)
+
+        alternatives = cls.group_to_alternatives(group)
+
+        assert len(alternatives) >= 1 and all(len(alternative) >= 1 for alternative in alternatives), alternatives
+
+        def fill_schema_with_alternative(schema, alternative):
+            for component in alternative:
+                properties = schema.setdefault('properties', OrderedDict())
+                properties[component.name] = cls.element_to_jsonschema(component)
+                if (component.min_occurs > 0
+                        and component.name not in schema.get('required', [])):
+                    schema.setdefault('required', []).append(component.name)
+
+        if len(alternatives) == 1:
+            fill_schema_with_alternative(schema, alternatives[0])
+        elif len(alternatives) > 1:
+            base_schema = copy.deepcopy(schema)
+            schema.clear()
+            one_of = []
+            schema['oneOf'] = one_of
+            for alternative in alternatives:
+                new_schema = copy.deepcopy(base_schema)
+                fill_schema_with_alternative(new_schema, alternative)
+                one_of.append(new_schema)
+
+    @classmethod
+    def type_to_jsonschema(cls, xmltype, depth=0):
+        '''Return the JSON schema of an XSD type.
+
+        At depth 0 simple types and complex types with a simple content
+        also accept null. Mixed or not element-only types are refused.
+        '''
+        assert isinstance(xmltype, xmlschema.validators.XsdType)
+
+        if xmltype.is_simple():
+            schema = cls.simpletype_to_jsonschema(xmltype)
+            if depth == 0:
+                schema = {'oneOf': [schema, {'type': 'null'}]}
+            return schema
+        elif xmltype.has_simple_content():
+            base_schema = cls.type_to_jsonschema(xmltype.base_type, depth=depth + 1)
+            if not xmltype.attributes:
+                schema = base_schema
+            else:
+                # the schema of the content is stored under the '$' key,
+                # next to the properties built from the attributes
+                schema = OrderedDict({'type': 'object'})
+                cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
+                schema['properties']['$'] = base_schema
+            if depth == 0:
+                schema = {'oneOf': [schema, {'type': 'null'}]}
+            return schema
+        else:
+            if xmltype.has_mixed_content() or not xmltype.is_element_only():
+                raise NotImplementedError(xmltype)
+
+            schema = OrderedDict({'type': 'object'})
+            schema['additionalProperties'] = False
+            if xmltype.attributes:
+                cls.attributegroup_to_jsonschema(xmltype.attributes, schema)
+            cls.group_to_jsonschema(xmltype.content_type, schema)
+            return schema
+
+    @classmethod
+    def element_to_jsonschema(cls, element):
+        '''Return the JSON schema of an XSD element.
+
+        Elements which can occur more than once are described as arrays of
+        their type, bounded by min_occurs and, when set, max_occurs.
+        '''
+        assert isinstance(element, xmlschema.validators.XsdElement)
+
+        # max_occurs can be None, check it first: None cannot be ordered
+        # against an integer with Python 3
+        is_array = element.max_occurs is None or element.max_occurs > 1
+        type_schema = cls.type_to_jsonschema(element.type)
+        if is_array:
+            d = {
+                'type': 'array',
+                'items': type_schema,
+                'minItems': element.min_occurs,
+            }
+            if element.max_occurs is not None:
+                d['maxItems'] = element.max_occurs
+            return d
+        else:
+            return type_schema
+
+    def validate(self, instance):
+        '''Validate instance with the generated JSON schema.'''
+        return jsonschema.validate(instance=instance, schema=self.json_schema)
diff --git a/setup.py b/setup.py
index 60c4fc73..ff87c93d 100755
--- a/setup.py
+++ b/setup.py
@@ -110,6 +110,7 @@ setup(name='passerelle',
           'paramiko',
           'pdfrw',
           'httplib2',
+          'xmlschema',
       ],
       cmdclass={
           'build': build,
diff --git a/tests/data/pacs-doc.xml b/tests/data/pacs-doc.xml
new file mode 100644
index 00000000..cb7a3962
--- /dev/null
+++ b/tests/data/pacs-doc.xml
@@ -0,0 +1,101 @@
+
+
+
+ MME
+ Doe
+ Jane
+ FRA
+ BHS
+ BEL
+ 28
+ 01
+ 1950
+
+ ST ETIENNE
+ 42000
+ 42218
+ Loire
+ FRA
+
+ false
+ true
+
+ 1 rue du test
+ Appartement, étage, escalier
+ Résidence, bâtiment ou immeuble
+ BP1
+ 05100
+ VILLAR ST PANCRACE
+ FRA
+
+ mates@entrouvert.com
+ +33123456789
+ true
+
+
+ MME
+ Doe
+ Jane
+ BEL
+ 28
+ 01
+ 1982
+
+ CLERMONT FERRAND
+ 63000
+ 63113
+ Puy-de-dôme
+ FRA
+
+ false
+ true
+
+ 2 rue du test
+ 05100
+ VILLAR ST PANCRACE
+ FRA
+
+ mates@entrouvert.com
+ +33123456789
+ false
+
+
+
+ 100000
+ legal
+
+ aideFixe
+
+
+
+
+ 3 place du test
+ 05100
+ VILLAR ST PANCRACE
+
+
+
+ true
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/test_utils_xml.py b/tests/test_utils_xml.py
index 9dd5d66b..33f58c72 100644
--- a/tests/test_utils_xml.py
+++ b/tests/test_utils_xml.py
@@ -1,6 +1,27 @@
+# passerelle - uniform access to multiple data sources and services
+# Copyright (C) 2019 Entr'ouvert
+#
+# This program is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
 import xml.etree.ElementTree as ET
 
-from passerelle.utils.xml import to_json, text_content
+import xmlschema
+
+import jsonschema
+
+from passerelle.utils.xml import to_json, text_content, JSONSchemaFromXMLSchema
+from passerelle.utils.json import flatten_json_schema, flatten, unflatten
 
 
 def test_text_content():
@@ -31,3 +52,32 @@ def test_to_json():
             {'text3': '4'},
         ]
     }
+
+
+def test_xmlschema_to_jsonschema():
+    schema_path = 'passerelle/apps/sp_fr/depotDossierPACS.XSD'
+
+    # go from XML to JSON,
+    # convert XMLSchema to JSONSchema
+    # validate jsonschema, on converted data,
+    # flatten the JSON schema,
+    # flatten the data,
+    # validate flattened data with flattened JSON schema
+    # unflatten the flattened data
+    # convert unflattened data to XML
+    # convert XML to JSON
+    # then compare to initially converted JSON data
+    schema = xmlschema.XMLSchema(schema_path, converter=xmlschema.UnorderedConverter)
+    json_schema = JSONSchemaFromXMLSchema(schema, 'PACS')
+    d = schema.elements['PACS'].decode(ET.parse('tests/data/pacs-doc.xml').getroot())
+    d = {'PACS': d}
+    json_schema.validate(d)
+    flattened_json_schema = flatten_json_schema(json_schema.json_schema)
+    flattened_d = flatten(d)
+    jsonschema.validate(instance=flattened_d, schema=flattened_json_schema)
+    d2 = unflatten(flattened_d)
+    json_schema.validate(d2)
+
+    tree = schema.elements['PACS'].encode(d2['PACS'])
+    d3 = schema.elements['PACS'].decode(tree)
+    assert d == {'PACS': d3}
-- 
2.23.0