0010-misc-fix-consider-using-with-pylint-error-62099.patch
passerelle/apps/cartads_cs/models.py | ||
---|---|---|
799 | 799 |
def pack(self, dossier_id): |
800 | 800 |
dossier = CartaDSDossier.objects.get(id=dossier_id) |
801 | 801 |
zip_filename = os.path.join(default_storage.path('cartads_cs'), '%s.zip' % dossier.tracking_code) |
802 |
zip_file = zipfile.ZipFile(zip_filename, mode='w') |
|
803 |
liste_pdf = self.liste_pdf(None, dossier.type_dossier_id) |
|
804 |
cerfa_id = liste_pdf['data'][0]['id'] |
|
805 |
for cerfa in liste_pdf['data']: |
|
806 |
if cerfa['id'] == 'AUTRES_DEMANDEURS': |
|
807 |
continue |
|
808 |
cerfa_id = cerfa['id'] |
|
809 |
break |
|
810 |
cerfa_id = cerfa_id.replace('*', '-') |
|
811 |
pieces = self.pieces(None, dossier.type_dossier_id, dossier.objet_demande_id, dossier.tracking_code) |
|
812 |
for piece in pieces['data']: |
|
813 |
cnt = 1 |
|
814 |
for file in piece['files']: |
|
815 |
if not file.get('id'): |
|
802 |
with zipfile.ZipFile(zip_filename, mode='w') as zip_file: |
|
803 |
liste_pdf = self.liste_pdf(None, dossier.type_dossier_id) |
|
804 |
cerfa_id = liste_pdf['data'][0]['id'] |
|
805 |
for cerfa in liste_pdf['data']: |
|
806 |
if cerfa['id'] == 'AUTRES_DEMANDEURS': |
|
816 | 807 |
continue |
817 |
cartads_file = CartaDSFile.objects.get(id=file['id']) |
|
818 |
if piece['id'] == 'cerfa-%s-%s' % (dossier.type_dossier_id, dossier.objet_demande_id): |
|
819 |
zip_file.write(cartads_file.uploaded_file.path, '%s.pdf' % cerfa_id) |
|
820 |
elif piece['id'].startswith('cerfa-autres-'): |
|
821 |
zip_file.write( |
|
822 |
cartads_file.uploaded_file.path, |
|
823 |
'Fiches_complementaires/Cerfa_autres_demandeurs_%d.pdf' % cnt, |
|
824 |
) |
|
825 |
else: |
|
826 |
zip_file.write( |
|
827 |
cartads_file.uploaded_file.path, |
|
828 |
'Pieces/%s-%s%s%s' |
|
829 |
% ( |
|
830 |
piece['id'], |
|
831 |
piece['codePiece'], |
|
832 |
cnt, |
|
833 |
os.path.splitext(cartads_file.uploaded_file.path)[-1], |
|
834 |
), |
|
835 |
) |
|
836 |
cnt += 1 |
|
837 |
zip_file.close() |
|
808 |
cerfa_id = cerfa['id'] |
|
809 |
break |
|
810 |
cerfa_id = cerfa_id.replace('*', '-') |
|
811 |
pieces = self.pieces( |
|
812 |
None, dossier.type_dossier_id, dossier.objet_demande_id, dossier.tracking_code |
|
813 |
) |
|
814 |
for piece in pieces['data']: |
|
815 |
cnt = 1 |
|
816 |
for file in piece['files']: |
|
817 |
if not file.get('id'): |
|
818 |
continue |
|
819 |
cartads_file = CartaDSFile.objects.get(id=file['id']) |
|
820 |
if piece['id'] == 'cerfa-%s-%s' % (dossier.type_dossier_id, dossier.objet_demande_id): |
|
821 |
zip_file.write(cartads_file.uploaded_file.path, '%s.pdf' % cerfa_id) |
|
822 |
elif piece['id'].startswith('cerfa-autres-'): |
|
823 |
zip_file.write( |
|
824 |
cartads_file.uploaded_file.path, |
|
825 |
'Fiches_complementaires/Cerfa_autres_demandeurs_%d.pdf' % cnt, |
|
826 |
) |
|
827 |
else: |
|
828 |
zip_file.write( |
|
829 |
cartads_file.uploaded_file.path, |
|
830 |
'Pieces/%s-%s%s%s' |
|
831 |
% ( |
|
832 |
piece['id'], |
|
833 |
piece['codePiece'], |
|
834 |
cnt, |
|
835 |
os.path.splitext(cartads_file.uploaded_file.path)[-1], |
|
836 |
), |
|
837 |
) |
|
838 |
cnt += 1 |
|
838 | 839 |
dossier.zip_ready = True |
839 | 840 |
dossier.save() |
840 | 841 |
self.add_job('send_to_cartads', dossier_id=dossier.id) |
... | ... | |
843 | 844 |
ftp = FTP(self.ftp_server) |
844 | 845 |
ftp.login(self.ftp_username, self.ftp_password) |
845 | 846 |
ftp.cwd(self.ftp_client_name) |
846 |
ftp.storbinary('STOR %s' % os.path.basename(zip_filename), open(zip_filename, 'rb')) |
|
847 |
ftp.quit() |
|
847 |
with open(zip_filename, 'rb') as fd: |
|
848 |
ftp.storbinary('STOR %s' % os.path.basename(zip_filename), fd) |
|
849 |
ftp.quit() |
|
848 | 850 | |
849 | 851 |
def send_to_cartads(self, dossier_id): |
850 | 852 |
dossier = CartaDSDossier.objects.get(id=dossier_id) |
passerelle/apps/cryptor/models.py | ||
---|---|---|
214 | 214 |
if not os.path.exists(metadata_filename): |
215 | 215 |
raise APIError('unknown uuid', http_status=404) |
216 | 216 | |
217 |
content = read_decrypt(open(content_filename, 'rb'), self.private_key) |
|
217 |
with open(content_filename, 'rb') as fd: |
|
218 |
content = read_decrypt(fd, self.private_key) |
|
218 | 219 | |
219 |
metadata = json.load(open(metadata_filename, 'r')) |
|
220 |
with open(metadata_filename, 'r') as fd: |
|
221 |
metadata = json.load(fd) |
|
220 | 222 |
filename = metadata.get('filename') |
221 | 223 |
content_type = metadata.get('content_type') |
222 | 224 |
passerelle/apps/family/management/commands/import_orleans_data.py | ||
---|---|---|
50 | 50 |
storage = DefaultStorage() |
51 | 51 |
lock_filename = storage.path('family-%s/import-orleans-data.lock' % connector.id) |
52 | 52 |
try: |
53 |
fd = open(lock_filename, 'w') |
|
53 |
fd = open(lock_filename, 'w') # pylint: disable=consider-using-with
|
|
54 | 54 |
fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) |
55 | 55 |
except IOError: |
56 | 56 |
raise CommandError('Command already running.') |
57 | 57 | |
58 | 58 |
try: |
59 | 59 |
archive_name = os.path.basename(options['archive_file']) |
60 |
connector.archive.save(archive_name, File(open(options['archive_file'], 'rb'))) |
|
60 |
with open(options['archive_file'], 'rb') as archive_fd: |
|
61 |
connector.archive.save(archive_name, File(archive_fd)) |
|
61 | 62 |
except Exception as e: |
62 | 63 |
raise CommandError('Error occured: %s' % e) |
63 | 64 |
finally: |
passerelle/apps/family/models.py | ||
---|---|---|
174 | 174 |
def clean(self): |
175 | 175 |
if self.archive: |
176 | 176 |
try: |
177 |
archive = zipfile.ZipFile(self.archive) |
|
177 |
with zipfile.ZipFile(self.archive) as archive: |
|
178 |
if self.file_format != 'native': |
|
179 |
modname = 'passerelle.apps.family.loaders.%s' % self.file_format |
|
180 |
__import__(modname) |
|
181 |
module = sys.modules[modname] |
|
182 |
module.Loader(self).clean(archive) |
|
178 | 183 |
except zipfile.BadZipfile: |
179 | 184 |
raise ValidationError(_('Invalid zip file.')) |
180 |
if self.file_format != 'native': |
|
181 |
modname = 'passerelle.apps.family.loaders.%s' % self.file_format |
|
182 |
__import__(modname) |
|
183 |
module = sys.modules[modname] |
|
184 |
module.Loader(self).clean(archive) |
|
185 | 185 | |
186 | 186 |
return super().clean() |
187 | 187 | |
... | ... | |
195 | 195 |
if not os.path.exists(invoices_dir): |
196 | 196 |
os.makedirs(invoices_dir) |
197 | 197 | |
198 |
archive = zipfile.ZipFile(self.archive.path) |
|
199 |
if self.file_format != 'native': |
|
200 |
modname = 'passerelle.apps.family.loaders.%s' % self.file_format |
|
201 |
__import__(modname) |
|
202 |
module = sys.modules[modname] |
|
203 |
module.Loader(self).load(archive) |
|
204 |
return |
|
205 | ||
206 |
archive_files = archive.namelist() |
|
207 | ||
208 |
family_files = [d for d in archive_files if d.endswith('.json')] |
|
209 |
families = [] |
|
210 |
invoices = [] |
|
211 |
children = [] |
|
212 |
adults = [] |
|
213 | ||
214 |
for f in family_files: |
|
215 |
family_data = json_loads(archive.read(f)) |
|
216 |
families.append(family_data['id']) |
|
217 |
address = family_data.get('address') or {} |
|
218 |
family_data.update(address) |
|
219 |
data = dict_cherry_pick( |
|
220 |
family_data, |
|
221 |
( |
|
222 |
'login', |
|
223 |
'password', |
|
224 |
'family_quotient', |
|
225 |
('number', 'street_number'), |
|
226 |
('postal_code', 'zipcode'), |
|
227 |
('street', 'street_name'), |
|
228 |
('complement', 'address_complement'), |
|
229 |
), |
|
230 |
) |
|
231 |
family, dummy = Family.objects.update_or_create( |
|
232 |
external_id=family_data['id'], resource=self, defaults=data |
|
233 |
) |
|
234 |
for adult in family_data.get('adults') or []: |
|
235 |
adults.append(adult['id']) |
|
236 |
adult_address = adult.get('address') or {} |
|
237 |
adult.update(adult_address) |
|
198 |
with zipfile.ZipFile(self.archive.path) as archive: |
|
199 |
if self.file_format != 'native': |
|
200 |
modname = 'passerelle.apps.family.loaders.%s' % self.file_format |
|
201 |
__import__(modname) |
|
202 |
module = sys.modules[modname] |
|
203 |
module.Loader(self).load(archive) |
|
204 |
return |
|
205 | ||
206 |
archive_files = archive.namelist() |
|
207 | ||
208 |
family_files = [d for d in archive_files if d.endswith('.json')] |
|
209 |
families = [] |
|
210 |
invoices = [] |
|
211 |
children = [] |
|
212 |
adults = [] |
|
213 | ||
214 |
for f in family_files: |
|
215 |
family_data = json_loads(archive.read(f)) |
|
216 |
families.append(family_data['id']) |
|
217 |
address = family_data.get('address') or {} |
|
218 |
family_data.update(address) |
|
238 | 219 |
data = dict_cherry_pick( |
239 |
adult,
|
|
220 |
family_data,
|
|
240 | 221 |
( |
241 |
'first_name', |
|
242 |
'last_name', |
|
243 |
'phone', |
|
244 |
('mobile', 'cellphone'), |
|
245 |
'sex', |
|
222 |
'login', |
|
223 |
'password', |
|
224 |
'family_quotient', |
|
246 | 225 |
('number', 'street_number'), |
247 | 226 |
('postal_code', 'zipcode'), |
248 | 227 |
('street', 'street_name'), |
249 | 228 |
('complement', 'address_complement'), |
250 |
'country', |
|
251 |
), |
|
252 |
) |
|
253 |
Adult.objects.update_or_create(family=family, external_id=adult['id'], defaults=data) |
|
254 |
# cleanup adults |
|
255 |
Adult.objects.exclude(external_id__in=adults).delete() |
|
256 | ||
257 |
for child in family_data.get('children') or []: |
|
258 |
children.append(child['id']) |
|
259 |
data = dict_cherry_pick(child, ('first_name', 'last_name', 'sex', 'birthdate')) |
|
260 |
Child.objects.get_or_create(family=family, external_id=child['id'], defaults=data) |
|
261 |
# cleanup children |
|
262 |
Child.objects.exclude(external_id__in=children).delete() |
|
263 | ||
264 |
for invoice in family_data['invoices']: |
|
265 |
invoices.append(invoice['id']) |
|
266 |
data = dict_cherry_pick( |
|
267 |
invoice, |
|
268 |
( |
|
269 |
'label', |
|
270 |
('created', 'issue_date'), |
|
271 |
'pay_limit_date', |
|
272 |
'litigation_date', |
|
273 |
'total_amount', |
|
274 |
'payment_date', |
|
275 |
'amount', |
|
276 |
'autobilling', |
|
277 | 229 |
), |
278 | 230 |
) |
279 |
for date_attribute in data.keys(): |
|
280 |
if not date_attribute.endswith('_date'): |
|
281 |
continue |
|
282 |
if date_attribute == 'payment_date': |
|
283 |
data[date_attribute] = get_datetime(data[date_attribute]) |
|
284 |
else: |
|
285 |
data[date_attribute] = get_date(data[date_attribute]) |
|
286 |
data['paid'] = bool(data.get('payment_date')) |
|
287 |
Invoice.objects.update_or_create( |
|
288 |
resource=self, family=family, external_id=invoice['id'], defaults=data |
|
231 |
family, dummy = Family.objects.update_or_create( |
|
232 |
external_id=family_data['id'], resource=self, defaults=data |
|
289 | 233 |
) |
290 |
if 'invoices/%s.pdf' % invoice['id'] in archive_files: |
|
291 |
with open(os.path.join(invoices_dir, '%s.pdf' % invoice['id']), 'wb') as fp: |
|
292 |
fp.write(archive.read('invoices/%s.pdf' % invoice['id'])) |
|
234 |
for adult in family_data.get('adults') or []: |
|
235 |
adults.append(adult['id']) |
|
236 |
adult_address = adult.get('address') or {} |
|
237 |
adult.update(adult_address) |
|
238 |
data = dict_cherry_pick( |
|
239 |
adult, |
|
240 |
( |
|
241 |
'first_name', |
|
242 |
'last_name', |
|
243 |
'phone', |
|
244 |
('mobile', 'cellphone'), |
|
245 |
'sex', |
|
246 |
('number', 'street_number'), |
|
247 |
('postal_code', 'zipcode'), |
|
248 |
('street', 'street_name'), |
|
249 |
('complement', 'address_complement'), |
|
250 |
'country', |
|
251 |
), |
|
252 |
) |
|
253 |
Adult.objects.update_or_create(family=family, external_id=adult['id'], defaults=data) |
|
254 |
# cleanup adults |
|
255 |
Adult.objects.exclude(external_id__in=adults).delete() |
|
256 | ||
257 |
for child in family_data.get('children') or []: |
|
258 |
children.append(child['id']) |
|
259 |
data = dict_cherry_pick(child, ('first_name', 'last_name', 'sex', 'birthdate')) |
|
260 |
Child.objects.get_or_create(family=family, external_id=child['id'], defaults=data) |
|
261 |
# cleanup children |
|
262 |
Child.objects.exclude(external_id__in=children).delete() |
|
263 | ||
264 |
for invoice in family_data['invoices']: |
|
265 |
invoices.append(invoice['id']) |
|
266 |
data = dict_cherry_pick( |
|
267 |
invoice, |
|
268 |
( |
|
269 |
'label', |
|
270 |
('created', 'issue_date'), |
|
271 |
'pay_limit_date', |
|
272 |
'litigation_date', |
|
273 |
'total_amount', |
|
274 |
'payment_date', |
|
275 |
'amount', |
|
276 |
'autobilling', |
|
277 |
), |
|
278 |
) |
|
279 |
for date_attribute in data.keys(): |
|
280 |
if not date_attribute.endswith('_date'): |
|
281 |
continue |
|
282 |
if date_attribute == 'payment_date': |
|
283 |
data[date_attribute] = get_datetime(data[date_attribute]) |
|
284 |
else: |
|
285 |
data[date_attribute] = get_date(data[date_attribute]) |
|
286 |
data['paid'] = bool(data.get('payment_date')) |
|
287 |
Invoice.objects.update_or_create( |
|
288 |
resource=self, family=family, external_id=invoice['id'], defaults=data |
|
289 |
) |
|
290 |
if 'invoices/%s.pdf' % invoice['id'] in archive_files: |
|
291 |
with open(os.path.join(invoices_dir, '%s.pdf' % invoice['id']), 'wb') as fp: |
|
292 |
fp.write(archive.read('invoices/%s.pdf' % invoice['id'])) |
|
293 | 293 | |
294 | 294 |
# cleanup invoices |
295 | 295 |
Invoice.objects.exclude(external_id__in=invoices).delete() |
... | ... | |
526 | 526 |
if not self.has_pdf: |
527 | 527 |
raise Http404(_('PDF file not found')) |
528 | 528 | |
529 |
response = HttpResponse(open(self.pdf_filename(), 'rb').read(), content_type='application/pdf') |
|
529 |
with open(self.pdf_filename(), 'rb') as fd: |
|
530 |
response = HttpResponse(fd.read(), content_type='application/pdf') |
|
530 | 531 |
response['Content-Disposition'] = 'attachment; filename=%s.pdf' % self.external_id |
531 | 532 |
return response |
532 | 533 |
passerelle/apps/sp_fr/models.py | ||
---|---|---|
232 | 232 | |
233 | 233 |
def process(self, fd): |
234 | 234 |
try: |
235 |
archive = zipfile.ZipFile(fd) |
|
236 |
except Exception: |
|
235 |
with zipfile.ZipFile(fd) as archive: |
|
236 |
# sort files |
|
237 |
doc_files = [] |
|
238 |
ent_files = [] |
|
239 |
attachments = {} |
|
240 |
for name in archive.namelist(): |
|
241 |
if ENT_PATTERN.match(name): |
|
242 |
ent_files.append(name) |
|
243 | ||
244 |
if len(ent_files) != 1: |
|
245 |
return False, 'too many/few ent files found: %s' % ent_files |
|
246 | ||
247 |
ent_file = ent_files[0] |
|
248 | ||
249 |
with archive.open(ent_file) as fd: |
|
250 |
document = ET.parse(fd) |
|
251 | ||
252 |
for pj_node in PIECE_JOINTE_XPATH(document): |
|
253 |
code = CODE_XPATH(pj_node)[0].text |
|
254 |
code = 'pj_' + code.lower().replace('-', '_') |
|
255 |
fichier = FICHIER_XPATH(pj_node)[0].text |
|
256 |
attachments.setdefault(code, []).append(fichier) |
|
257 |
for doc_node in DOCUMENTS_XPATH(document): |
|
258 |
code = CODE_XPATH(doc_node)[0].text |
|
259 |
code = 'doc_' + code.lower().replace('-', '_') |
|
260 |
fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text |
|
261 |
attachments.setdefault(code, []).append(fichier) |
|
262 | ||
263 |
doc_files = [ |
|
264 |
value for l in attachments.values() for value in l if value.lower().endswith('.xml') |
|
265 |
] |
|
266 |
if len(doc_files) != 1: |
|
267 |
return False, 'too many/few doc files found: %s' % doc_files |
|
268 | ||
269 |
for key in attachments: |
|
270 |
if len(attachments[key]) > 1: |
|
271 |
return False, 'too many attachments of kind %s: %r' % (key, attachments[key]) |
|
272 |
name = attachments[key][0] |
|
273 |
with archive.open(attachments[key][0]) as zip_fd: |
|
274 |
content = zip_fd.read() |
|
275 |
attachments[key] = { |
|
276 |
'filename': name, |
|
277 |
'content': base64.b64encode(content).decode('ascii'), |
|
278 |
'content_type': 'application/octet-stream', |
|
279 |
} |
|
280 | ||
281 |
if self.procedure == PROCEDURE_RCO and not attachments: |
|
282 |
return False, 'no attachments but RCO requires them' |
|
283 | ||
284 |
doc_file = doc_files[0] |
|
285 | ||
286 |
insee_codes = ROUTAGE_XPATH(document) |
|
287 |
if len(insee_codes) != 1: |
|
288 |
return False, 'too many/few insee codes found: %s' % insee_codes |
|
289 |
insee_code = insee_codes[0] |
|
290 | ||
291 |
email = EMAIL_XPATH(document) |
|
292 |
email = email[0] if email else '' |
|
293 | ||
294 |
data = { |
|
295 |
'insee_code': insee_code, |
|
296 |
'email': email, |
|
297 |
} |
|
298 |
data.update(attachments) |
|
299 | ||
300 |
with archive.open(doc_file) as fd: |
|
301 |
document = ET.parse(fd) |
|
302 |
data.update(self.extract_data(document)) |
|
303 |
if hasattr(self, 'update_data_%s' % self.procedure): |
|
304 |
getattr(self, 'update_data_%s' % self.procedure)(data) |
|
305 |
except zipfile.BadZipfile: |
|
237 | 306 |
return False, 'could not load zipfile' |
238 |
# sort files |
|
239 |
doc_files = [] |
|
240 |
ent_files = [] |
|
241 |
attachments = {} |
|
242 |
for name in archive.namelist(): |
|
243 |
if ENT_PATTERN.match(name): |
|
244 |
ent_files.append(name) |
|
245 | ||
246 |
if len(ent_files) != 1: |
|
247 |
return False, 'too many/few ent files found: %s' % ent_files |
|
248 | ||
249 |
ent_file = ent_files[0] |
|
250 | ||
251 |
with archive.open(ent_file) as fd: |
|
252 |
document = ET.parse(fd) |
|
253 | ||
254 |
for pj_node in PIECE_JOINTE_XPATH(document): |
|
255 |
code = CODE_XPATH(pj_node)[0].text |
|
256 |
code = 'pj_' + code.lower().replace('-', '_') |
|
257 |
fichier = FICHIER_XPATH(pj_node)[0].text |
|
258 |
attachments.setdefault(code, []).append(fichier) |
|
259 |
for doc_node in DOCUMENTS_XPATH(document): |
|
260 |
code = CODE_XPATH(doc_node)[0].text |
|
261 |
code = 'doc_' + code.lower().replace('-', '_') |
|
262 |
fichier = FICHIER_DONNEES_XPATH(doc_node)[0].text |
|
263 |
attachments.setdefault(code, []).append(fichier) |
|
264 | ||
265 |
doc_files = [value for l in attachments.values() for value in l if value.lower().endswith('.xml')] |
|
266 |
if len(doc_files) != 1: |
|
267 |
return False, 'too many/few doc files found: %s' % doc_files |
|
268 | ||
269 |
for key in attachments: |
|
270 |
if len(attachments[key]) > 1: |
|
271 |
return False, 'too many attachments of kind %s: %r' % (key, attachments[key]) |
|
272 |
name = attachments[key][0] |
|
273 |
with archive.open(attachments[key][0]) as zip_fd: |
|
274 |
content = zip_fd.read() |
|
275 |
attachments[key] = { |
|
276 |
'filename': name, |
|
277 |
'content': base64.b64encode(content).decode('ascii'), |
|
278 |
'content_type': 'application/octet-stream', |
|
279 |
} |
|
280 | ||
281 |
if self.procedure == PROCEDURE_RCO and not attachments: |
|
282 |
return False, 'no attachments but RCO requires them' |
|
283 | ||
284 |
doc_file = doc_files[0] |
|
285 | ||
286 |
insee_codes = ROUTAGE_XPATH(document) |
|
287 |
if len(insee_codes) != 1: |
|
288 |
return False, 'too many/few insee codes found: %s' % insee_codes |
|
289 |
insee_code = insee_codes[0] |
|
290 | ||
291 |
email = EMAIL_XPATH(document) |
|
292 |
email = email[0] if email else '' |
|
293 | ||
294 |
data = { |
|
295 |
'insee_code': insee_code, |
|
296 |
'email': email, |
|
297 |
} |
|
298 |
data.update(attachments) |
|
299 | ||
300 |
with archive.open(doc_file) as fd: |
|
301 |
document = ET.parse(fd) |
|
302 |
data.update(self.extract_data(document)) |
|
303 |
if hasattr(self, 'update_data_%s' % self.procedure): |
|
304 |
getattr(self, 'update_data_%s' % self.procedure)(data) |
|
305 | 307 |
return data, None |
306 | 308 | |
307 | 309 |
def transfer(self, data): |
passerelle/base/management/commands/export_site.py | ||
---|---|---|
17 | 17 | |
18 | 18 |
def handle(self, *args, **options): |
19 | 19 |
if options['output']: |
20 |
output = open(options['output'], 'w') |
|
21 |
else: |
|
22 |
output = sys.stdout |
|
20 |
with open(options['output'], 'w') as output: |
|
21 |
json.dump(export_site(slugs=options['slugs']), output, indent=4) |
|
22 |
return |
|
23 |
output = sys.stdout |
|
23 | 24 |
json.dump(export_site(slugs=options['slugs']), output, indent=4) |
passerelle/utils/zip.py | ||
---|---|---|
244 | 244 | |
245 | 245 | |
246 | 246 |
def diff_zip(one, two): |
247 |
differences = [] |
|
248 | ||
249 | 247 |
def compute_diff(one, two, fd_one, fd_two): |
250 | 248 |
content_one = fd_one.read() |
251 | 249 |
content_two = fd_two.read() |
... | ... | |
257 | 255 |
return ['File %s differs' % one] + diff |
258 | 256 |
return 'File %s differs' % one |
259 | 257 | |
258 |
def run(one, two): |
|
259 |
differences = [] |
|
260 |
with zipfile.ZipFile(one) as one_zip, zipfile.ZipFile(two) as two_zip: |
|
261 |
one_nl = set(one_zip.namelist()) |
|
262 |
two_nl = set(two_zip.namelist()) |
|
263 |
for name in one_nl - two_nl: |
|
264 |
differences.append('File %s only in %s' % (name, one)) |
|
265 |
for name in two_nl - one_nl: |
|
266 |
differences.append('File %s only in %s' % (name, two)) |
|
267 |
for name in one_nl & two_nl: |
|
268 |
with one_zip.open(name) as fd_one: |
|
269 |
with two_zip.open(name) as fd_two: |
|
270 |
difference = compute_diff(name, name, fd_one, fd_two) |
|
271 |
if difference: |
|
272 |
differences.append(difference) |
|
273 | ||
274 |
if not differences: |
|
275 |
# check file order in zip |
|
276 |
one_zip_files = [zi.filename for zi in one_zip.infolist()] |
|
277 |
two_zip_files = [zi.filename for zi in two_zip.infolist()] |
|
278 |
if one_zip_files != two_zip_files: |
|
279 |
differences.append('Files are not in the same order') |
|
280 |
return differences |
|
281 | ||
260 | 282 |
if not hasattr(one, 'read'): |
261 |
one = open(one, mode='rb') |
|
283 |
with open(one, mode='rb') as one: |
|
284 |
if not hasattr(two, 'read'): |
|
285 |
with open(two, 'rb') as two: |
|
286 |
return run(one, two) |
|
287 |
with two: |
|
288 |
return run(one, two) |
|
262 | 289 |
with one: |
263 | 290 |
if not hasattr(two, 'read'): |
264 |
two = open(two, 'rb') |
|
291 |
with open(two, 'rb') as two: |
|
292 |
return run(one, two) |
|
265 | 293 |
with two: |
266 |
with zipfile.ZipFile(one) as one_zip: |
|
267 |
with zipfile.ZipFile(two) as two_zip: |
|
268 |
one_nl = set(one_zip.namelist()) |
|
269 |
two_nl = set(two_zip.namelist()) |
|
270 |
for name in one_nl - two_nl: |
|
271 |
differences.append('File %s only in %s' % (name, one)) |
|
272 |
for name in two_nl - one_nl: |
|
273 |
differences.append('File %s only in %s' % (name, two)) |
|
274 |
for name in one_nl & two_nl: |
|
275 |
with one_zip.open(name) as fd_one: |
|
276 |
with two_zip.open(name) as fd_two: |
|
277 |
difference = compute_diff(name, name, fd_one, fd_two) |
|
278 |
if difference: |
|
279 |
differences.append(difference) |
|
280 | ||
281 |
if not differences: |
|
282 |
# check file order in zip |
|
283 |
one_zip_files = [zi.filename for zi in one_zip.infolist()] |
|
284 |
two_zip_files = [zi.filename for zi in two_zip.infolist()] |
|
285 |
if one_zip_files != two_zip_files: |
|
286 |
differences.append('Files are not in the same order') |
|
287 | ||
288 |
return differences |
|
294 |
return run(one, two) |
tests/test_atos_genesys.py | ||
---|---|---|
20 | 20 | |
21 | 21 |
@pytest.fixture |
22 | 22 |
def mock_codifications_ok(): |
23 |
response = open( |
|
24 |
os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml') |
|
25 |
).read() |
|
23 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_codifications.xml')) as fd: |
|
24 |
response = fd.read() |
|
26 | 25 |
with tests.utils.mock_url(FAKE_URL, response) as mock: |
27 | 26 |
yield mock |
28 | 27 | |
... | ... | |
143 | 142 |
assert Link.objects.count() == 0 |
144 | 143 | |
145 | 144 | |
146 |
RESPONSE_SELECT_USAGER = open( |
|
147 |
os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml') |
|
148 |
).read() |
|
145 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'genesys_select_usager.xml')) as fd: |
|
146 |
RESPONSE_SELECT_USAGER = fd.read() |
|
149 | 147 | |
150 | 148 | |
151 | 149 |
def test_ws_dossiers(app, genesys): |
tests/test_cartads_cs.py | ||
---|---|---|
290 | 290 |
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', b'%PDF...')]) |
291 | 291 |
assert resp.json == [{'error': 'The CERFA should be a PDF file.'}] |
292 | 292 | |
293 |
pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb').read() |
|
293 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb') as fd: |
|
294 |
pdf_contents = fd.read() |
|
294 | 295 |
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) |
295 | 296 |
assert resp.json == [{'error': 'The CERFA should not be a scanned document.'}] |
296 | 297 | |
297 |
pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() |
|
298 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: |
|
299 |
pdf_contents = fd.read() |
|
298 | 300 |
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) |
299 | 301 |
cerfa_token = resp.json[0]['token'] |
300 | 302 | |
... | ... | |
554 | 556 |
assert len(data[0]['files']) == 1 |
555 | 557 |
assert list(data[0]['files'][0].keys()) == ['url'] |
556 | 558 | |
557 |
pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() |
|
559 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: |
|
560 |
pdf_contents = fd.read() |
|
558 | 561 |
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) |
559 | 562 |
assert resp.json[0]['token'] |
560 | 563 |
assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 1 |
... | ... | |
594 | 597 |
assert len(piece['files']) == 1 |
595 | 598 |
assert list(piece['files'][0].keys()) == ['url'] |
596 | 599 | |
597 |
pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() |
|
600 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: |
|
601 |
pdf_contents = fd.read() |
|
598 | 602 |
resp = app.post(data[0]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) |
599 | 603 |
assert resp.json[0]['token'] |
600 | 604 |
assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 1 |
601 | 605 | |
602 |
pdf_contents = open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb').read() |
|
606 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'pdf-form.pdf'), 'rb') as fd: |
|
607 |
pdf_contents = fd.read() |
|
603 | 608 |
resp = app.post(data[1]['files'][0]['url'], upload_files=[('files[]', 'test.pdf', pdf_contents)]) |
604 | 609 |
assert resp.json[0]['token'] |
605 | 610 |
assert CartaDSFile.objects.filter(tracking_code=dossier.tracking_code, sent_to_cartads=None).count() == 2 |
tests/test_cityweb.py | ||
---|---|---|
60 | 60 | |
61 | 61 | |
62 | 62 |
def assert_xml_doc(filename, assertions): |
63 |
schema = etree.XMLSchema(etree.parse(open(os.path.join(get_test_base_dir('cityweb'), 'cityweb.xsd')))) |
|
63 |
with open(os.path.join(get_test_base_dir('cityweb'), 'cityweb.xsd')) as fd: |
|
64 |
schema = etree.XMLSchema(etree.parse(fd)) |
|
64 | 65 | |
65 |
content = open(filename).read() |
|
66 |
with open(filename) as fd: |
|
67 |
content = fd.read() |
|
66 | 68 |
xml_content = etree.fromstring(content) |
67 | 69 |
assert len(xml_content.nsmap) == 1 |
68 | 70 |
assert xml_content.nsmap['xs'] == "http://tempuri.org/XMLSchema.xsd" |
tests/test_cmis.py | ||
---|---|---|
529 | 529 |
"""simulate the 3 (ordered) HTTP queries involved""" |
530 | 530 |
response = {'status': '200'} |
531 | 531 |
if method == 'GET' and uri == 'http://example.com/cmisatom': |
532 |
content = open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb').read() |
|
532 |
with open('%s/tests/data/cmis/cmis1.out.xml' % os.getcwd(), 'rb') as fd: |
|
533 |
content = fd.read() |
|
533 | 534 |
elif ( |
534 | 535 |
method == 'GET' |
535 | 536 |
and uri |
536 | 537 |
== 'http://example.com/cmisatom/test/path?path=/test-eo&filter=&includeAllowableActions=false&includeACL=false&includePolicyIds=false&includeRelationships=&renditionFilter=' |
537 | 538 |
): |
538 |
content = open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb').read() |
|
539 |
with open('%s/tests/data/cmis/cmis2.out.xml' % os.getcwd(), 'rb') as fd: |
|
540 |
content = fd.read() |
|
539 | 541 |
elif method == 'POST' and uri == 'http://example.com/cmisatom/test/children?id=L3Rlc3QtZW8%3D': |
540 |
expected_input = open('%s/tests/data/cmis/cmis3.in.xml' % os.getcwd(), 'r').read() |
|
542 |
with open('%s/tests/data/cmis/cmis3.in.xml' % os.getcwd(), 'r') as fd: |
|
543 |
expected_input = fd.read() |
|
541 | 544 |
expected_input = expected_input.replace('\n', '') |
542 | 545 |
expected_input = re.sub('> *<', '><', expected_input) |
543 | 546 |
input1 = ET.tostring(ET.XML(expected_input)) |
... | ... | |
556 | 559 | |
557 | 560 |
if input1 != input2: |
558 | 561 |
raise Exception('expect [[%s]] but get [[%s]]' % (body, expected_input)) |
559 |
content = open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb').read() |
|
562 |
with open('%s/tests/data/cmis/cmis3.out.xml' % os.getcwd(), 'rb') as fd: |
|
563 |
content = fd.read() |
|
560 | 564 |
else: |
561 | 565 |
raise Exception('my fault error, url is not yet mocked: %s' % uri) |
562 | 566 |
return (response, content) |
tests/test_family.py | ||
---|---|---|
375 | 375 |
resource.clean() |
376 | 376 | |
377 | 377 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_data.zip') |
378 |
resource.archive = File(open(filepath), 'family_data.zip') |
|
379 |
with pytest.raises(ValidationError): |
|
380 |
resource.clean() |
|
378 |
with open(filepath) as fd: |
|
379 |
resource.archive = File(fd, 'family_data.zip') |
|
380 |
with pytest.raises(ValidationError): |
|
381 |
resource.clean() |
|
381 | 382 | |
382 | 383 | |
383 | 384 |
def test_orleans_concerto_loader(): |
384 | 385 |
# all related objects will also be deleted |
385 | 386 |
Family.objects.all().delete() |
386 | 387 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'orleans', 'family_data_orleans.zip') |
387 |
resource = GenericFamily( |
|
388 |
title='test orleans', |
|
389 |
slug='test-orleans', |
|
390 |
archive=File(open(filepath, 'rb'), 'family_data_orleans.zip'), |
|
391 |
file_format='concerto_orleans', |
|
392 |
) |
|
393 |
from passerelle.apps.family.loaders.concerto_orleans import Loader |
|
394 | ||
395 |
loader = Loader(resource) |
|
396 |
loader.archive = zipfile.ZipFile(filepath) |
|
397 | ||
398 |
families = loader.build_families() |
|
399 |
assert len(families) == 18 |
|
400 |
for family in families.values(): |
|
401 |
assert family['external_id'] |
|
402 |
assert family['adults'] |
|
403 |
assert family['login'] |
|
404 |
assert family['password'] |
|
405 |
assert family['zipcode'] |
|
406 |
assert family['city'] |
|
407 |
assert len(family['adults']) > 0 |
|
408 | ||
409 |
for adult in family['adults']: |
|
410 |
assert adult['first_name'] |
|
411 |
assert adult['last_name'] |
|
412 |
assert adult['sex'] |
|
413 |
assert adult['external_id'] |
|
414 |
assert adult['zipcode'] |
|
415 |
assert adult['city'] |
|
416 |
assert len(family['children']) >= 1 |
|
417 |
for child in family['children']: |
|
418 |
assert child['external_id'] |
|
419 |
assert child['first_name'] |
|
420 |
assert child['last_name'] |
|
421 |
assert child['sex'] |
|
422 |
assert 'birthdate' in child |
|
423 | ||
424 |
assert 'invoices' in family |
|
425 |
if family['invoices']: |
|
426 |
for invoice in family['invoices']: |
|
427 |
assert invoice['external_id'] |
|
428 |
assert invoice['label'] |
|
429 |
assert invoice['issue_date'] |
|
430 |
assert invoice['online_payment'] |
|
431 |
assert 'autobilling' in invoice |
|
432 |
assert 'amount' in invoice |
|
433 |
assert 'total_amount' in invoice |
|
434 | ||
435 |
# there are 4 families with invoices in test data |
|
436 |
assert len([f for f in families.values() if f['invoices']]) == 4 |
|
437 |
# and 14 families with no invoices |
|
438 |
assert len([f for f in families.values() if not f['invoices']]) == 14 |
|
439 |
resource.save() |
|
388 |
with open(filepath, 'rb') as fd: |
|
389 |
resource = GenericFamily( |
|
390 |
title='test orleans', |
|
391 |
slug='test-orleans', |
|
392 |
archive=File(fd, 'family_data_orleans.zip'), |
|
393 |
file_format='concerto_orleans', |
|
394 |
) |
|
395 |
from passerelle.apps.family.loaders.concerto_orleans import Loader |
|
396 | ||
397 |
loader = Loader(resource) |
|
398 |
with zipfile.ZipFile(filepath) as z: |
|
399 |
loader.archive = z |
|
400 | ||
401 |
families = loader.build_families() |
|
402 | ||
403 |
assert len(families) == 18 |
|
404 |
for family in families.values(): |
|
405 |
assert family['external_id'] |
|
406 |
assert family['adults'] |
|
407 |
assert family['login'] |
|
408 |
assert family['password'] |
|
409 |
assert family['zipcode'] |
|
410 |
assert family['city'] |
|
411 |
assert len(family['adults']) > 0 |
|
412 | ||
413 |
for adult in family['adults']: |
|
414 |
assert adult['first_name'] |
|
415 |
assert adult['last_name'] |
|
416 |
assert adult['sex'] |
|
417 |
assert adult['external_id'] |
|
418 |
assert adult['zipcode'] |
|
419 |
assert adult['city'] |
|
420 |
assert len(family['children']) >= 1 |
|
421 |
for child in family['children']: |
|
422 |
assert child['external_id'] |
|
423 |
assert child['first_name'] |
|
424 |
assert child['last_name'] |
|
425 |
assert child['sex'] |
|
426 |
assert 'birthdate' in child |
|
427 | ||
428 |
assert 'invoices' in family |
|
429 |
if family['invoices']: |
|
430 |
for invoice in family['invoices']: |
|
431 |
assert invoice['external_id'] |
|
432 |
assert invoice['label'] |
|
433 |
assert invoice['issue_date'] |
|
434 |
assert invoice['online_payment'] |
|
435 |
assert 'autobilling' in invoice |
|
436 |
assert 'amount' in invoice |
|
437 |
assert 'total_amount' in invoice |
|
438 | ||
439 |
# there are 4 families with invoices in test data |
|
440 |
assert len([f for f in families.values() if f['invoices']]) == 4 |
|
441 |
# and 14 families with no invoices |
|
442 |
assert len([f for f in families.values() if not f['invoices']]) == 14 |
|
443 |
resource.save() |
|
440 | 444 | |
441 | 445 |
assert Family.objects.filter(resource=resource).count() == 18 |
442 | 446 |
assert Adult.objects.all().count() == 31 |
... | ... | |
510 | 514 | |
511 | 515 |
def test_incorrect_orleans_data(caplog): |
512 | 516 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip') |
513 |
GenericFamily.objects.create( |
|
514 |
title='test orleans', |
|
515 |
slug='test-orleans', |
|
516 |
archive=File(open(filepath, 'rb'), 'family_incorrect_data_orleans.zip'), |
|
517 |
file_format='concerto_orleans', |
|
518 |
) |
|
517 |
with open(filepath, 'rb') as fd: |
|
518 |
GenericFamily.objects.create( |
|
519 |
title='test orleans', |
|
520 |
slug='test-orleans', |
|
521 |
archive=File(fd, 'family_incorrect_data_orleans.zip'), |
|
522 |
file_format='concerto_orleans', |
|
523 |
) |
|
519 | 524 |
for record in caplog.records: |
520 | 525 |
assert 'Error occured while importing data:' in record.message |
521 | 526 |
assert record.name == 'passerelle.resource.family.test-orleans' |
tests/test_generic_endpoint.py | ||
---|---|---|
56 | 56 |
@mock.patch('passerelle.apps.mdel.models.Demand.create_zip', lambda x, y: '1-14-ILE-LA') |
57 | 57 |
def test_generic_payload_logging(caplog, app, mdel): |
58 | 58 |
filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json') |
59 |
payload = json.load(open(filename)) |
|
59 |
with open(filename) as fd: |
|
60 |
payload = json.load(fd) |
|
60 | 61 |
resp = app.post_json('/mdel/test/create', params=payload, status=200) |
61 | 62 |
assert resp.json['data']['demand_id'] == '1-14-ILE-LA' |
62 | 63 | |
... | ... | |
82 | 83 | |
83 | 84 |
@mock.patch('passerelle.utils.Request.get') |
84 | 85 |
def test_proxy_logger(mocked_get, caplog, app, arcgis): |
85 |
payload = open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')).read() |
|
86 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')) as fd: |
|
87 |
payload = fd.read() |
|
86 | 88 |
mocked_get.return_value = tests.utils.FakedResponse(content=payload, status_code=200) |
87 | 89 | |
88 | 90 |
# simple logger |
... | ... | |
160 | 162 | |
161 | 163 |
@mock.patch('requests.Session.send') |
162 | 164 |
def test_proxy_logger_transaction_id(mocked_send, app, arcgis): |
163 |
payload = open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')).read() |
|
165 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'nancy_arcgis', 'sigresponse.json')) as fd: |
|
166 |
payload = fd.read() |
|
164 | 167 |
mocked_send.return_value = tests.utils.FakedResponse(content=payload, status_code=200) |
165 | 168 |
arcgis.log_evel = 'DEBUG' |
166 | 169 |
arcgis.base_url = 'https://example.net/' |
... | ... | |
936 | 939 |
def test_generic_endpoint_superuser_access(db, app, admin_user, simple_user): |
937 | 940 |
MDEL.objects.create(slug='test') |
938 | 941 |
filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json') |
939 |
payload = json.load(open(filename)) |
|
942 |
with open(filename) as fd: |
|
943 |
payload = json.load(fd) |
|
940 | 944 | |
941 | 945 |
app = login(app, username='user', password='user') |
942 | 946 |
resp = app.post_json('/mdel/test/create', params=payload, status=403) |
tests/test_grenoble_gru.py | ||
---|---|---|
127 | 127 |
with mock.patch('passerelle.utils.Request.post') as request_post: |
128 | 128 |
response = mock.Mock() |
129 | 129 |
types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml') |
130 |
types = open(types_filename).read() |
|
130 |
with open(types_filename) as fd: |
|
131 |
types = fd.read() |
|
131 | 132 |
response.content = types.replace('Courrier', 'Courrier & autres') |
132 | 133 |
request_post.return_value = response |
133 | 134 |
endpoint = reverse( |
... | ... | |
145 | 146 |
with mock.patch('passerelle.utils.Request.post') as request_post: |
146 | 147 |
response = mock.Mock() |
147 | 148 |
types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml') |
148 |
response.content = open(types_filename).read() |
|
149 |
with open(types_filename) as fd: |
|
150 |
response.content = fd.read() |
|
149 | 151 |
request_post.return_value = response |
150 | 152 |
endpoint = reverse( |
151 | 153 |
'generic-endpoint', |
... | ... | |
171 | 173 | |
172 | 174 |
def get_typo_response(): |
173 | 175 |
types_filename = os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_typologies.xml') |
174 |
types = open(types_filename).read() |
|
176 |
with open(types_filename) as fd: |
|
177 |
types = fd.read() |
|
175 | 178 |
typo_response = mock.Mock() |
176 | 179 |
typo_response.content = types |
177 | 180 |
return typo_response |
... | ... | |
357 | 360 |
with mock.patch('passerelle.utils.Request.post') as request_post: |
358 | 361 |
response = mock.Mock() |
359 | 362 |
json_response = mock.Mock() |
360 |
json_response.return_value = json.load( |
|
361 |
open(os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_pavs.json')) |
|
362 |
) |
|
363 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'grenoble_gru_pavs.json')) as fd: |
|
364 |
json_response.return_value = json.load(fd) |
|
363 | 365 |
response.json = json_response |
364 | 366 |
request_post.return_value = response |
365 | 367 |
response = app.get( |
tests/test_import_export.py | ||
---|---|---|
200 | 200 |
assert Bdp.objects.count() == 1 |
201 | 201 |
bdp.delete() |
202 | 202 |
assert Bdp.objects.count() == 0 |
203 |
import_site(json.load(open(f.name)), overwrite=True) |
|
203 |
with open(f.name) as fd: |
|
204 |
import_site(json.load(fd), overwrite=True) |
|
204 | 205 |
assert Bdp.objects.count() == 1 |
205 | 206 | |
206 | 207 |
tests/test_iparapheur.py | ||
---|---|---|
73 | 73 |
else: |
74 | 74 |
raise Exception('my fault error, url is not yet mocked: %s' % url) |
75 | 75 | |
76 |
response._content = open(target_file, 'rb').read() |
|
76 |
with open(target_file, 'rb') as fd: |
|
77 |
response._content = fd.read() |
|
77 | 78 |
response.status_code = 200 |
78 | 79 |
return response |
79 | 80 | |
... | ... | |
346 | 347 |
response = Response() |
347 | 348 |
response.status_code = 200 |
348 | 349 | |
349 |
soap_response = open( |
|
350 |
os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' |
|
351 |
).read() |
|
350 |
with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: |
|
351 |
soap_response = fd.read() |
|
352 | 352 |
response._content = force_bytes(soap_response) |
353 | 353 |
mocked_post.return_value = response |
354 | 354 | |
... | ... | |
398 | 398 |
response = Response() |
399 | 399 |
response.status_code = 200 |
400 | 400 | |
401 |
soap_response = open( |
|
402 |
os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' |
|
403 |
).read() |
|
401 |
with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: |
|
402 |
soap_response = fd.read() |
|
404 | 403 |
response._content = soap_response |
405 | 404 |
mocked_post.return_value = response |
406 | 405 | |
... | ... | |
421 | 420 |
response = Response() |
422 | 421 |
response.status_code = 200 |
423 | 422 | |
424 |
soap_response = open( |
|
425 |
os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' |
|
426 |
).read() |
|
423 |
with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: |
|
424 |
soap_response = fd.read() |
|
427 | 425 |
response._content = soap_response |
428 | 426 |
mocked_post.return_value = response |
429 | 427 | |
... | ... | |
444 | 442 |
response = Response() |
445 | 443 |
response.status_code = 200 |
446 | 444 | |
447 |
soap_response = open( |
|
448 |
os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' |
|
449 |
).read() |
|
445 |
with open(os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb') as fd: |
|
446 |
soap_response = fd.read() |
|
450 | 447 |
response._content = soap_response |
451 | 448 |
mocked_post.return_value = response |
452 | 449 | |
... | ... | |
521 | 518 |
) |
522 | 519 |
resp = app.get(url) |
523 | 520 |
assert resp.headers['content-type'] == 'text/xml' |
524 |
assert resp.content == open(wsdl_file(), 'rb').read() |
|
521 |
with open(wsdl_file(), 'rb') as fd: |
|
522 |
assert resp.content == fd.read() |
|
525 | 523 | |
526 | 524 | |
527 | 525 |
@mock.patch('passerelle.utils.Request.get', side_effect=ConnectionError('mocked error')) |
... | ... | |
548 | 546 |
""" |
549 | 547 |
response_xmlmime, response_post = Response(), Response() |
550 | 548 |
response_xmlmime.status_code, response_post.status_code = 200, 200 |
551 |
response_xmlmime._content = open(xmlmime(), 'rb').read() |
|
549 |
with open(xmlmime(), 'rb') as fd: |
|
550 |
response_xmlmime._content = fd.read() |
|
552 | 551 |
response_post._content = force_bytes( |
553 | 552 |
"""<?xml version='1.0' encoding='UTF-8'?><S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/"><S:Body><echoResponse xmlns="http://www.adullact.org/spring-ws/iparapheur/1.0" xmlns:xmime="http://www.w3.org/2005/05/xmlmime">[publik_test] m'a dit: "ping"!</echoResponse></S:Body></S:Envelope> |
554 | 553 |
""" |
555 | 554 |
) |
556 |
mocked_load.return_value = open(wsdl_file(), 'rb').read() |
|
555 |
with open(wsdl_file(), 'rb') as fd: |
|
556 |
mocked_load.return_value = fd.read() |
|
557 | 557 |
mocked_get.return_value = response_xmlmime |
558 | 558 |
mocked_post.return_value = response_post |
559 | 559 |
url = reverse( |
tests/test_mdel.py | ||
---|---|---|
57 | 57 | |
58 | 58 |
def check_zip_file(zipdir, expected_files): |
59 | 59 |
# check files order in zip |
60 |
zipres = zipfile.ZipFile(zipdir, 'r')
|
|
61 |
files = [f.filename for f in zipres.infolist()] |
|
60 |
with zipfile.ZipFile(zipdir, 'r') as zipres:
|
|
61 |
files = [f.filename for f in zipres.infolist()]
|
|
62 | 62 |
assert sorted(files[: len(expected_files)]) == sorted(expected_files) |
63 | 63 | |
64 | 64 |
tests/test_utils_conversion.py | ||
---|---|---|
4 | 4 | |
5 | 5 | |
6 | 6 |
def test_pdf_to_pdf_do_nothing(): |
7 |
pdf = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb').read() |
|
7 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal.pdf'), 'rb') as fd: |
|
8 |
pdf = fd.read() |
|
8 | 9 |
assert to_pdf(pdf) == pdf |
9 |
pdf = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bom.pdf'), 'rb').read() |
|
10 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bom.pdf'), 'rb') as fd: |
|
11 |
pdf = fd.read() |
|
10 | 12 |
assert to_pdf(pdf) == pdf |
11 |
pdf = open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bomutf8.pdf'), 'rb').read() |
|
13 |
with open(os.path.join(os.path.dirname(__file__), 'data', 'minimal_bomutf8.pdf'), 'rb') as fd: |
|
14 |
pdf = fd.read() |
|
12 | 15 |
assert to_pdf(pdf) == pdf |
tests/test_utils_zip.py | ||
---|---|---|
146 | 146 |
with full_path.open('rb') as fd: |
147 | 147 |
with zipfile.ZipFile(fd) as zi: |
148 | 148 |
assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml'] |
149 |
assert ( |
|
150 |
zi.open('coucou-10-part1.xml').read().decode('utf-8') |
|
151 |
== '<?xml version="1.0"?><body>blabla</body>' |
|
152 |
) |
|
153 |
assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == '<a>Héllo World!</a>' |
|
149 |
with zi.open('coucou-10-part1.xml') as zfd: |
|
150 |
assert zfd.read().decode('utf-8') == '<?xml version="1.0"?><body>blabla</body>' |
|
151 |
with zi.open('coucou-10-dôc.xml') as zfd: |
|
152 |
assert zfd.read().decode('utf-8') == '<a>Héllo World!</a>' |
|
154 | 153 | |
155 | 154 |
with io.BytesIO(z.render_to_bytes()) as fd: |
156 | 155 |
with zipfile.ZipFile(fd) as zi: |
157 | 156 |
assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml'] |
158 |
assert ( |
|
159 |
zi.open('coucou-10-part1.xml').read().decode('utf-8') |
|
160 |
== '<?xml version="1.0"?><body>blabla</body>' |
|
161 |
) |
|
162 |
assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == '<a>Héllo World!</a>' |
|
157 |
with zi.open('coucou-10-part1.xml') as zfd: |
|
158 |
assert zfd.read().decode('utf-8') == '<?xml version="1.0"?><body>blabla</body>' |
|
159 |
with zi.open('coucou-10-dôc.xml') as zfd: |
|
160 |
assert zfd.read().decode('utf-8') == '<a>Héllo World!</a>' |
|
163 | 161 | |
164 | 162 | |
165 | 163 |
def test_xml_error(tpl_builder, dest): |
166 |
- |