0002-mdel-improve-code-style-36471.patch
passerelle/apps/mdel/mdel.py | ||
---|---|---|
1 |
# Passerelle - uniform access to data and services |
|
2 |
# Copyright (C) 2016 Entr'ouvert |
|
1 |
# coding: utf-8 |
|
2 |
# passerelle - uniform access to data and services- |
|
3 |
# Copyright (C) 2019 Entr'ouvert |
|
3 | 4 |
# |
4 | 5 |
# This program is free software: you can redistribute it and/or modify it |
5 | 6 |
# under the terms of the GNU Affero General Public License as published |
... | ... | |
14 | 15 |
# You should have received a copy of the GNU Affero General Public License |
15 | 16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 17 | |
18 |
from __future__ import unicode_literals |
|
19 | ||
17 | 20 |
import os |
18 | 21 |
import base64 |
19 | 22 |
import datetime |
... | ... | |
30 | 33 |
from passerelle.utils.jsonresponse import APIError |
31 | 34 | |
32 | 35 | |
33 |
def get_resource_base_dir(): |
|
34 |
return default_storage.path('mdel') |
|
35 | ||
36 | ||
37 | 36 |
class AttachedFile(object): |
38 | ||
39 | 37 |
def __init__(self, code, filename, b64_content): |
40 | 38 |
if code not in ('JI', 'JD'): |
41 | 39 |
raise APIError('%s is not a valid code (JI or JD)' % code) |
... | ... | |
51 | 49 | |
52 | 50 | |
53 | 51 |
class MDELBase(object): |
54 | ||
55 | 52 |
def to_string(self): |
56 | 53 |
raw_string = etree.tostring(self.xml, encoding='utf-8') |
57 | 54 |
parsed_string = minidom.parseString(raw_string) |
58 | 55 |
return parsed_string.toprettyxml(indent='\t') |
59 | 56 | |
60 |
def save(self, subfolder, filename):
|
|
57 |
def save(self, directory, filename):
|
|
61 | 58 |
"""Save object as xml file |
62 | 59 |
""" |
63 |
folder = os.path.join(get_resource_base_dir(), subfolder) |
|
64 |
path = os.path.join(folder, filename) |
|
60 |
path = os.path.join(directory, filename) |
|
65 | 61 |
default_storage.save(path, ContentFile(self.to_string())) |
66 | 62 | |
67 | 63 | |
... | ... | |
247 | 243 | |
248 | 244 |
return data |
249 | 245 | |
246 |
@classmethod |
|
247 |
def path_to_xml(cls, path, value, parent): |
|
248 |
'''Resolve an XML `path` starting from `parent` and the target node content to `value`.''' |
|
249 | ||
250 |
if isinstance(path, list) and len(path) > 1: |
|
251 |
parent_child = parent.find(path[0]) |
|
252 |
if parent_child is not None: |
|
253 |
element = parent_child |
|
254 |
path.pop(0) |
|
255 |
else: |
|
256 |
element = ElementFactory(path.pop(0)) |
|
257 |
element.append(cls.path_to_xml(path, value, element), allow_new=False) |
|
258 |
else: |
|
259 |
element = ElementFactory(path[0], text=value) |
|
260 |
return element |
|
261 | ||
250 | 262 |
@property |
251 | 263 |
def xml(self): |
252 |
elements = []
|
|
264 |
'''Generate an XML document applying values from `self.data` to `self.mapping`'''
|
|
253 | 265 | |
266 |
# extract text content from data for each path in mapping |
|
267 |
elements = [] |
|
254 | 268 |
for key, path in self.mapping: |
255 | 269 |
if key in self.data: |
256 | 270 |
elements.append((path, self.data[key])) |
257 | 271 | |
272 |
# recursively create XML nodes using self.path_to_xml() |
|
258 | 273 |
root = ElementFactory(self.root_element, **self.root_attributes) |
259 | ||
260 | 274 |
for path, value in elements: |
261 | 275 |
path_splitted = path.split('_') |
262 |
root.append(json_to_xml(path_splitted, value, root), allow_new=False)
|
|
276 |
root.append(self.path_to_xml(path_splitted, value, root), allow_new=False)
|
|
263 | 277 | |
264 | 278 |
return root |
265 | 279 | |
280 | ||
266 | 281 |
class ILEData(Data): |
282 |
'''Démarche inscription sur les listes électorales''' |
|
267 | 283 | |
268 | 284 |
mapping = [ |
269 | 285 |
('nom_famille', 'Inscription_Electeur_Noms_NomFamille'), |
... | ... | |
359 | 375 | |
360 | 376 | |
361 | 377 |
class AECData(Data): |
378 |
'''Démarche acte d'état civil''' |
|
362 | 379 | |
363 | 380 |
mapping = [ |
364 | 381 |
('aec_type_raw', 'DemandeActe_TypeActe_Code'), |
... | ... | |
444 | 461 |
self.root_attributes = {'canal_utilise': '0'} |
445 | 462 | |
446 | 463 |
super(AECData, self).__init__(demand_id, data) |
447 | ||
448 | ||
449 |
def json_to_xml(path, value, parent): |
|
450 | ||
451 |
if isinstance(path, list) and len(path) > 1: |
|
452 |
parent_child = parent.find(path[0]) |
|
453 |
if parent_child is not None: |
|
454 |
element = parent_child |
|
455 |
path.pop(0) |
|
456 |
else: |
|
457 |
element = ElementFactory(path.pop(0)) |
|
458 | ||
459 |
element.append(json_to_xml(path, value, element), allow_new=False) |
|
460 |
else: |
|
461 |
element = ElementFactory(path[0], text=value) |
|
462 | ||
463 |
return element |
passerelle/apps/mdel/models.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*-
|
|
2 |
# Passerelle - uniform access to data and services
|
|
1 |
# coding: utf-8
|
|
2 |
# passerelle - uniform access to data and services
|
|
3 | 3 |
# Copyright (C) 2016 Entr'ouvert |
4 | 4 |
# |
5 | 5 |
# This program is free software: you can redistribute it and/or modify it |
... | ... | |
15 | 15 |
# You should have received a copy of the GNU Affero General Public License |
16 | 16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 | |
18 |
from __future__ import unicode_literals |
|
19 | ||
18 | 20 |
import os |
19 | 21 |
import json |
22 |
import shutil |
|
23 |
import tempfile |
|
20 | 24 | |
21 |
from django.db import models |
|
25 |
from django.core.files.storage import default_storage |
|
26 |
from django.db import models, transaction |
|
22 | 27 |
from django.utils.translation import ugettext_lazy as _ |
28 |
from django.utils import six |
|
23 | 29 | |
24 | 30 |
from passerelle.base.models import BaseResource |
25 | 31 |
from passerelle.utils.api import endpoint |
... | ... | |
30 | 36 |
from .utils import zipdir, get_file_content_from_zip, parse_date |
31 | 37 | |
32 | 38 | |
33 |
DEMAND_TYPES = ['ILE-LA', 'RCO-LA', 'AEC-LA']
|
|
39 |
DEMAND_TYPES = ['ILE-LA', 'AEC-LA'] |
|
34 | 40 | |
35 | 41 |
STATUS_MAPPING = { |
36 | 42 |
'100': 'closed', |
... | ... | |
82 | 88 |
def get_verbose_name(cls): |
83 | 89 |
return cls._meta.verbose_name |
84 | 90 | |
91 |
@property |
|
92 |
def base_dir(self): |
|
93 |
path = os.path.join(default_storage.path('mdel'), self.slug) |
|
94 |
if not os.path.exists(path): |
|
95 |
os.makedirs(path) |
|
96 |
return path |
|
97 | ||
98 |
@property |
|
99 |
def tmp_dir(self): |
|
100 |
path = os.path.join(self.base_dir, 'tmp') |
|
101 |
if not os.path.exists(path): |
|
102 |
os.makedirs(path) |
|
103 |
return path |
|
104 | ||
105 |
@property |
|
106 |
def inputs_dir(self): |
|
107 |
path = os.path.join(self.base_dir, 'inputs') |
|
108 |
if not os.path.exists(path): |
|
109 |
os.makedirs(path) |
|
110 |
return path |
|
111 | ||
112 |
@property |
|
113 |
def outputs_dir(self): |
|
114 |
path = os.path.join(self.base_dir, 'outputs') |
|
115 |
if not os.path.exists(path): |
|
116 |
os.makedirs(path) |
|
117 |
return path |
|
118 | ||
85 | 119 |
@endpoint(perm='can_access', methods=['post']) |
86 | 120 |
def create(self, request, *args, **kwargs): |
87 | 121 |
"""Create a demand |
... | ... | |
101 | 135 | |
102 | 136 |
demand_type = formdata.pop('demand_type', '').upper() |
103 | 137 |
if demand_type not in DEMAND_TYPES: |
104 |
raise APIError('demand_type must be : %r' % DEMAND_TYPES)
|
|
138 |
raise APIError('demand_type must be one of : %s' % ', '.join(DEMAND_TYPES))
|
|
105 | 139 | |
106 | 140 |
if 'display_id' not in formdata: |
107 | 141 |
raise APIError('display_id is required') |
... | ... | |
111 | 145 | |
112 | 146 |
demand_id = formdata.pop('display_id') |
113 | 147 | |
114 |
demand, created = Demand.objects.get_or_create(num=demand_id, flow_type=demand_type, resource=self) |
|
115 |
if not created: |
|
116 |
demand.step += 1 |
|
117 | ||
118 |
demand.create_zip(formdata) |
|
119 | ||
120 |
demand.save() |
|
121 | ||
122 |
return {'data': {'demand_id': demand.demand_id}} |
|
148 |
demand_dir = tempfile.mkdtemp(dir=self.tmp_dir) |
|
149 |
try: |
|
150 |
with transaction.atomic(): |
|
151 |
# prevent collision between concurrent requests |
|
152 |
demand, created = Demand.objects.select_for_update().get_or_create( |
|
153 |
num=demand_id, flow_type=demand_type, resource=self) |
|
154 |
if not created: |
|
155 |
demand.step += 1 |
|
156 | ||
157 |
zip_path = demand.create_zip(demand_dir, formdata) |
|
158 |
demand.save() |
|
159 |
os.renames(zip_path, os.path.join(self.inputs_dir, demand.name + '.zip')) |
|
160 |
finally: |
|
161 |
shutil.rmtree(demand_dir) |
|
162 | ||
163 |
return { |
|
164 |
'data': { |
|
165 |
'demand_id': demand.demand_id |
|
166 |
} |
|
167 |
} |
|
123 | 168 | |
124 | 169 |
@endpoint(perm='can_access') |
125 | 170 |
def status(self, request, *args, **kwargs): |
... | ... | |
129 | 174 |
if not demand_id: |
130 | 175 |
raise APIError('demand_id is required') |
131 | 176 | |
132 |
demand = Demand.objects.get(demand_id=demand_id, resource=self) |
|
177 |
with transaction.atomic(): |
|
178 |
demand = self.demand_set.select_for_update().get(demand_id=demand_id) |
|
179 |
status = demand.get_status() |
|
180 |
demand.save() |
|
133 | 181 | |
134 |
status = demand.get_status() |
|
135 | ||
136 |
demand.save() |
|
137 | 182 |
return {'data': status} |
138 | 183 | |
139 | 184 |
@endpoint(perm='can_access') |
140 | 185 |
def applicants(self, request, without=''): |
141 |
return {'data': [item for item in APPLICANTS |
|
142 |
if item.get('id') not in without.split(',')]} |
|
186 |
return { |
|
187 |
'data': [ |
|
188 |
item for item in APPLICANTS if item.get('id') not in without.split(',') |
|
189 |
] |
|
190 |
} |
|
143 | 191 | |
144 | 192 |
@endpoint(perm='can_access') |
145 | 193 |
def certificates(self, request): |
... | ... | |
147 | 195 | |
148 | 196 |
@endpoint(name='certificate-types', perm='can_access') |
149 | 197 |
def certificate_types(self, request, without=''): |
150 |
return {'data': [item for item in CERTIFICATE_TYPES |
|
151 |
if item.get('id') not in without.split(',')]} |
|
198 |
return { |
|
199 |
'data': [ |
|
200 |
item for item in CERTIFICATE_TYPES if item.get('id') not in without.split(',') |
|
201 |
] |
|
202 |
} |
|
152 | 203 | |
153 | 204 | |
205 |
@six.python_2_unicode_compatible |
|
154 | 206 |
class Demand(models.Model): |
155 | 207 |
created_at = models.DateTimeField(auto_now_add=True) |
156 | 208 |
updated_at = models.DateTimeField(auto_now=True) |
... | ... | |
162 | 214 |
step = models.IntegerField(default=0) |
163 | 215 |
demand_id = models.CharField(max_length=128, null=True) |
164 | 216 | |
165 |
def __unicode__(self):
|
|
217 |
def __str__(self):
|
|
166 | 218 |
return '%s - %s - %s' % (self.resource.slug, self.demand_id, self.status) |
167 | 219 | |
168 | 220 |
class Meta: |
... | ... | |
180 | 232 |
def filename(self): |
181 | 233 |
return '%s.zip' % self.name |
182 | 234 | |
183 |
def create_zip(self, formdata): |
|
235 |
def create_zip_ile_la(self, formdata, attached_files, inputs_dir): |
|
236 |
proofs = [('JI', 'justificatif_identite'), ('JD', 'justificatif_domicile')] |
|
237 | ||
238 |
for proof_code, proof_attribute in proofs: |
|
239 | ||
240 |
documents = [value for key, value in formdata.items() |
|
241 |
if (key.startswith(proof_attribute) |
|
242 |
and isinstance(value, dict) |
|
243 |
and 'filename' in value |
|
244 |
and 'content' in value)] |
|
245 |
if not documents: |
|
246 |
raise APIError('%s and all its attributes are required' % proof_attribute) |
|
247 |
for document in documents: |
|
248 |
filename, b64_content = document.get('filename'), document.get('content') |
|
249 |
attached_files.append(mdel.AttachedFile(proof_code, filename, b64_content)) |
|
250 | ||
251 |
# process address additional information |
|
252 |
adresse_complement = [] |
|
253 | ||
254 |
complement_keys = sorted([key for key in formdata if key.startswith('adresse_complement')]) |
|
255 | ||
256 |
for key in complement_keys: |
|
257 |
adresse_complement.append(formdata[key]) |
|
258 | ||
259 |
if adresse_complement: |
|
260 |
formdata['adresse_complement'] = ', '.join(adresse_complement) |
|
261 | ||
262 |
# get contact info |
|
263 |
contacts = [('TEL', 'contact_telephone'), ('EMAIL', 'contact_email')] |
|
264 | ||
265 |
for contact_code, contact_uri in contacts: |
|
266 |
uri = formdata.get(contact_uri) |
|
267 |
if uri: |
|
268 |
formdata['contact_uri'] = uri |
|
269 |
formdata['contact_code'] = contact_code |
|
270 | ||
271 |
# mdel protocol only supports two values here (prem & cci) |
|
272 |
# but we may want the form to have more specific values; |
|
273 |
# we name them (prem|cci)_suffix and squeeze the suffix here. |
|
274 |
if not formdata.get('anterieur_situation_raw', None): |
|
275 |
raise APIError('anterieur_situation_raw is required') |
|
276 | ||
277 |
formdata['anterieur_situation_raw'] = formdata.get('anterieur_situation_raw').split('_')[0] |
|
278 | ||
279 |
doc = mdel.ILEData(self.demand_id, formdata) |
|
280 |
doc.save(inputs_dir) |
|
281 | ||
282 |
for attached_file in attached_files: |
|
283 |
attached_file.save(inputs_dir) |
|
284 | ||
285 |
def create_zip_aec_la(self, formdata, attached_files, inputs_dir): |
|
286 |
# Set date format |
|
287 |
if formdata.get('aec_type_raw') != 'NAISSANCE' and not formdata.get('date_acte'): |
|
288 |
raise APIError('<date_acte> is required') |
|
289 |
if formdata.get('date_acte'): |
|
290 |
formdata['date_acte'] = parse_date(formdata['date_acte']) |
|
291 |
if formdata.get('titulaire_naiss_date'): |
|
292 |
formdata['titulaire_naiss_date'] = parse_date(formdata['titulaire_naiss_date']) |
|
293 |
if formdata.get('titulaire2_naiss_date'): |
|
294 |
formdata['titulaire2_naiss_date'] = parse_date(formdata['titulaire2_naiss_date']) |
|
295 | ||
296 |
# Ensuring that all titles are uppercase |
|
297 |
for key in formdata.keys(): |
|
298 |
if 'civilite' in key: |
|
299 |
formdata[key] = formdata[key].upper() |
|
300 | ||
301 |
# Merging street number and street name |
|
302 |
demandeur_adresse_voie = formdata.get('demandeur_adresse_voie') |
|
303 |
demandeur_adresse_num = formdata.get('demandeur_adresse_num') |
|
304 |
if demandeur_adresse_voie and demandeur_adresse_num: |
|
305 |
formdata['demandeur_adresse_voie'] = '%s %s' % (demandeur_adresse_num, demandeur_adresse_voie) |
|
306 | ||
307 |
# Set foreign address if country is not France |
|
308 |
adresse_keys = ['etage', 'batiment', 'voie', 'code_postal', 'ville'] |
|
309 |
adresse_keys = ['demandeur_adresse_%s' % key for key in adresse_keys] |
|
310 |
demandeur_adresse_pays_raw = formdata.get('demandeur_adresse_pays_raw') |
|
311 |
demandeur_adresse_etrangere = formdata.get('demandeur_adresse_etrangere') |
|
312 |
demandeur_adresse_etrangere_pays_raw = formdata.get('demandeur_adresse_etrangere_pays_raw') |
|
313 |
if (demandeur_adresse_etrangere_pays_raw |
|
314 |
or (demandeur_adresse_pays_raw and demandeur_adresse_pays_raw != 'FRA')): |
|
315 |
formdata.pop('demandeur_adresse_pays_raw', None) |
|
316 |
if not demandeur_adresse_etrangere_pays_raw: |
|
317 |
formdata['demandeur_adresse_etrangere_pays_raw'] = demandeur_adresse_pays_raw |
|
318 |
if demandeur_adresse_etrangere: |
|
319 |
# dismiss french address if the foreign one is filled |
|
320 |
for key in adresse_keys: |
|
321 |
if key in formdata: |
|
322 |
del formdata[key] |
|
323 |
else: |
|
324 |
# build foreign address from french address fields |
|
325 |
adresse_etrangere = [] |
|
326 |
for key in adresse_keys: |
|
327 |
value = formdata.pop(key, '') |
|
328 |
if value: |
|
329 |
if key != 'demandeur_adresse_ville': |
|
330 |
adresse_etrangere.append(value) |
|
331 |
else: |
|
332 |
adresse_etrangere[-1] += ' %s' % value |
|
333 |
formdata['demandeur_adresse_etrangere'] = ', '.join(adresse_etrangere) |
|
334 | ||
335 |
# Set aec_nature if aec_type_raw == DECES |
|
336 |
if formdata.get('aec_type_raw') == 'DECES' and not formdata.get('aec_nature_raw'): |
|
337 |
formdata['aec_nature'] = u'Copie integrale' |
|
338 |
formdata['aec_nature_raw'] = 'COPIE-INTEGRALE' |
|
339 | ||
340 |
# Set motif_demand if none |
|
341 |
if not formdata.get('motif_demande'): |
|
342 |
formdata['motif_demande'] = 'Autre' |
|
343 |
formdata['motif_demande_raw'] = 'Autre' |
|
344 | ||
345 |
# handling requester when 'Autre' |
|
346 |
if formdata.get('qualite_demandeur_autre') and not formdata.get('qualite_demandeur_raw'): |
|
347 |
formdata['qualite_demandeur'] = formdata['qualite_demandeur_autre'] |
|
348 |
formdata['qualite_demandeur_raw'] = 'Autre' |
|
349 | ||
350 |
doc = mdel.AECData(self.demand_id, formdata) |
|
351 |
doc.save(inputs_dir) |
|
352 | ||
353 |
def create_zip(self, inputs_dir, formdata): |
|
184 | 354 |
""" |
185 | 355 |
Creates demand as zip folder |
186 | 356 |
""" |
... | ... | |
189 | 359 |
flow_type = self.flow_type |
190 | 360 |
demand_num = self.num |
191 | 361 | |
192 |
resource_base_dir = mdel.get_resource_base_dir() |
|
193 | ||
194 |
inputs_dir = os.path.join(resource_base_dir, self.resource.slug, |
|
195 |
'inputs', self.name) |
|
196 | ||
197 | 362 |
attached_files = [] |
198 | 363 | |
199 |
if flow_type == 'ILE-LA': |
|
200 | ||
201 |
proofs = [('JI', 'justificatif_identite'), ('JD', 'justificatif_domicile')] |
|
202 | ||
203 |
for proof_code, proof_attribute in proofs: |
|
204 | ||
205 |
documents = [value for key, value in formdata.items() |
|
206 |
if key.startswith(proof_attribute) and isinstance(value, dict) and 'filename' in value and 'content' in value] |
|
207 |
if not documents: |
|
208 |
raise APIError('%s and all its attributes are required' % proof_attribute) |
|
209 |
for document in documents: |
|
210 |
filename, b64_content = document.get('filename'), document.get('content') |
|
211 |
attached_files.append(mdel.AttachedFile(proof_code, filename, b64_content)) |
|
212 | ||
213 |
# process address additional information |
|
214 |
adresse_complement = [] |
|
215 | ||
216 |
complement_keys = sorted([key for key in formdata if key.startswith('adresse_complement')]) |
|
217 | ||
218 |
for key in complement_keys: |
|
219 |
adresse_complement.append(formdata[key]) |
|
220 | ||
221 |
if adresse_complement: |
|
222 |
formdata['adresse_complement'] = ', '.join(adresse_complement) |
|
223 | ||
224 |
# get contact info |
|
225 |
contacts = [('TEL', 'contact_telephone'), ('EMAIL', 'contact_email')] |
|
226 | ||
227 |
for contact_code, contact_uri in contacts: |
|
228 |
uri = formdata.get(contact_uri) |
|
229 |
if uri: |
|
230 |
formdata['contact_uri'] = uri |
|
231 |
formdata['contact_code'] = contact_code |
|
232 | ||
233 |
# mdel protocol only supports two values here (prem & cci) |
|
234 |
# but we may want the form to have more specific values; |
|
235 |
# we name them (prem|cci)_suffix and squeeze the suffix here. |
|
236 |
if not formdata.get('anterieur_situation_raw', None): |
|
237 |
raise APIError('anterieur_situation_raw is required') |
|
238 | ||
239 |
formdata['anterieur_situation_raw'] = formdata.get('anterieur_situation_raw').split('_')[0] |
|
240 | ||
241 |
doc = mdel.ILEData(self.demand_id, formdata) |
|
242 |
doc.save(inputs_dir) |
|
243 | ||
244 |
for attached_file in attached_files: |
|
245 |
attached_file.save(inputs_dir) |
|
246 | ||
247 |
elif flow_type == 'RCO-LA': |
|
248 |
raise APIError('RCO-LA processing not implemented') |
|
249 | ||
250 |
else: |
|
251 |
# Set date format |
|
252 |
if formdata.get('aec_type_raw') != 'NAISSANCE' and not formdata.get('date_acte'): |
|
253 |
raise APIError('<date_acte> is required') |
|
254 |
if formdata.get('date_acte'): |
|
255 |
formdata['date_acte'] = parse_date(formdata['date_acte']) |
|
256 |
if formdata.get('titulaire_naiss_date'): |
|
257 |
formdata['titulaire_naiss_date'] = parse_date(formdata['titulaire_naiss_date']) |
|
258 |
if formdata.get('titulaire2_naiss_date'): |
|
259 |
formdata['titulaire2_naiss_date'] = parse_date(formdata['titulaire2_naiss_date']) |
|
260 | ||
261 |
# Ensuring that all titles are uppercase |
|
262 |
for key in formdata.keys(): |
|
263 |
if 'civilite' in key: |
|
264 |
formdata[key] = formdata[key].upper() |
|
265 | ||
266 |
# Merging street number and street name |
|
267 |
demandeur_adresse_voie = formdata.get('demandeur_adresse_voie') |
|
268 |
demandeur_adresse_num = formdata.get('demandeur_adresse_num') |
|
269 |
if demandeur_adresse_voie and demandeur_adresse_num: |
|
270 |
formdata['demandeur_adresse_voie'] = '%s %s' % (demandeur_adresse_num, demandeur_adresse_voie) |
|
271 | ||
272 |
# Set foreign address if country is not France |
|
273 |
adresse_keys = ['etage', 'batiment', 'voie', 'code_postal', 'ville'] |
|
274 |
adresse_keys = ['demandeur_adresse_%s' % key for key in adresse_keys] |
|
275 |
demandeur_adresse_pays_raw = formdata.get('demandeur_adresse_pays_raw') |
|
276 |
demandeur_adresse_etrangere = formdata.get('demandeur_adresse_etrangere') |
|
277 |
demandeur_adresse_etrangere_pays_raw = formdata.get('demandeur_adresse_etrangere_pays_raw') |
|
278 |
if (demandeur_adresse_etrangere_pays_raw |
|
279 |
or (demandeur_adresse_pays_raw and demandeur_adresse_pays_raw != 'FRA')): |
|
280 |
formdata.pop('demandeur_adresse_pays_raw', None) |
|
281 |
if not demandeur_adresse_etrangere_pays_raw: |
|
282 |
formdata['demandeur_adresse_etrangere_pays_raw'] = demandeur_adresse_pays_raw |
|
283 |
if demandeur_adresse_etrangere: |
|
284 |
# dismiss french address if the foreign one is filled |
|
285 |
for key in adresse_keys: |
|
286 |
if key in formdata: |
|
287 |
del formdata[key] |
|
288 |
else: |
|
289 |
# build foreign address from french address fields |
|
290 |
adresse_etrangere = [] |
|
291 |
for key in adresse_keys: |
|
292 |
value = formdata.pop(key, '') |
|
293 |
if value: |
|
294 |
if key != 'demandeur_adresse_ville': |
|
295 |
adresse_etrangere.append(value) |
|
296 |
else: |
|
297 |
adresse_etrangere[-1] += ' %s' % value |
|
298 |
formdata['demandeur_adresse_etrangere'] = ', '.join(adresse_etrangere) |
|
299 | ||
300 |
# Set aec_nature if aec_type_raw == DECES |
|
301 |
if formdata.get('aec_type_raw') == 'DECES' and not formdata.get('aec_nature_raw'): |
|
302 |
formdata['aec_nature'] = u'Copie integrale' |
|
303 |
formdata['aec_nature_raw'] = 'COPIE-INTEGRALE' |
|
304 | ||
305 |
# Set motif_demand if none |
|
306 |
if not formdata.get('motif_demande'): |
|
307 |
formdata['motif_demande'] = 'Autre' |
|
308 |
formdata['motif_demande_raw'] = 'Autre' |
|
309 | ||
310 |
# handling requester when 'Autre' |
|
311 |
if formdata.get('qualite_demandeur_autre') and not formdata.get('qualite_demandeur_raw'): |
|
312 |
formdata['qualite_demandeur'] = formdata['qualite_demandeur_autre'] |
|
313 |
formdata['qualite_demandeur_raw'] = 'Autre' |
|
314 | ||
315 |
doc = mdel.AECData(self.demand_id, formdata) |
|
316 |
doc.save(inputs_dir) |
|
364 |
# create main document |
|
365 |
try: |
|
366 |
method = getattr(self, 'create_zip_%s' % flow_type.lower().replace('-', '_')) |
|
367 |
except AttributeError: |
|
368 |
raise APIError('flow type %s is unsupported' % flow_type) |
|
369 |
method(formdata, attached_files, inputs_dir=inputs_dir) |
|
317 | 370 | |
318 | 371 |
submission_date = formdata.get('receipt_time', None) |
319 | 372 | |
... | ... | |
339 | 392 | |
340 | 393 |
namespace = {'ns2': 'http://finances.gouv.fr/dgme/pec/message/v1'} |
341 | 394 | |
342 |
resource_base_dir = mdel.get_resource_base_dir() |
|
343 |
output_dir = os.path.join(resource_base_dir, self.resource.slug, 'outputs') |
|
344 | ||
345 |
if not os.path.exists(output_dir): |
|
346 |
os.makedirs(output_dir) |
|
347 | ||
348 | 395 |
# list all demand response zip files |
349 | 396 |
zip_files = {} |
350 |
for zfile in os.listdir(output_dir):
|
|
397 |
for zfile in os.listdir(self.resource.outputs_dir):
|
|
351 | 398 |
if zfile.lower().startswith(self.demand_id.lower()) and zfile.lower().endswith('.zip'): |
352 | 399 |
fname = os.path.splitext(zfile)[0] |
353 | 400 |
try: |
... | ... | |
363 | 410 |
else: |
364 | 411 |
return result |
365 | 412 | |
366 |
path = os.path.join(output_dir, zip_file)
|
|
413 |
path = os.path.join(self.resource.outputs_dir, zip_file)
|
|
367 | 414 | |
368 | 415 |
content = get_file_content_from_zip(path, 'message.xml') |
369 | 416 |
element = mdel.etree.fromstring(content) |
passerelle/apps/mdel/utils.py | ||
---|---|---|
1 |
# Passerelle - uniform access to data and services
|
|
1 |
# passerelle - uniform access to data and services
|
|
2 | 2 |
# Copyright (C) 2016 Entr'ouvert |
3 | 3 |
# |
4 | 4 |
# This program is free software: you can redistribute it and/or modify it |
... | ... | |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from __future__ import unicode_literals |
|
18 | ||
17 | 19 |
import os |
18 | 20 |
import zipfile |
19 | 21 |
from xml.etree import ElementTree as etree |
... | ... | |
22 | 24 | |
23 | 25 |
from passerelle.utils.jsonresponse import APIError |
24 | 26 | |
27 | ||
25 | 28 |
def parse_date(date): |
26 | 29 |
try: |
27 | 30 |
parsed_date = django_parse_date(date) |
28 | 31 |
except ValueError as e: |
29 |
raise APIError('Invalid date: %r (%r)' % ( date, e))
|
|
32 |
raise APIError('invalid date: %s (%s)' % (date, e))
|
|
30 | 33 |
if not parsed_date: |
31 | 34 |
raise APIError('date %r not iso-formated' % date) |
32 | 35 |
return parsed_date.isoformat() |
33 | 36 | |
34 | 37 | |
35 | 38 |
class ElementFactory(etree.Element): |
36 | ||
37 | 39 |
def __init__(self, *args, **kwargs): |
38 | 40 |
self.text = kwargs.pop('text', None) |
39 | 41 |
namespace = kwargs.pop('namespace', None) |
... | ... | |
46 | 48 |
super(ElementFactory, self).__init__(*args, **kwargs) |
47 | 49 | |
48 | 50 |
def append(self, element, allow_new=True): |
49 | ||
50 | 51 |
if not allow_new: |
51 | 52 |
if isinstance(element.tag, etree.QName): |
52 | 53 |
found = self.find(element.tag.text) |
53 | 54 |
else: |
54 | 55 |
found = self.find(element.tag) |
55 | ||
56 | 56 |
if found is not None: |
57 | 57 |
return self |
58 | ||
59 | 58 |
super(ElementFactory, self).append(element) |
60 | 59 |
return self |
61 | 60 |
tests/test_mdel.py | ||
---|---|---|
17 | 17 | |
18 | 18 |
from __future__ import unicode_literals |
19 | 19 | |
20 |
import shutil
|
|
20 |
import contextlib
|
|
21 | 21 |
import os |
22 | 22 |
import json |
23 | 23 |
import base64 |
24 | 24 |
import copy |
25 | ||
25 |
import zipfile |
|
26 | 26 |
from xml.etree import ElementTree as etree |
27 | ||
28 |
from distutils.dir_util import copy_tree |
|
27 | 29 |
from lxml import etree as letree |
28 | 30 | |
29 | 31 |
import pytest |
30 | 32 | |
31 | 33 |
from passerelle.apps.mdel.models import MDEL, Demand |
32 |
from passerelle.apps.mdel.mdel import Message, Description, AttachedFile, get_resource_base_dir
|
|
34 |
from passerelle.apps.mdel.mdel import Message, Description, AttachedFile |
|
33 | 35 |
from passerelle.apps.mdel.utils import parse_date |
34 | 36 | |
35 | 37 |
import utils |
... | ... | |
52 | 54 |
xsd = letree.parse(path).getroot() |
53 | 55 |
schema = letree.XMLSchema(xsd) |
54 | 56 |
parser = letree.XMLParser(schema=schema) |
55 |
letree.parse(doc, parser=parser) # Will raise an exception if schema not respected
|
|
57 |
return letree.parse(doc, parser=parser).getroot() # Will raise an exception if schema not respected
|
|
56 | 58 |
return validate_schema |
57 | 59 |
return fixture |
58 | 60 | |
... | ... | |
71 | 73 |
return utils.setup_access_rights(MDEL.objects.create(slug='test')) |
72 | 74 | |
73 | 75 | |
76 |
@contextlib.contextmanager |
|
77 |
def get_zip(mdel, base): |
|
78 |
with zipfile.ZipFile( |
|
79 |
os.path.join( |
|
80 |
mdel.inputs_dir, |
|
81 |
base + '.zip')) as zip_fd: |
|
82 |
yield zip_fd |
|
83 | ||
84 | ||
85 |
@contextlib.contextmanager |
|
86 |
def get_inputs(mdel, base, name): |
|
87 |
with get_zip(mdel, base) as zip_fd: |
|
88 |
yield zip_fd.open(name) |
|
89 | ||
90 | ||
74 | 91 |
def test_message(): |
75 | 92 |
ns = {'mdel': 'http://finances.gouv.fr/dgme/pec/message/v1'} |
76 | 93 |
xmlns = 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier' |
... | ... | |
105 | 122 |
assert xml.find('Teledemarche/IdentifiantPlateforme').text == '1' |
106 | 123 | |
107 | 124 | |
108 |
def test_invalid_demand_type(app, ile_payload): |
|
109 |
ILE_PAYLOAD_INVALID = copy.deepcopy(ile_payload) |
|
110 |
ILE_PAYLOAD_INVALID['extra']['demand_type'] = 'whatever' |
|
111 |
resp = app.post_json('/mdel/test/create', params=ILE_PAYLOAD_INVALID, status=200) |
|
112 |
assert resp.json['err_desc'] == "demand_type must be : ['ILE-LA', 'RCO-LA', 'AEC-LA']" |
|
113 | ||
114 | ||
115 | 125 |
def test_invalid_demand_no_form_number(app, ile_payload): |
116 | 126 |
ILE_PAYLOAD_INVALID_NO = copy.deepcopy(ile_payload) |
117 | 127 |
ILE_PAYLOAD_INVALID_NO.pop('display_id') |
... | ... | |
123 | 133 |
RCO_PAYLOAD = copy.deepcopy(ile_payload) |
124 | 134 |
RCO_PAYLOAD['extra']['demand_type'] = 'rco-la' |
125 | 135 |
resp = app.post_json('/mdel/test/create', params=RCO_PAYLOAD, status=200) |
126 |
assert resp.json['err_desc'] == "RCO-LA processing not implemented"
|
|
136 |
assert resp.json['err_desc'] == "demand_type must be one of : ILE-LA, AEC-LA"
|
|
127 | 137 | |
128 | 138 | |
129 |
def test_aec_naissance(app, aec_schema, aec_naissance_payload): |
|
139 |
def test_aec_naissance(app, mdel, aec_schema, aec_naissance_payload):
|
|
130 | 140 |
resp = app.post_json('/mdel/test/create', params=aec_naissance_payload, status=200) |
131 | 141 | |
132 | 142 |
assert resp.json['data']['demand_id'] == '15-4-AEC-LA' |
133 | 143 | |
134 |
doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '15-4-AEC-LA--0', '15-4-AEC-LA-doc-.xml') |
|
135 |
aec_schema(doc) |
|
136 | ||
137 |
root = etree.parse(doc).getroot() |
|
144 |
with get_inputs(mdel, '15-4-AEC-LA--0', '15-4-AEC-LA-doc-.xml') as fd: |
|
145 |
root = aec_schema(fd) |
|
138 | 146 | |
139 | 147 |
assert root.tag == 'EnveloppeMetierType' |
140 | 148 | |
... | ... | |
178 | 186 |
assert root.find('DemandeActe/Titulaire/Filiation/Pere/Prenoms').text == 'Fritz' |
179 | 187 | |
180 | 188 | |
181 |
def test_aec_naissance_etranger(app, aec_schema, aec_naissance_etranger_payload): |
|
189 |
def test_aec_naissance_etranger(app, mdel, aec_schema, aec_naissance_etranger_payload):
|
|
182 | 190 |
resp = app.post_json('/mdel/test/create', params=aec_naissance_etranger_payload, status=200) |
183 | 191 | |
184 | 192 |
assert resp.json['data']['demand_id'] == '1-4-AEC-LA' |
185 | 193 | |
186 |
doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-4-AEC-LA--0', '1-4-AEC-LA-doc-.xml') |
|
187 |
aec_schema(doc) |
|
188 | ||
189 |
root = etree.parse(doc).getroot() |
|
194 |
with get_inputs(mdel, '1-4-AEC-LA--0', '1-4-AEC-LA-doc-.xml') as fd: |
|
195 |
root = aec_schema(fd) |
|
190 | 196 | |
191 | 197 |
assert root.tag == 'EnveloppeMetierType' |
192 | 198 | |
... | ... | |
197 | 203 |
== '4 rue des coquelicots, 3800 Bern') |
198 | 204 | |
199 | 205 | |
200 |
def test_aec_mariage(app, aec_schema, aec_mariage_payload): |
|
206 |
def test_aec_mariage(app, mdel, aec_schema, aec_mariage_payload):
|
|
201 | 207 |
resp = app.post_json('/mdel/test/create', params=aec_mariage_payload, status=200) |
202 | 208 | |
203 | 209 |
assert resp.json['data']['demand_id'] == '16-1-AEC-LA' |
204 | 210 | |
205 |
doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '16-1-AEC-LA--0', '16-1-AEC-LA-doc-.xml') |
|
206 | ||
207 |
aec_schema(doc) |
|
208 | ||
209 |
root = etree.parse(doc).getroot() |
|
211 |
with get_inputs(mdel, '16-1-AEC-LA--0', '16-1-AEC-LA-doc-.xml') as fd: |
|
212 |
root = aec_schema(fd) |
|
210 | 213 | |
211 | 214 |
assert root.tag == 'EnveloppeMetierType' |
212 | 215 | |
... | ... | |
255 | 258 |
assert root.find('DemandeActe/Titulaire2/Filiation/Pere/Prenoms').text == 'Pascal' |
256 | 259 | |
257 | 260 | |
258 |
def test_aec_deces(app, aec_schema, aec_deces_payload): |
|
261 |
def test_aec_deces(app, mdel, aec_schema, aec_deces_payload):
|
|
259 | 262 |
resp = app.post_json('/mdel/test/create', params=aec_deces_payload, status=200) |
260 | 263 | |
261 | 264 |
assert resp.json['data']['demand_id'] == '17-1-AEC-LA' |
262 | 265 | |
263 |
doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '17-1-AEC-LA--0', '17-1-AEC-LA-doc-.xml') |
|
264 |
aec_schema(doc) |
|
265 | ||
266 |
root = etree.parse(doc).getroot() |
|
266 |
with get_inputs(mdel, '17-1-AEC-LA--0', '17-1-AEC-LA-doc-.xml') as fd: |
|
267 |
root = aec_schema(fd) |
|
267 | 268 | |
268 | 269 |
assert root.tag == 'EnveloppeMetierType' |
269 | 270 | |
... | ... | |
309 | 310 |
assert resp.json['err_desc'] == '<date_acte> is required' |
310 | 311 | |
311 | 312 | |
312 |
def test_ile(app, ile_schema, ile_payload): |
|
313 |
def test_ile(app, mdel, ile_schema, ile_payload):
|
|
313 | 314 |
resp = app.post_json('/mdel/test/create', params=ile_payload, status=200) |
314 | 315 |
assert resp.json['data']['demand_id'] == '1-14-ILE-LA' |
315 | 316 | |
316 |
base_doc = os.path.join(get_resource_base_dir(), 'test', 'inputs', '1-14-ILE-LA--0') |
|
317 |
doc = os.path.join(base_doc, '1-14-ILE-LA-doc-.xml') |
|
318 | ||
319 |
ile_schema(doc) |
|
320 | ||
321 |
root = etree.parse(doc).getroot() |
|
317 |
with get_inputs(mdel, '1-14-ILE-LA--0', '1-14-ILE-LA-doc-.xml') as fd: |
|
318 |
root = ile_schema(fd) |
|
322 | 319 | |
323 | 320 |
assert root.tag == 'AvisDInscription' |
324 | 321 | |
... | ... | |
346 | 343 |
assert root.find('SituationElectoraleAnterieure/PaysUeDerniereInscription/DivisionTerritoriale').text == 'Whatever' |
347 | 344 | |
348 | 345 |
# checking that attached files are referenced in -ent-.xml file |
349 |
desc = os.path.join(base_doc, '1-14-ILE-LA-ent-.xml')
|
|
350 |
root = etree.parse(desc).getroot()
|
|
346 |
with get_inputs(mdel, '1-14-ILE-LA--0', '1-14-ILE-LA-ent-.xml') as fd:
|
|
347 |
root = etree.parse(fd).getroot()
|
|
351 | 348 |
ns = {'ns2': 'http://finances.gouv.fr/dgme/gf/composants/teledemarchexml/donnee/metier'} |
352 | 349 |
assert root.tag == '{%(ns2)s}EnteteMetierEnveloppe' % ns |
353 | 350 |
assert root.find('ns2:Teledemarche/ns2:NumeroTeledemarche', namespaces=ns).text == '1-14' |
... | ... | |
377 | 374 |
'1-14-ILE-LA-ent-.xml', 'mdel_passeport_recto.pdf', |
378 | 375 |
'mdel_passeport_verso.pdf', 'mdel_edf.pdf'] |
379 | 376 | |
380 |
for fname in os.listdir(base_doc): |
|
381 |
assert fname in expected_files |
|
377 |
with get_zip(mdel, '1-14-ILE-LA--0') as zip_fd: |
|
378 |
for fname in zip_fd.namelist(): |
|
379 |
assert fname in expected_files |
|
382 | 380 | |
383 | 381 |
# Without anterieur_situation_raw |
384 | 382 |
payload = copy.deepcopy(ile_payload) |
... | ... | |
402 | 400 | |
403 | 401 | |
404 | 402 |
def test_ile_get_status(app, mdel, ile_payload): |
405 |
shutil.copytree(
|
|
403 |
copy_tree(
|
|
406 | 404 |
os.path.join(data_dir, 'test', 'outputs'), |
407 |
os.path.join(get_resource_base_dir(), 'test', 'outputs'))
|
|
405 |
mdel.outputs_dir)
|
|
408 | 406 |
resp = app.post_json('/mdel/test/create', params=ile_payload, status=200) |
409 | 407 |
demand_id = resp.json['data']['demand_id'] |
410 | 408 |
assert demand_id == '1-14-ILE-LA' |
... | ... | |
443 | 441 | |
444 | 442 | |
445 | 443 |
def test_get_status_no_response(app, mdel): |
446 |
shutil.copytree(
|
|
444 |
copy_tree(
|
|
447 | 445 |
os.path.join(data_dir, 'test', 'outputs'), |
448 |
os.path.join(get_resource_base_dir(), 'test', 'outputs'))
|
|
446 |
mdel.outputs_dir)
|
|
449 | 447 |
Demand.objects.create(resource=mdel, num='1-15', flow_type='ILE-LA', demand_id='1-15-ILE-LA') |
450 | 448 | |
451 | 449 |
resp = app.get('/mdel/test/status', params={'demand_id': '1-15-ILE-LA'}, status=200) |
... | ... | |
458 | 456 | |
459 | 457 | |
460 | 458 |
def test_get_not_closed_status(app, mdel): |
461 |
shutil.copytree(
|
|
459 |
copy_tree(
|
|
462 | 460 |
os.path.join(data_dir, 'test', 'outputs'), |
463 |
os.path.join(get_resource_base_dir(), 'test', 'outputs'))
|
|
461 |
mdel.outputs_dir)
|
|
464 | 462 |
Demand.objects.create(resource=mdel, num='15-9', flow_type='AEC-LA', demand_id='15-9-AEC-LA') |
465 | 463 | |
466 | 464 |
resp = app.get('/mdel/test/status', params={'demand_id': '15-9-AEC-LA'}, status=200) |
467 |
- |