From 4268c8823902e0add4a91a02bc4f7beb3e6a0168 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 26 Sep 2019 22:38:52 +0200 Subject: [PATCH 2/2] mdel: improve code style (#36471) --- passerelle/apps/mdel/mdel.py | 62 +++--- passerelle/apps/mdel/models.py | 357 +++++++++++++++++++-------------- passerelle/apps/mdel/utils.py | 11 +- tests/test_mdel.py | 98 +++++---- 4 files changed, 286 insertions(+), 242 deletions(-) diff --git a/passerelle/apps/mdel/mdel.py b/passerelle/apps/mdel/mdel.py index 02bc4ac4..974da004 100644 --- a/passerelle/apps/mdel/mdel.py +++ b/passerelle/apps/mdel/mdel.py @@ -1,5 +1,6 @@ -# Passerelle - uniform access to data and services -# Copyright (C) 2016 Entr'ouvert +# coding: utf-8 +# passerelle - uniform access to data and services- +# Copyright (C) 2019 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published @@ -14,6 +15,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from __future__ import unicode_literals + import os import base64 import datetime @@ -30,12 +33,7 @@ from utils import ElementFactory from passerelle.utils.jsonresponse import APIError -def get_resource_base_dir(): - return default_storage.path('mdel') - - class AttachedFile(object): - def __init__(self, code, filename, b64_content): if code not in ('JI', 'JD'): raise APIError('%s is not a valid code (JI or JD)' % code) @@ -51,17 +49,15 @@ class AttachedFile(object): class MDELBase(object): - def to_string(self): raw_string = etree.tostring(self.xml, encoding='utf-8') parsed_string = minidom.parseString(raw_string) return parsed_string.toprettyxml(indent='\t') - def save(self, subfolder, filename): + def save(self, directory, filename): """Save object as xml file """ - folder = os.path.join(get_resource_base_dir(), subfolder) - path = os.path.join(folder, filename) + path = os.path.join(directory, filename) default_storage.save(path, ContentFile(self.to_string())) @@ -247,23 +243,43 @@ class Data(MDELBase): return data + @classmethod + def path_to_xml(cls, path, value, parent): + '''Resolve an XML `path` starting from `parent` and the target node content to `value`.''' + + if isinstance(path, list) and len(path) > 1: + parent_child = parent.find(path[0]) + if parent_child is not None: + element = parent_child + path.pop(0) + else: + element = ElementFactory(path.pop(0)) + element.append(cls.path_to_xml(path, value, element), allow_new=False) + else: + element = ElementFactory(path[0], text=value) + return element + @property def xml(self): - elements = [] + '''Generate an XML document applying values from `self.data` to `self.mapping`''' + # extract text content from data for each path in mapping + elements = [] for key, path in self.mapping: if key in self.data: elements.append((path, self.data[key])) + # recursively create XML nodes using self.path_to_xml() root = ElementFactory(self.root_element, **self.root_attributes) - for path, value in elements: path_splitted = path.split('_') - root.append(json_to_xml(path_splitted, value, root), allow_new=False) + root.append(self.path_to_xml(path_splitted, value, root), allow_new=False) return root + class ILEData(Data): + '''Démarche inscription sur les listes électorales''' mapping = [ ('nom_famille', 'Inscription_Electeur_Noms_NomFamille'), @@ -359,6 +375,7 @@ class ILEData(Data): class AECData(Data): + '''Démarche acte d'état civil''' mapping = [ ('aec_type_raw', 'DemandeActe_TypeActe_Code'), @@ -444,20 +461,3 @@ class AECData(Data): self.root_attributes = {'canal_utilise': '0'} super(AECData, self).__init__(demand_id, data) - - -def json_to_xml(path, value, parent): - - if isinstance(path, list) and len(path) > 1: - parent_child = parent.find(path[0]) - if parent_child is not None: - element = parent_child - path.pop(0) - else: - element = ElementFactory(path.pop(0)) - - element.append(json_to_xml(path, value, element), allow_new=False) - else: - element = ElementFactory(path[0], text=value) - - return element diff --git a/passerelle/apps/mdel/models.py b/passerelle/apps/mdel/models.py index 7ef8a9db..069e0873 100644 --- a/passerelle/apps/mdel/models.py +++ b/passerelle/apps/mdel/models.py @@ -1,5 +1,5 @@ -# -*- coding: utf-8 -*- -# Passerelle - uniform access to data and services +# coding: utf-8 +# passerelle - uniform access to data and services # Copyright (C) 2016 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it @@ -15,11 +15,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from __future__ import unicode_literals + import os import json +import shutil +import tempfile -from django.db import models +from django.core.files.storage import default_storage +from django.db import models, transaction from django.utils.translation import ugettext_lazy as _ +from django.utils import six from passerelle.base.models import BaseResource from passerelle.utils.api import endpoint @@ -30,7 +36,7 @@ from . import mdel from .utils import zipdir, get_file_content_from_zip, parse_date -DEMAND_TYPES = ['ILE-LA', 'RCO-LA', 'AEC-LA'] +DEMAND_TYPES = ['ILE-LA', 'AEC-LA'] STATUS_MAPPING = { '100': 'closed', @@ -82,6 +88,34 @@ class MDEL(BaseResource): def get_verbose_name(cls): return cls._meta.verbose_name + @property + def base_dir(self): + path = os.path.join(default_storage.path('mdel'), self.slug) + if not os.path.exists(path): + os.makedirs(path) + return path + + @property + def tmp_dir(self): + path = os.path.join(self.base_dir, 'tmp') + if not os.path.exists(path): + os.makedirs(path) + return path + + @property + def inputs_dir(self): + path = os.path.join(self.base_dir, 'inputs') + if not os.path.exists(path): + os.makedirs(path) + return path + + @property + def outputs_dir(self): + path = os.path.join(self.base_dir, 'outputs') + if not os.path.exists(path): + os.makedirs(path) + return path + @endpoint(perm='can_access', methods=['post']) def create(self, request, *args, **kwargs): """Create a demand @@ -101,7 +135,7 @@ class MDEL(BaseResource): demand_type = formdata.pop('demand_type', '').upper() if demand_type not in DEMAND_TYPES: - raise APIError('demand_type must be : %r' % DEMAND_TYPES) + raise APIError('demand_type must be one of : %s' % ', '.join(DEMAND_TYPES)) if 'display_id' not in formdata: raise APIError('display_id is required') @@ -111,15 +145,26 @@ class MDEL(BaseResource): demand_id = formdata.pop('display_id') - demand, created = Demand.objects.get_or_create(num=demand_id, flow_type=demand_type, resource=self) - if not created: - demand.step += 1 - - demand.create_zip(formdata) - - demand.save() - - return {'data': {'demand_id': demand.demand_id}} + demand_dir = tempfile.mkdtemp(dir=self.tmp_dir) + try: + with transaction.atomic(): + # prevent collision between concurrent requests + demand, created = Demand.objects.select_for_update().get_or_create( + num=demand_id, flow_type=demand_type, resource=self) + if not created: + demand.step += 1 + + zip_path = demand.create_zip(demand_dir, formdata) + demand.save() + os.renames(zip_path, os.path.join(self.inputs_dir, demand.name + '.zip')) + finally: + shutil.rmtree(demand_dir) + + return { + 'data': { + 'demand_id': demand.demand_id + } + } @endpoint(perm='can_access') def status(self, request, *args, **kwargs): @@ -129,17 +174,20 @@ class MDEL(BaseResource): if not demand_id: raise APIError('demand_id is required') - demand = Demand.objects.get(demand_id=demand_id, resource=self) + with transaction.atomic(): + demand = self.demand_set.select_for_update().get(demand_id=demand_id) + status = demand.get_status() + demand.save() - status = demand.get_status() - - demand.save() return {'data': status} @endpoint(perm='can_access') def applicants(self, request, without=''): - return {'data': [item for item in APPLICANTS - if item.get('id') not in without.split(',')]} + return { + 'data': [ + item for item in APPLICANTS if item.get('id') not in without.split(',') + ] + } @endpoint(perm='can_access') def certificates(self, request): @@ -147,10 +195,14 @@ class MDEL(BaseResource): @endpoint(name='certificate-types', perm='can_access') def certificate_types(self, request, without=''): - return {'data': [item for item in CERTIFICATE_TYPES - if item.get('id') not in without.split(',')]} + return { + 'data': [ + item for item in CERTIFICATE_TYPES if item.get('id') not in without.split(',') + ] + } +@six.python_2_unicode_compatible class Demand(models.Model): created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) @@ -162,7 +214,7 @@ class Demand(models.Model): step = models.IntegerField(default=0) demand_id = models.CharField(max_length=128, null=True) - def __unicode__(self): + def __str__(self): return '%s - %s - %s' % (self.resource.slug, self.demand_id, self.status) class Meta: @@ -180,7 +232,125 @@ class Demand(models.Model): def filename(self): return '%s.zip' % self.name - def create_zip(self, formdata): + def create_zip_ile_la(self, formdata, attached_files, inputs_dir): + proofs = [('JI', 'justificatif_identite'), ('JD', 'justificatif_domicile')] + + for proof_code, proof_attribute in proofs: + + documents = [value for key, value in formdata.items() + if (key.startswith(proof_attribute) + and isinstance(value, dict) + and 'filename' in value + and 'content' in value)] + if not documents: + raise APIError('%s and all its attributes are required' % proof_attribute) + for document in documents: + filename, b64_content = document.get('filename'), document.get('content') + attached_files.append(mdel.AttachedFile(proof_code, filename, b64_content)) + + # process address additional information + adresse_complement = [] + + complement_keys = sorted([key for key in formdata if key.startswith('adresse_complement')]) + + for key in complement_keys: + adresse_complement.append(formdata[key]) + + if adresse_complement: + formdata['adresse_complement'] = ', '.join(adresse_complement) + + # get contact info + contacts = [('TEL', 'contact_telephone'), ('EMAIL', 'contact_email')] + + for contact_code, contact_uri in contacts: + uri = formdata.get(contact_uri) + if uri: + formdata['contact_uri'] = uri + formdata['contact_code'] = contact_code + + # mdel protocol only supports two values here (prem & cci) + # but we may want the form to have more specific values; + # we name them (prem|cci)_suffix and squeeze the suffix here. + if not formdata.get('anterieur_situation_raw', None): + raise APIError('anterieur_situation_raw is required') + + formdata['anterieur_situation_raw'] = formdata.get('anterieur_situation_raw').split('_')[0] + + doc = mdel.ILEData(self.demand_id, formdata) + doc.save(inputs_dir) + + for attached_file in attached_files: + attached_file.save(inputs_dir) + + def create_zip_aec_la(self, formdata, attached_files, inputs_dir): + # Set date format + if formdata.get('aec_type_raw') != 'NAISSANCE' and not formdata.get('date_acte'): + raise APIError(' is required') + if formdata.get('date_acte'): + formdata['date_acte'] = parse_date(formdata['date_acte']) + if formdata.get('titulaire_naiss_date'): + formdata['titulaire_naiss_date'] = parse_date(formdata['titulaire_naiss_date']) + if formdata.get('titulaire2_naiss_date'): + formdata['titulaire2_naiss_date'] = parse_date(formdata['titulaire2_naiss_date']) + + # Ensuring that all titles are uppercase + for key in formdata.keys(): + if 'civilite' in key: + formdata[key] = formdata[key].upper() + + # Merging street number and street name + demandeur_adresse_voie = formdata.get('demandeur_adresse_voie') + demandeur_adresse_num = formdata.get('demandeur_adresse_num') + if demandeur_adresse_voie and demandeur_adresse_num: + formdata['demandeur_adresse_voie'] = '%s %s' % (demandeur_adresse_num, demandeur_adresse_voie) + + # Set foreign address if country is not France + adresse_keys = ['etage', 'batiment', 'voie', 'code_postal', 'ville'] + adresse_keys = ['demandeur_adresse_%s' % key for key in adresse_keys] + demandeur_adresse_pays_raw = formdata.get('demandeur_adresse_pays_raw') + demandeur_adresse_etrangere = formdata.get('demandeur_adresse_etrangere') + demandeur_adresse_etrangere_pays_raw = formdata.get('demandeur_adresse_etrangere_pays_raw') + if (demandeur_adresse_etrangere_pays_raw + or (demandeur_adresse_pays_raw and demandeur_adresse_pays_raw != 'FRA')): + formdata.pop('demandeur_adresse_pays_raw', None) + if not demandeur_adresse_etrangere_pays_raw: + formdata['demandeur_adresse_etrangere_pays_raw'] = demandeur_adresse_pays_raw + if demandeur_adresse_etrangere: + # dismiss french address if the foreign one is filled + for key in adresse_keys: + if key in formdata: + del formdata[key] + else: + # build foreign address from french address fields + adresse_etrangere = [] + for key in adresse_keys: + value = formdata.pop(key, '') + if value: + if key != 'demandeur_adresse_ville': + adresse_etrangere.append(value) + else: + adresse_etrangere[-1] += ' %s' % value + formdata['demandeur_adresse_etrangere'] = ', '.join(adresse_etrangere) + + # Set aec_nature if aec_type_raw == DECES + if formdata.get('aec_type_raw') == 'DECES' and not formdata.get('aec_nature_raw'): + formdata['aec_nature'] = u'Copie integrale' + formdata['aec_nature_raw'] = 'COPIE-INTEGRALE' + + # Set motif_demand if none + if not formdata.get('motif_demande'): + formdata['motif_demande'] = 'Autre' + formdata['motif_demande_raw'] = 'Autre' + + # handling requester when 'Autre' + if formdata.get('qualite_demandeur_autre') and not formdata.get('qualite_demandeur_raw'): + formdata['qualite_demandeur'] = formdata['qualite_demandeur_autre'] + formdata['qualite_demandeur_raw'] = 'Autre' + + doc = mdel.AECData(self.demand_id, formdata) + doc.save(inputs_dir) + + def create_zip(self, inputs_dir, formdata): """ Creates demand as zip folder """ @@ -189,131 +359,14 @@ class Demand(models.Model): flow_type = self.flow_type demand_num = self.num - resource_base_dir = mdel.get_resource_base_dir() - - inputs_dir = os.path.join(resource_base_dir, self.resource.slug, - 'inputs', self.name) - attached_files = [] - if flow_type == 'ILE-LA': - - proofs = [('JI', 'justificatif_identite'), ('JD', 'justificatif_domicile')] - - for proof_code, proof_attribute in proofs: - - documents = [value for key, value in formdata.items() - if key.startswith(proof_attribute) and isinstance(value, dict) and 'filename' in value and 'content' in value] - if not documents: - raise APIError('%s and all its attributes are required' % proof_attribute) - for document in documents: - filename, b64_content = document.get('filename'), document.get('content') - attached_files.append(mdel.AttachedFile(proof_code, filename, b64_content)) - - # process address additional information - adresse_complement = [] - - complement_keys = sorted([key for key in formdata if key.startswith('adresse_complement')]) - - for key in complement_keys: - adresse_complement.append(formdata[key]) - - if adresse_complement: - formdata['adresse_complement'] = ', '.join(adresse_complement) - - # get contact info - contacts = [('TEL', 'contact_telephone'), ('EMAIL', 'contact_email')] - - for contact_code, contact_uri in contacts: - uri = formdata.get(contact_uri) - if uri: - formdata['contact_uri'] = uri - formdata['contact_code'] = contact_code - - # mdel protocol only supports two values here (prem & cci) - # but we may want the form to have more specific values; - # we name them (prem|cci)_suffix and squeeze the suffix here. - if not formdata.get('anterieur_situation_raw', None): - raise APIError('anterieur_situation_raw is required') - - formdata['anterieur_situation_raw'] = formdata.get('anterieur_situation_raw').split('_')[0] - - doc = mdel.ILEData(self.demand_id, formdata) - doc.save(inputs_dir) - - for attached_file in attached_files: - attached_file.save(inputs_dir) - - elif flow_type == 'RCO-LA': - raise APIError('RCO-LA processing not implemented') - - else: - # Set date format - if formdata.get('aec_type_raw') != 'NAISSANCE' and not formdata.get('date_acte'): - raise APIError(' is required') - if formdata.get('date_acte'): - formdata['date_acte'] = parse_date(formdata['date_acte']) - if formdata.get('titulaire_naiss_date'): - formdata['titulaire_naiss_date'] = parse_date(formdata['titulaire_naiss_date']) - if formdata.get('titulaire2_naiss_date'): - formdata['titulaire2_naiss_date'] = parse_date(formdata['titulaire2_naiss_date']) - - # Ensuring that all titles are uppercase - for key in formdata.keys(): - if 'civilite' in key: - formdata[key] = formdata[key].upper() - - # Merging street number and street name - demandeur_adresse_voie = formdata.get('demandeur_adresse_voie') - demandeur_adresse_num = formdata.get('demandeur_adresse_num') - if demandeur_adresse_voie and demandeur_adresse_num: - formdata['demandeur_adresse_voie'] = '%s %s' % (demandeur_adresse_num, demandeur_adresse_voie) - - # Set foreign address if country is not France - adresse_keys = ['etage', 'batiment', 'voie', 'code_postal', 'ville'] - adresse_keys = ['demandeur_adresse_%s' % key for key in adresse_keys] - demandeur_adresse_pays_raw = formdata.get('demandeur_adresse_pays_raw') - demandeur_adresse_etrangere = formdata.get('demandeur_adresse_etrangere') - demandeur_adresse_etrangere_pays_raw = formdata.get('demandeur_adresse_etrangere_pays_raw') - if (demandeur_adresse_etrangere_pays_raw - or (demandeur_adresse_pays_raw and demandeur_adresse_pays_raw != 'FRA')): - formdata.pop('demandeur_adresse_pays_raw', None) - if not demandeur_adresse_etrangere_pays_raw: - formdata['demandeur_adresse_etrangere_pays_raw'] = demandeur_adresse_pays_raw - if demandeur_adresse_etrangere: - # dismiss french address if the foreign one is filled - for key in adresse_keys: - if key in formdata: - del formdata[key] - else: - # build foreign address from french address fields - adresse_etrangere = [] - for key in adresse_keys: - value = formdata.pop(key, '') - if value: - if key != 'demandeur_adresse_ville': - adresse_etrangere.append(value) - else: - adresse_etrangere[-1] += ' %s' % value - formdata['demandeur_adresse_etrangere'] = ', '.join(adresse_etrangere) - - # Set aec_nature if aec_type_raw == DECES - if formdata.get('aec_type_raw') == 'DECES' and not formdata.get('aec_nature_raw'): - formdata['aec_nature'] = u'Copie integrale' - formdata['aec_nature_raw'] = 'COPIE-INTEGRALE' - - # Set motif_demand if none - if not formdata.get('motif_demande'): - formdata['motif_demande'] = 'Autre' - formdata['motif_demande_raw'] = 'Autre' - - # handling requester when 'Autre' - if formdata.get('qualite_demandeur_autre') and not formdata.get('qualite_demandeur_raw'): - formdata['qualite_demandeur'] = formdata['qualite_demandeur_autre'] - formdata['qualite_demandeur_raw'] = 'Autre' - - doc = mdel.AECData(self.demand_id, formdata) - doc.save(inputs_dir) + # create main document + try: + method = getattr(self, 'create_zip_%s' % flow_type.lower().replace('-', '_')) + except AttributeError: + raise APIError('flow type %s is unsupported' % flow_type) + method(formdata, attached_files, inputs_dir=inputs_dir) submission_date = formdata.get('receipt_time', None) @@ -339,15 +392,9 @@ class Demand(models.Model): namespace = {'ns2': 'http://finances.gouv.fr/dgme/pec/message/v1'} - resource_base_dir = mdel.get_resource_base_dir() - output_dir = os.path.join(resource_base_dir, self.resource.slug, 'outputs') - - if not os.path.exists(output_dir): - os.makedirs(output_dir) - # list all demand response zip files zip_files = {} - for zfile in os.listdir(output_dir): + for zfile in os.listdir(self.resource.outputs_dir): if zfile.lower().startswith(self.demand_id.lower()) and zfile.lower().endswith('.zip'): fname = os.path.splitext(zfile)[0] try: @@ -363,7 +410,7 @@ class Demand(models.Model): else: return result - path = os.path.join(output_dir, zip_file) + path = os.path.join(self.resource.outputs_dir, zip_file) content = get_file_content_from_zip(path, 'message.xml') element = mdel.etree.fromstring(content) diff --git a/passerelle/apps/mdel/utils.py b/passerelle/apps/mdel/utils.py index 4fc5589e..ad8b4ed9 100644 --- a/passerelle/apps/mdel/utils.py +++ b/passerelle/apps/mdel/utils.py @@ -1,4 +1,4 @@ -# Passerelle - uniform access to data and services +# passerelle - uniform access to data and services # Copyright (C) 2016 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it @@ -14,6 +14,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from __future__ import unicode_literals + import os import zipfile from xml.etree import ElementTree as etree @@ -22,18 +24,18 @@ from django.utils.dateparse import parse_date as django_parse_date from passerelle.utils.jsonresponse import APIError + def parse_date(date): try: parsed_date = django_parse_date(date) except ValueError as e: - raise APIError('Invalid date: %r (%r)' % ( date, e)) + raise APIError('invalid date: %s (%s)' % (date, e)) if not parsed_date: raise APIError('date %r not iso-formated' % date) return parsed_date.isoformat() class ElementFactory(etree.Element): - def __init__(self, *args, **kwargs): self.text = kwargs.pop('text', None) namespace = kwargs.pop('namespace', None) @@ -46,16 +48,13 @@ class ElementFactory(etree.Element): super(ElementFactory, self).__init__(*args, **kwargs) def append(self, element, allow_new=True): - if not allow_new: if isinstance(element.tag, etree.QName): found = self.find(element.tag.text) else: found = self.find(element.tag) - if found is not None: return self - super(ElementFactory, self).append(element) return self diff --git a/tests/test_mdel.py b/tests/test_mdel.py index c16b64a9..32c83a2a 100644 --- a/tests/test_mdel.py +++ b/tests/test_mdel.py @@ -17,19 +17,21 @@ from __future__ import unicode_literals -import shutil +import contextlib import os import json import base64 import copy - +import zipfile from xml.etree import ElementTree as etree + +from distutils.dir_util import copy_tree from lxml import etree as letree import pytest from passerelle.apps.mdel.models import MDEL, Demand -from passerelle.apps.mdel.mdel import Message, Description, AttachedFile, get_resource_base_dir +from passerelle.apps.mdel.mdel import Message, Description, AttachedFile from passerelle.apps.mdel.utils import parse_date import utils @@ -52,7 +54,7 @@ def generate_xsd_fixture(path): xsd = letree.parse(path).getroot() schema = letree.XMLSchema(xsd) parser = letree.XMLParser(schema=schema) - letree.parse(doc, parser=parser) # Will raise an exception if schema not respected + return letree.parse(doc, parser=parser).getroot() # Will raise an exception if schema not respected return validate_schema return fixture @@ -71,6 +73,21 @@ def mdel(db): return utils.setup_access_rights(MDEL.objects.create(slug='test')) +@contextlib.contextmanager +def get_zip(mdel, base): + with zipfile.ZipFile( + os.path.join( + mdel.inputs_dir, + base + '.zip')) as zip_fd: + yield zip_fd + + +@contextlib.contextmanager +def get_inputs(mdel, base, name): + with get_zip(mdel, base) as zip_fd: + yield zip_fd.open(name) + + def test_message(): ns = {'mdel': 'http://finances.gouv.fr/dgme/pec/message/v1'} xmlns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier' @@ -105,13 +122,6 @@ def test_description(): assert xml.find('Teledemarche/IdentifiantPlateforme').text == '1' -def test_invalid_demand_type(app, ile_payload): - ILE_PAYLOAD_INVALID = copy.deepcopy(ile_payload) - ILE_PAYLOAD_INVALID['extra']['demand_type'] = 'whatever' - resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID, status=200) - assert resp.json['err_desc'] == "demand_type must be : ['ILE-LA', 'RCO-LA', 'AEC-LA']" - - def test_invalid_demand_no_form_number(app, ile_payload): ILE_PAYLOAD_INVALID_NO = copy.deepcopy(ile_payload) ILE_PAYLOAD_INVALID_NO.pop('display_id') @@ -123,18 +133,16 @@ def test_unsupported_demand_type(app, ile_payload): RCO_PAYLOAD = copy.deepcopy(ile_payload) RCO_PAYLOAD['extra']['demand_type'] = 'rco-la' resp = app.post_json('/mdel/test/create', params=RCO_PAYLOAD, status=200) - assert resp.json['err_desc'] == "RCO-LA processing not implemented" + assert resp.json['err_desc'] == "demand_type must be one of : ILE-LA, AEC-LA" -def test_aec_naissance(app, aec_schema, aec_naissance_payload): +def test_aec_naissance(app, mdel, aec_schema, aec_naissance_payload): resp = app.post_json('/mdel/test/create', params=aec_naissance_payload, status=200) assert resp.json['data']['demand_id'] == '15-4-AEC-LA' - doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-AEC-LA--0', '15-4-AEC-LA-doc-.xml') - aec_schema(doc) - - root = etree.parse(doc).getroot() + with get_inputs(mdel, '15-4-AEC-LA--0', '15-4-AEC-LA-doc-.xml') as fd: + root = aec_schema(fd) assert root.tag == 'EnveloppeMetierType' @@ -178,15 +186,13 @@ def test_aec_naissance(app, aec_schema, aec_naissance_payload): assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz' -def test_aec_naissance_etranger(app, aec_schema, aec_naissance_etranger_payload): +def test_aec_naissance_etranger(app, mdel, aec_schema, aec_naissance_etranger_payload): resp = app.post_json('/mdel/test/create', params=aec_naissance_etranger_payload, status=200) assert resp.json['data']['demand_id'] == '1-4-AEC-LA' - doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-4-AEC-LA--0', '1-4-AEC-LA-doc-.xml') - aec_schema(doc) - - root = etree.parse(doc).getroot() + with get_inputs(mdel, '1-4-AEC-LA--0', '1-4-AEC-LA-doc-.xml') as fd: + root = aec_schema(fd) assert root.tag == 'EnveloppeMetierType' @@ -197,16 +203,13 @@ def test_aec_naissance_etranger(app, aec_schema, aec_naissance_etranger_payload) == '4 rue des coquelicots, 3800 Bern') -def test_aec_mariage(app, aec_schema, aec_mariage_payload): +def test_aec_mariage(app, mdel, aec_schema, aec_mariage_payload): resp = app.post_json('/mdel/test/create', params=aec_mariage_payload, status=200) assert resp.json['data']['demand_id'] == '16-1-AEC-LA' - doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '16-1-AEC-LA--0', '16-1-AEC-LA-doc-.xml') - - aec_schema(doc) - - root = etree.parse(doc).getroot() + with get_inputs(mdel, '16-1-AEC-LA--0', '16-1-AEC-LA-doc-.xml') as fd: + root = aec_schema(fd) assert root.tag == 'EnveloppeMetierType' @@ -255,15 +258,13 @@ def test_aec_mariage(app, aec_schema, aec_mariage_payload): assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Prenoms').text == 'Pascal' -def test_aec_deces(app, aec_schema, aec_deces_payload): +def test_aec_deces(app, mdel, aec_schema, aec_deces_payload): resp = app.post_json('/mdel/test/create', params=aec_deces_payload, status=200) assert resp.json['data']['demand_id'] == '17-1-AEC-LA' - doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '17-1-AEC-LA--0', '17-1-AEC-LA-doc-.xml') - aec_schema(doc) - - root = etree.parse(doc).getroot() + with get_inputs(mdel, '17-1-AEC-LA--0', '17-1-AEC-LA-doc-.xml') as fd: + root = aec_schema(fd) assert root.tag == 'EnveloppeMetierType' @@ -309,16 +310,12 @@ def test_aec_without_date_acte(app, aec_deces_payload): assert resp.json['err_desc'] == ' is required' -def test_ile(app, ile_schema, ile_payload): +def test_ile(app, mdel, ile_schema, ile_payload): resp = app.post_json('/mdel/test/create', params=ile_payload, status=200) assert resp.json['data']['demand_id'] == '1-14-ILE-LA' - base_doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-14-ILE-LA--0') - doc = os.path.join(base_doc, '1-14-ILE-LA-doc-.xml') - - ile_schema(doc) - - root = etree.parse(doc).getroot() + with get_inputs(mdel, '1-14-ILE-LA--0', '1-14-ILE-LA-doc-.xml') as fd: + root = ile_schema(fd) assert root.tag == 'AvisDInscription' @@ -346,8 +343,8 @@ def test_ile(app, ile_schema, ile_payload): assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/DivisionTerritoriale').text == 'Whatever' # checking that attached files are referenced in -ent-.xml file - desc = os.path.join(base_doc, '1-14-ILE-LA-ent-.xml') - root = etree.parse(desc).getroot() + with get_inputs(mdel, '1-14-ILE-LA--0', '1-14-ILE-LA-ent-.xml') as fd: + root = etree.parse(fd).getroot() ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'} assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns assert root.find('ns2:Teledemarche/ns2:NumeroTeledemarche', namespaces=ns).text == '1-14' @@ -377,8 +374,9 @@ def test_ile(app, ile_schema, ile_payload): '1-14-ILE-LA-ent-.xml', 'mdel_passeport_recto.pdf', 'mdel_passeport_verso.pdf', 'mdel_edf.pdf'] - for fname in os.listdir(base_doc): - assert fname in expected_files + with get_zip(mdel, '1-14-ILE-LA--0') as zip_fd: + for fname in zip_fd.namelist(): + assert fname in expected_files # Without anterieur_situation_raw payload = copy.deepcopy(ile_payload) @@ -402,9 +400,9 @@ def test_ile_invalid_value(app, ile_payload): def test_ile_get_status(app, mdel, ile_payload): - shutil.copytree( + copy_tree( os.path.join(data_dir, 'test', 'outputs'), - os.path.join(get_resource_base_dir(), 'test', 'outputs')) + mdel.outputs_dir) resp = app.post_json('/mdel/test/create', params=ile_payload, status=200) demand_id = resp.json['data']['demand_id'] assert demand_id == '1-14-ILE-LA' @@ -443,9 +441,9 @@ def test_get_status_unknown_demand(app): def test_get_status_no_response(app, mdel): - shutil.copytree( + copy_tree( os.path.join(data_dir, 'test', 'outputs'), - os.path.join(get_resource_base_dir(), 'test', 'outputs')) + mdel.outputs_dir) Demand.objects.create(resource=mdel, num='1-15', flow_type='ILE-LA', demand_id='1-15-ILE-LA') resp = app.get('/mdel/test/status', params={'demand_id': '1-15-ILE-LA'}, status=200) @@ -458,9 +456,9 @@ def test_get_status_no_response(app, mdel): def test_get_not_closed_status(app, mdel): - shutil.copytree( + copy_tree( os.path.join(data_dir, 'test', 'outputs'), - os.path.join(get_resource_base_dir(), 'test', 'outputs')) + mdel.outputs_dir) Demand.objects.create(resource=mdel, num='15-9', flow_type='AEC-LA', demand_id='15-9-AEC-LA') resp = app.get('/mdel/test/status', params={'demand_id': '15-9-AEC-LA'}, status=200) -- 2.23.0