Projet

Général

Profil

0002-mdel-improve-code-style-36471.patch

Benjamin Dauvergne, 27 septembre 2019 16:32

Télécharger (34,9 ko)

Voir les différences:

Subject: [PATCH 2/3] mdel: improve code style (#36471)

 passerelle/apps/mdel/mdel.py   |  62 +++---
 passerelle/apps/mdel/models.py | 357 +++++++++++++++++++--------------
 passerelle/apps/mdel/utils.py  |  11 +-
 tests/test_mdel.py             |  98 +++++----
 4 files changed, 286 insertions(+), 242 deletions(-)
passerelle/apps/mdel/mdel.py
1
# Passerelle - uniform access to data and services
2
# Copyright (C) 2016  Entr'ouvert
1
# coding: utf-8
2
# passerelle - uniform access to data and services-
3
# Copyright (C) 2019  Entr'ouvert
3 4
#
4 5
# This program is free software: you can redistribute it and/or modify it
5 6
# under the terms of the GNU Affero General Public License as published
......
14 15
# You should have received a copy of the GNU Affero General Public License
15 16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 17

  
18
from __future__ import unicode_literals
19

  
17 20
import os
18 21
import base64
19 22
import datetime
......
30 33
from passerelle.utils.jsonresponse import APIError
31 34

  
32 35

  
33
def get_resource_base_dir():
34
    return default_storage.path('mdel')
35

  
36

  
37 36
class AttachedFile(object):
38

  
39 37
    def __init__(self, code, filename, b64_content):
40 38
        if code not in ('JI', 'JD'):
41 39
            raise APIError('%s is not a valid code (JI or JD)' % code)
......
51 49

  
52 50

  
53 51
class MDELBase(object):
54

  
55 52
    def to_string(self):
56 53
        raw_string = etree.tostring(self.xml, encoding='utf-8')
57 54
        parsed_string = minidom.parseString(raw_string)
58 55
        return parsed_string.toprettyxml(indent='\t')
59 56

  
60
    def save(self, subfolder, filename):
57
    def save(self, directory, filename):
61 58
        """Save object as xml file
62 59
        """
63
        folder = os.path.join(get_resource_base_dir(), subfolder)
64
        path = os.path.join(folder, filename)
60
        path = os.path.join(directory, filename)
65 61
        default_storage.save(path, ContentFile(self.to_string()))
66 62

  
67 63

  
......
247 243

  
248 244
        return data
249 245

  
246
    @classmethod
247
    def path_to_xml(cls, path, value, parent):
248
        '''Resolve an XML `path` starting from `parent` and the target node content to `value`.'''
249

  
250
        if isinstance(path, list) and len(path) > 1:
251
            parent_child = parent.find(path[0])
252
            if parent_child is not None:
253
                element = parent_child
254
                path.pop(0)
255
            else:
256
                element = ElementFactory(path.pop(0))
257
            element.append(cls.path_to_xml(path, value, element), allow_new=False)
258
        else:
259
            element = ElementFactory(path[0], text=value)
260
        return element
261

  
250 262
    @property
251 263
    def xml(self):
252
        elements = []
264
        '''Generate an XML document applying values from `self.data` to `self.mapping`'''
253 265

  
266
        # extract text content from data for each path in mapping
267
        elements = []
254 268
        for key, path in self.mapping:
255 269
            if key in self.data:
256 270
                elements.append((path, self.data[key]))
257 271

  
272
        # recursively create XML nodes using self.path_to_xml()
258 273
        root = ElementFactory(self.root_element, **self.root_attributes)
259

  
260 274
        for path, value in elements:
261 275
            path_splitted = path.split('_')
262
            root.append(json_to_xml(path_splitted, value, root), allow_new=False)
276
            root.append(self.path_to_xml(path_splitted, value, root), allow_new=False)
263 277

  
264 278
        return root
265 279

  
280

  
266 281
class ILEData(Data):
282
    '''Démarche inscription sur les listes électorales'''
267 283

  
268 284
    mapping = [
269 285
        ('nom_famille', 'Inscription_Electeur_Noms_NomFamille'),
......
359 375

  
360 376

  
361 377
class AECData(Data):
378
    '''Démarche acte d'état civil'''
362 379

  
363 380
    mapping = [
364 381
        ('aec_type_raw', 'DemandeActe_TypeActe_Code'),
......
444 461
        self.root_attributes = {'canal_utilise': '0'}
445 462

  
446 463
        super(AECData, self).__init__(demand_id, data)
447

  
448

  
449
def json_to_xml(path, value, parent):
450

  
451
    if isinstance(path, list) and len(path) > 1:
452
        parent_child = parent.find(path[0])
453
        if parent_child is not None:
454
            element = parent_child
455
            path.pop(0)
456
        else:
457
            element = ElementFactory(path.pop(0))
458

  
459
        element.append(json_to_xml(path, value, element), allow_new=False)
460
    else:
461
        element = ElementFactory(path[0], text=value)
462

  
463
    return element
passerelle/apps/mdel/models.py
1
# -*- coding: utf-8 -*-
2
# Passerelle - uniform access to data and services
1
# coding: utf-8
2
# passerelle - uniform access to data and services
3 3
# Copyright (C) 2016  Entr'ouvert
4 4
#
5 5
# This program is free software: you can redistribute it and/or modify it
......
15 15
# You should have received a copy of the GNU Affero General Public License
16 16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 17

  
18
from __future__ import unicode_literals
19

  
18 20
import os
19 21
import json
22
import shutil
23
import tempfile
20 24

  
21
from django.db import models
25
from django.core.files.storage import default_storage
26
from django.db import models, transaction
22 27
from django.utils.translation import ugettext_lazy as _
28
from django.utils import six
23 29

  
24 30
from passerelle.base.models import BaseResource
25 31
from passerelle.utils.api import endpoint
......
30 36
from .utils import zipdir, get_file_content_from_zip, parse_date
31 37

  
32 38

  
33
DEMAND_TYPES = ['ILE-LA', 'RCO-LA', 'AEC-LA']
39
DEMAND_TYPES = ['ILE-LA', 'AEC-LA']
34 40

  
35 41
STATUS_MAPPING = {
36 42
    '100': 'closed',
......
82 88
    def get_verbose_name(cls):
83 89
        return cls._meta.verbose_name
84 90

  
91
    @property
92
    def base_dir(self):
93
        path = os.path.join(default_storage.path('mdel'), self.slug)
94
        if not os.path.exists(path):
95
            os.makedirs(path)
96
        return path
97

  
98
    @property
99
    def tmp_dir(self):
100
        path = os.path.join(self.base_dir, 'tmp')
101
        if not os.path.exists(path):
102
            os.makedirs(path)
103
        return path
104

  
105
    @property
106
    def inputs_dir(self):
107
        path = os.path.join(self.base_dir, 'inputs')
108
        if not os.path.exists(path):
109
            os.makedirs(path)
110
        return path
111

  
112
    @property
113
    def outputs_dir(self):
114
        path = os.path.join(self.base_dir, 'outputs')
115
        if not os.path.exists(path):
116
            os.makedirs(path)
117
        return path
118

  
85 119
    @endpoint(perm='can_access', methods=['post'])
86 120
    def create(self, request, *args, **kwargs):
87 121
        """Create a demand
......
101 135

  
102 136
        demand_type = formdata.pop('demand_type', '').upper()
103 137
        if demand_type not in DEMAND_TYPES:
104
            raise APIError('demand_type must be : %r' % DEMAND_TYPES)
138
            raise APIError('demand_type must be one of : %s' % ', '.join(DEMAND_TYPES))
105 139

  
106 140
        if 'display_id' not in formdata:
107 141
            raise APIError('display_id is required')
......
111 145

  
112 146
        demand_id = formdata.pop('display_id')
113 147

  
114
        demand, created = Demand.objects.get_or_create(num=demand_id, flow_type=demand_type, resource=self)
115
        if not created:
116
            demand.step += 1
117

  
118
        demand.create_zip(formdata)
119

  
120
        demand.save()
121

  
122
        return {'data': {'demand_id': demand.demand_id}}
148
        demand_dir = tempfile.mkdtemp(dir=self.tmp_dir)
149
        try:
150
            with transaction.atomic():
151
                # prevent collision between concurrent requests
152
                demand, created = Demand.objects.select_for_update().get_or_create(
153
                    num=demand_id, flow_type=demand_type, resource=self)
154
                if not created:
155
                    demand.step += 1
156

  
157
                zip_path = demand.create_zip(demand_dir, formdata)
158
                demand.save()
159
                os.renames(zip_path, os.path.join(self.inputs_dir, demand.name + '.zip'))
160
        finally:
161
            shutil.rmtree(demand_dir)
162

  
163
        return {
164
            'data': {
165
                'demand_id': demand.demand_id
166
            }
167
        }
123 168

  
124 169
    @endpoint(perm='can_access')
125 170
    def status(self, request, *args, **kwargs):
......
129 174
        if not demand_id:
130 175
            raise APIError('demand_id is required')
131 176

  
132
        demand = Demand.objects.get(demand_id=demand_id, resource=self)
177
        with transaction.atomic():
178
            demand = self.demand_set.select_for_update().get(demand_id=demand_id)
179
            status = demand.get_status()
180
            demand.save()
133 181

  
134
        status = demand.get_status()
135

  
136
        demand.save()
137 182
        return {'data': status}
138 183

  
139 184
    @endpoint(perm='can_access')
140 185
    def applicants(self, request, without=''):
141
        return {'data': [item for item in APPLICANTS
142
                if item.get('id') not in without.split(',')]}
186
        return {
187
            'data': [
188
                item for item in APPLICANTS if item.get('id') not in without.split(',')
189
            ]
190
        }
143 191

  
144 192
    @endpoint(perm='can_access')
145 193
    def certificates(self, request):
......
147 195

  
148 196
    @endpoint(name='certificate-types', perm='can_access')
149 197
    def certificate_types(self, request, without=''):
150
        return {'data': [item for item in CERTIFICATE_TYPES
151
                if item.get('id') not in without.split(',')]}
198
        return {
199
            'data': [
200
                item for item in CERTIFICATE_TYPES if item.get('id') not in without.split(',')
201
            ]
202
        }
152 203

  
153 204

  
205
@six.python_2_unicode_compatible
154 206
class Demand(models.Model):
155 207
    created_at = models.DateTimeField(auto_now_add=True)
156 208
    updated_at = models.DateTimeField(auto_now=True)
......
162 214
    step = models.IntegerField(default=0)
163 215
    demand_id = models.CharField(max_length=128, null=True)
164 216

  
165
    def __unicode__(self):
217
    def __str__(self):
166 218
        return '%s - %s - %s' % (self.resource.slug, self.demand_id, self.status)
167 219

  
168 220
    class Meta:
......
180 232
    def filename(self):
181 233
        return '%s.zip' % self.name
182 234

  
183
    def create_zip(self, formdata):
235
    def create_zip_ile_la(self, formdata, attached_files, inputs_dir):
236
        proofs = [('JI', 'justificatif_identite'), ('JD', 'justificatif_domicile')]
237

  
238
        for proof_code, proof_attribute in proofs:
239

  
240
            documents = [value for key, value in formdata.items()
241
                         if (key.startswith(proof_attribute)
242
                             and isinstance(value, dict)
243
                             and 'filename' in value
244
                             and 'content' in value)]
245
            if not documents:
246
                raise APIError('%s and all its attributes are required' % proof_attribute)
247
            for document in documents:
248
                filename, b64_content = document.get('filename'), document.get('content')
249
                attached_files.append(mdel.AttachedFile(proof_code, filename, b64_content))
250

  
251
        # process address additional information
252
        adresse_complement = []
253

  
254
        complement_keys = sorted([key for key in formdata if key.startswith('adresse_complement')])
255

  
256
        for key in complement_keys:
257
            adresse_complement.append(formdata[key])
258

  
259
        if adresse_complement:
260
            formdata['adresse_complement'] = ', '.join(adresse_complement)
261

  
262
        # get contact info
263
        contacts = [('TEL', 'contact_telephone'), ('EMAIL', 'contact_email')]
264

  
265
        for contact_code, contact_uri in contacts:
266
            uri = formdata.get(contact_uri)
267
            if uri:
268
                formdata['contact_uri'] = uri
269
                formdata['contact_code'] = contact_code
270

  
271
        # mdel protocol only supports two values here (prem & cci)
272
        # but we may want the form to have more specific values;
273
        # we name them (prem|cci)_suffix and squeeze the suffix here.
274
        if not formdata.get('anterieur_situation_raw', None):
275
            raise APIError('anterieur_situation_raw is required')
276

  
277
        formdata['anterieur_situation_raw'] = formdata.get('anterieur_situation_raw').split('_')[0]
278

  
279
        doc = mdel.ILEData(self.demand_id, formdata)
280
        doc.save(inputs_dir)
281

  
282
        for attached_file in attached_files:
283
            attached_file.save(inputs_dir)
284

  
285
    def create_zip_aec_la(self, formdata, attached_files, inputs_dir):
286
        # Set date format
287
        if formdata.get('aec_type_raw') != 'NAISSANCE' and not formdata.get('date_acte'):
288
            raise APIError('<date_acte> is required')
289
        if formdata.get('date_acte'):
290
            formdata['date_acte'] = parse_date(formdata['date_acte'])
291
        if formdata.get('titulaire_naiss_date'):
292
            formdata['titulaire_naiss_date'] = parse_date(formdata['titulaire_naiss_date'])
293
        if formdata.get('titulaire2_naiss_date'):
294
            formdata['titulaire2_naiss_date'] = parse_date(formdata['titulaire2_naiss_date'])
295

  
296
        # Ensuring that all titles are uppercase
297
        for key in formdata.keys():
298
            if 'civilite' in key:
299
                formdata[key] = formdata[key].upper()
300

  
301
        # Merging street number and street name
302
        demandeur_adresse_voie = formdata.get('demandeur_adresse_voie')
303
        demandeur_adresse_num = formdata.get('demandeur_adresse_num')
304
        if demandeur_adresse_voie and demandeur_adresse_num:
305
            formdata['demandeur_adresse_voie'] = '%s %s' % (demandeur_adresse_num, demandeur_adresse_voie)
306

  
307
        # Set foreign address if country is not France
308
        adresse_keys = ['etage', 'batiment', 'voie', 'code_postal', 'ville']
309
        adresse_keys = ['demandeur_adresse_%s' % key for key in adresse_keys]
310
        demandeur_adresse_pays_raw = formdata.get('demandeur_adresse_pays_raw')
311
        demandeur_adresse_etrangere = formdata.get('demandeur_adresse_etrangere')
312
        demandeur_adresse_etrangere_pays_raw = formdata.get('demandeur_adresse_etrangere_pays_raw')
313
        if (demandeur_adresse_etrangere_pays_raw
314
                or (demandeur_adresse_pays_raw and demandeur_adresse_pays_raw != 'FRA')):
315
            formdata.pop('demandeur_adresse_pays_raw', None)
316
            if not demandeur_adresse_etrangere_pays_raw:
317
                formdata['demandeur_adresse_etrangere_pays_raw'] = demandeur_adresse_pays_raw
318
            if demandeur_adresse_etrangere:
319
                # dismiss french address if the foreign one is filled
320
                for key in adresse_keys:
321
                    if key in formdata:
322
                        del formdata[key]
323
            else:
324
                # build foreign address from french address fields
325
                adresse_etrangere = []
326
                for key in adresse_keys:
327
                    value = formdata.pop(key, '')
328
                    if value:
329
                        if key != 'demandeur_adresse_ville':
330
                            adresse_etrangere.append(value)
331
                        else:
332
                            adresse_etrangere[-1] += ' %s' % value
333
                formdata['demandeur_adresse_etrangere'] = ', '.join(adresse_etrangere)
334

  
335
        # Set aec_nature if aec_type_raw == DECES
336
        if formdata.get('aec_type_raw') == 'DECES' and not formdata.get('aec_nature_raw'):
337
            formdata['aec_nature'] = u'Copie integrale'
338
            formdata['aec_nature_raw'] = 'COPIE-INTEGRALE'
339

  
340
        # Set motif_demand if none
341
        if not formdata.get('motif_demande'):
342
            formdata['motif_demande'] = 'Autre'
343
            formdata['motif_demande_raw'] = 'Autre'
344

  
345
        # handling requester when 'Autre'
346
        if formdata.get('qualite_demandeur_autre') and not formdata.get('qualite_demandeur_raw'):
347
            formdata['qualite_demandeur'] = formdata['qualite_demandeur_autre']
348
            formdata['qualite_demandeur_raw'] = 'Autre'
349

  
350
        doc = mdel.AECData(self.demand_id, formdata)
351
        doc.save(inputs_dir)
352

  
353
    def create_zip(self, inputs_dir, formdata):
184 354
        """
185 355
        Creates demand as zip folder
186 356
        """
......
189 359
        flow_type = self.flow_type
190 360
        demand_num = self.num
191 361

  
192
        resource_base_dir = mdel.get_resource_base_dir()
193

  
194
        inputs_dir = os.path.join(resource_base_dir, self.resource.slug,
195
                                  'inputs', self.name)
196

  
197 362
        attached_files = []
198 363

  
199
        if flow_type == 'ILE-LA':
200

  
201
            proofs = [('JI', 'justificatif_identite'), ('JD', 'justificatif_domicile')]
202

  
203
            for proof_code, proof_attribute in proofs:
204

  
205
                documents = [value for key, value in formdata.items()
206
                             if key.startswith(proof_attribute) and isinstance(value, dict) and 'filename' in value and 'content' in value]
207
                if not documents:
208
                    raise APIError('%s and all its attributes are required' % proof_attribute)
209
                for document in documents:
210
                    filename, b64_content = document.get('filename'), document.get('content')
211
                    attached_files.append(mdel.AttachedFile(proof_code, filename, b64_content))
212

  
213
            # process address additional information
214
            adresse_complement = []
215

  
216
            complement_keys = sorted([key for key in formdata if key.startswith('adresse_complement')])
217

  
218
            for key in complement_keys:
219
                adresse_complement.append(formdata[key])
220

  
221
            if adresse_complement:
222
                formdata['adresse_complement'] = ', '.join(adresse_complement)
223

  
224
            # get contact info
225
            contacts = [('TEL', 'contact_telephone'), ('EMAIL', 'contact_email')]
226

  
227
            for contact_code, contact_uri in contacts:
228
                uri = formdata.get(contact_uri)
229
                if uri:
230
                    formdata['contact_uri'] = uri
231
                    formdata['contact_code'] = contact_code
232

  
233
            # mdel protocol only supports two values here (prem & cci)
234
            # but we may want the form to have more specific values;
235
            # we name them (prem|cci)_suffix and squeeze the suffix here.
236
            if not formdata.get('anterieur_situation_raw', None):
237
                raise APIError('anterieur_situation_raw is required')
238

  
239
            formdata['anterieur_situation_raw'] = formdata.get('anterieur_situation_raw').split('_')[0]
240

  
241
            doc = mdel.ILEData(self.demand_id, formdata)
242
            doc.save(inputs_dir)
243

  
244
            for attached_file in attached_files:
245
                attached_file.save(inputs_dir)
246

  
247
        elif flow_type == 'RCO-LA':
248
            raise APIError('RCO-LA processing not implemented')
249

  
250
        else:
251
            # Set date format
252
            if formdata.get('aec_type_raw') != 'NAISSANCE' and not formdata.get('date_acte'):
253
                raise APIError('<date_acte> is required')
254
            if formdata.get('date_acte'):
255
                formdata['date_acte'] = parse_date(formdata['date_acte'])
256
            if formdata.get('titulaire_naiss_date'):
257
                formdata['titulaire_naiss_date'] = parse_date(formdata['titulaire_naiss_date'])
258
            if formdata.get('titulaire2_naiss_date'):
259
                formdata['titulaire2_naiss_date'] = parse_date(formdata['titulaire2_naiss_date'])
260

  
261
            # Ensuring that all titles are uppercase
262
            for key in formdata.keys():
263
                if 'civilite' in key:
264
                    formdata[key] = formdata[key].upper()
265

  
266
            # Merging street number and street name
267
            demandeur_adresse_voie = formdata.get('demandeur_adresse_voie')
268
            demandeur_adresse_num = formdata.get('demandeur_adresse_num')
269
            if demandeur_adresse_voie and demandeur_adresse_num:
270
                formdata['demandeur_adresse_voie'] = '%s %s' % (demandeur_adresse_num, demandeur_adresse_voie)
271

  
272
            # Set foreign address if country is not France
273
            adresse_keys = ['etage', 'batiment', 'voie', 'code_postal', 'ville']
274
            adresse_keys = ['demandeur_adresse_%s' % key for key in adresse_keys]
275
            demandeur_adresse_pays_raw = formdata.get('demandeur_adresse_pays_raw')
276
            demandeur_adresse_etrangere = formdata.get('demandeur_adresse_etrangere')
277
            demandeur_adresse_etrangere_pays_raw = formdata.get('demandeur_adresse_etrangere_pays_raw')
278
            if (demandeur_adresse_etrangere_pays_raw
279
                    or (demandeur_adresse_pays_raw and demandeur_adresse_pays_raw != 'FRA')):
280
                formdata.pop('demandeur_adresse_pays_raw', None)
281
                if not demandeur_adresse_etrangere_pays_raw:
282
                    formdata['demandeur_adresse_etrangere_pays_raw'] = demandeur_adresse_pays_raw
283
                if demandeur_adresse_etrangere:
284
                    # dismiss french address if the foreign one is filled
285
                    for key in adresse_keys:
286
                        if key in formdata:
287
                            del formdata[key]
288
                else:
289
                    # build foreign address from french address fields
290
                    adresse_etrangere = []
291
                    for key in adresse_keys:
292
                        value = formdata.pop(key, '')
293
                        if value:
294
                            if key != 'demandeur_adresse_ville':
295
                                adresse_etrangere.append(value)
296
                            else:
297
                                adresse_etrangere[-1] += ' %s' % value
298
                    formdata['demandeur_adresse_etrangere'] = ', '.join(adresse_etrangere)
299

  
300
            # Set aec_nature if aec_type_raw == DECES
301
            if formdata.get('aec_type_raw') == 'DECES' and not formdata.get('aec_nature_raw'):
302
                formdata['aec_nature'] = u'Copie integrale'
303
                formdata['aec_nature_raw'] = 'COPIE-INTEGRALE'
304

  
305
            # Set motif_demand if none
306
            if not formdata.get('motif_demande'):
307
                formdata['motif_demande'] = 'Autre'
308
                formdata['motif_demande_raw'] = 'Autre'
309

  
310
            # handling requester when 'Autre'
311
            if formdata.get('qualite_demandeur_autre') and not formdata.get('qualite_demandeur_raw'):
312
                formdata['qualite_demandeur'] = formdata['qualite_demandeur_autre']
313
                formdata['qualite_demandeur_raw'] = 'Autre'
314

  
315
            doc = mdel.AECData(self.demand_id, formdata)
316
            doc.save(inputs_dir)
364
        # create main document
365
        try:
366
            method = getattr(self, 'create_zip_%s' % flow_type.lower().replace('-', '_'))
367
        except AttributeError:
368
            raise APIError('flow type %s is unsupported' % flow_type)
369
        method(formdata, attached_files, inputs_dir=inputs_dir)
317 370

  
318 371
        submission_date = formdata.get('receipt_time', None)
319 372

  
......
339 392

  
340 393
        namespace = {'ns2': 'http://finances.gouv.fr/dgme/pec/message/v1'}
341 394

  
342
        resource_base_dir = mdel.get_resource_base_dir()
343
        output_dir = os.path.join(resource_base_dir, self.resource.slug, 'outputs')
344

  
345
        if not os.path.exists(output_dir):
346
            os.makedirs(output_dir)
347

  
348 395
        # list all demand response zip files
349 396
        zip_files = {}
350
        for zfile in os.listdir(output_dir):
397
        for zfile in os.listdir(self.resource.outputs_dir):
351 398
            if zfile.lower().startswith(self.demand_id.lower()) and zfile.lower().endswith('.zip'):
352 399
                fname = os.path.splitext(zfile)[0]
353 400
                try:
......
363 410
        else:
364 411
            return result
365 412

  
366
        path = os.path.join(output_dir, zip_file)
413
        path = os.path.join(self.resource.outputs_dir, zip_file)
367 414

  
368 415
        content = get_file_content_from_zip(path, 'message.xml')
369 416
        element = mdel.etree.fromstring(content)
passerelle/apps/mdel/utils.py
1
# Passerelle - uniform access to data and services
1
# passerelle - uniform access to data and services
2 2
# Copyright (C) 2016  Entr'ouvert
3 3
#
4 4
# This program is free software: you can redistribute it and/or modify it
......
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
from __future__ import unicode_literals
18

  
17 19
import os
18 20
import zipfile
19 21
from xml.etree import ElementTree as etree
......
22 24

  
23 25
from passerelle.utils.jsonresponse import APIError
24 26

  
27

  
25 28
def parse_date(date):
26 29
    try:
27 30
        parsed_date = django_parse_date(date)
28 31
    except ValueError as e:
29
        raise APIError('Invalid date: %r (%r)' % ( date, e))
32
        raise APIError('invalid date: %s (%s)' % (date, e))
30 33
    if not parsed_date:
31 34
        raise APIError('date %r not iso-formated' % date)
32 35
    return parsed_date.isoformat()
33 36

  
34 37

  
35 38
class ElementFactory(etree.Element):
36

  
37 39
    def __init__(self, *args, **kwargs):
38 40
        self.text = kwargs.pop('text', None)
39 41
        namespace = kwargs.pop('namespace', None)
......
46 48
            super(ElementFactory, self).__init__(*args, **kwargs)
47 49

  
48 50
    def append(self, element, allow_new=True):
49

  
50 51
        if not allow_new:
51 52
            if isinstance(element.tag, etree.QName):
52 53
                found = self.find(element.tag.text)
53 54
            else:
54 55
                found = self.find(element.tag)
55

  
56 56
            if found is not None:
57 57
                return self
58

  
59 58
        super(ElementFactory, self).append(element)
60 59
        return self
61 60

  
tests/test_mdel.py
17 17

  
18 18
from __future__ import unicode_literals
19 19

  
20
import shutil
20
import contextlib
21 21
import os
22 22
import json
23 23
import base64
24 24
import copy
25

  
25
import zipfile
26 26
from xml.etree import ElementTree as etree
27

  
28
from distutils.dir_util import copy_tree
27 29
from lxml import etree as letree
28 30

  
29 31
import pytest
30 32

  
31 33
from passerelle.apps.mdel.models import MDEL, Demand
32
from passerelle.apps.mdel.mdel import Message, Description, AttachedFile, get_resource_base_dir
34
from passerelle.apps.mdel.mdel import Message, Description, AttachedFile
33 35
from passerelle.apps.mdel.utils import parse_date
34 36

  
35 37
import utils
......
52 54
            xsd = letree.parse(path).getroot()
53 55
            schema = letree.XMLSchema(xsd)
54 56
            parser = letree.XMLParser(schema=schema)
55
            letree.parse(doc, parser=parser)  # Will raise an exception if schema not respected
57
            return letree.parse(doc, parser=parser).getroot()  # Will raise an exception if schema not respected
56 58
        return validate_schema
57 59
    return fixture
58 60

  
......
71 73
    return utils.setup_access_rights(MDEL.objects.create(slug='test'))
72 74

  
73 75

  
76
@contextlib.contextmanager
77
def get_zip(mdel, base):
78
    with zipfile.ZipFile(
79
            os.path.join(
80
                mdel.inputs_dir,
81
                base + '.zip')) as zip_fd:
82
        yield zip_fd
83

  
84

  
85
@contextlib.contextmanager
86
def get_inputs(mdel, base, name):
87
    with get_zip(mdel, base) as zip_fd:
88
        yield zip_fd.open(name)
89

  
90

  
74 91
def test_message():
75 92
    ns = {'mdel': 'http://finances.gouv.fr/dgme/pec/message/v1'}
76 93
    xmlns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'
......
105 122
    assert xml.find('Teledemarche/IdentifiantPlateforme').text == '1'
106 123

  
107 124

  
108
def test_invalid_demand_type(app, ile_payload):
109
    ILE_PAYLOAD_INVALID = copy.deepcopy(ile_payload)
110
    ILE_PAYLOAD_INVALID['extra']['demand_type'] = 'whatever'
111
    resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID, status=200)
112
    assert resp.json['err_desc'] == "demand_type must be : ['ILE-LA', 'RCO-LA', 'AEC-LA']"
113

  
114

  
115 125
def test_invalid_demand_no_form_number(app, ile_payload):
116 126
    ILE_PAYLOAD_INVALID_NO = copy.deepcopy(ile_payload)
117 127
    ILE_PAYLOAD_INVALID_NO.pop('display_id')
......
123 133
    RCO_PAYLOAD = copy.deepcopy(ile_payload)
124 134
    RCO_PAYLOAD['extra']['demand_type'] = 'rco-la'
125 135
    resp = app.post_json('/mdel/test/create', params=RCO_PAYLOAD, status=200)
126
    assert resp.json['err_desc'] == "RCO-LA processing not implemented"
136
    assert resp.json['err_desc'] == "demand_type must be one of : ILE-LA, AEC-LA"
127 137

  
128 138

  
129
def test_aec_naissance(app, aec_schema, aec_naissance_payload):
139
def test_aec_naissance(app, mdel, aec_schema, aec_naissance_payload):
130 140
    resp = app.post_json('/mdel/test/create', params=aec_naissance_payload, status=200)
131 141

  
132 142
    assert resp.json['data']['demand_id'] == '15-4-AEC-LA'
133 143

  
134
    doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-AEC-LA--0', '15-4-AEC-LA-doc-.xml')
135
    aec_schema(doc)
136

  
137
    root = etree.parse(doc).getroot()
144
    with get_inputs(mdel, '15-4-AEC-LA--0', '15-4-AEC-LA-doc-.xml') as fd:
145
        root = aec_schema(fd)
138 146

  
139 147
    assert root.tag == 'EnveloppeMetierType'
140 148

  
......
178 186
    assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz'
179 187

  
180 188

  
181
def test_aec_naissance_etranger(app, aec_schema, aec_naissance_etranger_payload):
189
def test_aec_naissance_etranger(app, mdel, aec_schema, aec_naissance_etranger_payload):
182 190
    resp = app.post_json('/mdel/test/create', params=aec_naissance_etranger_payload, status=200)
183 191

  
184 192
    assert resp.json['data']['demand_id'] == '1-4-AEC-LA'
185 193

  
186
    doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-4-AEC-LA--0', '1-4-AEC-LA-doc-.xml')
187
    aec_schema(doc)
188

  
189
    root = etree.parse(doc).getroot()
194
    with get_inputs(mdel, '1-4-AEC-LA--0', '1-4-AEC-LA-doc-.xml') as fd:
195
        root = aec_schema(fd)
190 196

  
191 197
    assert root.tag == 'EnveloppeMetierType'
192 198

  
......
197 203
            == '4 rue des coquelicots, 3800 Bern')
198 204

  
199 205

  
200
def test_aec_mariage(app, aec_schema, aec_mariage_payload):
206
def test_aec_mariage(app, mdel, aec_schema, aec_mariage_payload):
201 207
    resp = app.post_json('/mdel/test/create', params=aec_mariage_payload, status=200)
202 208

  
203 209
    assert resp.json['data']['demand_id'] == '16-1-AEC-LA'
204 210

  
205
    doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '16-1-AEC-LA--0', '16-1-AEC-LA-doc-.xml')
206

  
207
    aec_schema(doc)
208

  
209
    root = etree.parse(doc).getroot()
211
    with get_inputs(mdel, '16-1-AEC-LA--0', '16-1-AEC-LA-doc-.xml') as fd:
212
        root = aec_schema(fd)
210 213

  
211 214
    assert root.tag == 'EnveloppeMetierType'
212 215

  
......
255 258
    assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Prenoms').text == 'Pascal'
256 259

  
257 260

  
258
def test_aec_deces(app, aec_schema, aec_deces_payload):
261
def test_aec_deces(app, mdel, aec_schema, aec_deces_payload):
259 262
    resp = app.post_json('/mdel/test/create', params=aec_deces_payload, status=200)
260 263

  
261 264
    assert resp.json['data']['demand_id'] == '17-1-AEC-LA'
262 265

  
263
    doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '17-1-AEC-LA--0', '17-1-AEC-LA-doc-.xml')
264
    aec_schema(doc)
265

  
266
    root = etree.parse(doc).getroot()
266
    with get_inputs(mdel, '17-1-AEC-LA--0', '17-1-AEC-LA-doc-.xml') as fd:
267
        root = aec_schema(fd)
267 268

  
268 269
    assert root.tag == 'EnveloppeMetierType'
269 270

  
......
309 310
    assert resp.json['err_desc'] == '<date_acte> is required'
310 311

  
311 312

  
312
def test_ile(app, ile_schema, ile_payload):
313
def test_ile(app, mdel, ile_schema, ile_payload):
313 314
    resp = app.post_json('/mdel/test/create', params=ile_payload, status=200)
314 315
    assert resp.json['data']['demand_id'] == '1-14-ILE-LA'
315 316

  
316
    base_doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-14-ILE-LA--0')
317
    doc = os.path.join(base_doc, '1-14-ILE-LA-doc-.xml')
318

  
319
    ile_schema(doc)
320

  
321
    root = etree.parse(doc).getroot()
317
    with get_inputs(mdel, '1-14-ILE-LA--0', '1-14-ILE-LA-doc-.xml') as fd:
318
        root = ile_schema(fd)
322 319

  
323 320
    assert root.tag == 'AvisDInscription'
324 321

  
......
346 343
    assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/DivisionTerritoriale').text == 'Whatever'
347 344

  
348 345
    # checking that attached files are referenced in -ent-.xml file
349
    desc = os.path.join(base_doc, '1-14-ILE-LA-ent-.xml')
350
    root = etree.parse(desc).getroot()
346
    with get_inputs(mdel, '1-14-ILE-LA--0', '1-14-ILE-LA-ent-.xml') as fd:
347
        root = etree.parse(fd).getroot()
351 348
    ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'}
352 349
    assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns
353 350
    assert root.find('ns2:Teledemarche/ns2:NumeroTeledemarche', namespaces=ns).text == '1-14'
......
377 374
                      '1-14-ILE-LA-ent-.xml', 'mdel_passeport_recto.pdf',
378 375
                      'mdel_passeport_verso.pdf', 'mdel_edf.pdf']
379 376

  
380
    for fname in os.listdir(base_doc):
381
        assert fname in expected_files
377
    with get_zip(mdel, '1-14-ILE-LA--0') as zip_fd:
378
        for fname in zip_fd.namelist():
379
            assert fname in expected_files
382 380

  
383 381
    # Without anterieur_situation_raw
384 382
    payload = copy.deepcopy(ile_payload)
......
402 400

  
403 401

  
404 402
def test_ile_get_status(app, mdel, ile_payload):
405
    shutil.copytree(
403
    copy_tree(
406 404
        os.path.join(data_dir, 'test', 'outputs'),
407
        os.path.join(get_resource_base_dir(), 'test', 'outputs'))
405
        mdel.outputs_dir)
408 406
    resp = app.post_json('/mdel/test/create', params=ile_payload, status=200)
409 407
    demand_id = resp.json['data']['demand_id']
410 408
    assert demand_id == '1-14-ILE-LA'
......
443 441

  
444 442

  
445 443
def test_get_status_no_response(app, mdel):
446
    shutil.copytree(
444
    copy_tree(
447 445
        os.path.join(data_dir, 'test', 'outputs'),
448
        os.path.join(get_resource_base_dir(), 'test', 'outputs'))
446
        mdel.outputs_dir)
449 447
    Demand.objects.create(resource=mdel, num='1-15', flow_type='ILE-LA', demand_id='1-15-ILE-LA')
450 448

  
451 449
    resp = app.get('/mdel/test/status', params={'demand_id': '1-15-ILE-LA'}, status=200)
......
458 456

  
459 457

  
460 458
def test_get_not_closed_status(app, mdel):
461
    shutil.copytree(
459
    copy_tree(
462 460
        os.path.join(data_dir, 'test', 'outputs'),
463
        os.path.join(get_resource_base_dir(), 'test', 'outputs'))
461
        mdel.outputs_dir)
464 462
    Demand.objects.create(resource=mdel, num='15-9', flow_type='AEC-LA', demand_id='15-9-AEC-LA')
465 463

  
466 464
    resp = app.get('/mdel/test/status', params={'demand_id': '15-9-AEC-LA'}, status=200)
467
-