0006-misc-remove-unused-variable-pylint-error-62099.patch
passerelle/apps/arcgis/models.py | ||
---|---|---|
263 | 263 | |
264 | 264 |
def validate_where(format_string): |
265 | 265 |
formatter = SqlFormatter() |
266 |
for prefix, ref, format_spec, conversion in formatter.parse(format_string):
|
|
266 |
for dummy, ref, dummy, dummy in formatter.parse(format_string):
|
|
267 | 267 |
if ref is None: |
268 | 268 |
pass |
269 | 269 |
elif ref == '': |
passerelle/apps/astregs/models.py | ||
---|---|---|
732 | 732 |
contact = self.call('Contact', 'Creation', Contact=post_data) |
733 | 733 |
# address should be set separatedly |
734 | 734 |
post_data['EncodeKeyContact'] = contact.idContact |
735 |
address = self.call('ContactAdresses', 'Creation', ContactAdresses=post_data)
|
|
735 |
self.call('ContactAdresses', 'Creation', ContactAdresses=post_data) |
|
736 | 736 |
return {'data': serialize_object(contact)} |
737 | 737 | |
738 | 738 |
@endpoint( |
passerelle/apps/atos_genesys/models.py | ||
---|---|---|
519 | 519 |
}, |
520 | 520 |
) |
521 | 521 |
def link_by_id_per(self, request, NameID, id_per): |
522 |
dossier = self.call_select_usager(id_per)
|
|
522 |
self.call_select_usager(id_per) |
|
523 | 523 |
link, created = Link.objects.get_or_create(resource=self, name_id=NameID, id_per=id_per) |
524 | 524 |
return {'link_id': link.pk, 'new': created} |
525 | 525 |
passerelle/apps/base_adresse/models.py | ||
---|---|---|
447 | 447 |
department |
448 | 448 |
) |
449 | 449 |
) |
450 |
except RequestException as e:
|
|
450 |
except RequestException: |
|
451 | 451 |
continue |
452 | 452 |
if ban_gz.status_code != 200: |
453 | 453 |
continue |
passerelle/apps/cartads_cs/migrations/0008_auto_20190923_1712.py | ||
---|---|---|
10 | 10 |
CartaDSSubscriber = apps.get_model('cartads_cs', 'CartaDSSubscriber') |
11 | 11 |
for instance in CartaDSDossier.objects.all(): |
12 | 12 |
if instance.name_id: |
13 |
subscriber, created = CartaDSSubscriber.objects.get_or_create(name_id=instance.name_id)
|
|
13 |
subscriber, _ = CartaDSSubscriber.objects.get_or_create(name_id=instance.name_id)
|
|
14 | 14 |
instance.subscribers.add(subscriber) |
15 | 15 | |
16 | 16 |
passerelle/apps/cartads_cs/models.py | ||
---|---|---|
198 | 198 | |
199 | 199 |
# communes |
200 | 200 |
resp = client.service.GetCommunes(self.get_token(), {}) |
201 |
communes_cache, created = CartaDSDataCache.objects.get_or_create(data_type='communes')
|
|
201 |
communes_cache, dummy = CartaDSDataCache.objects.get_or_create(data_type='communes')
|
|
202 | 202 |
communes_cache.data_values = {'data': [{'id': str(x['Key']), 'text': x['Value']} for x in resp]} |
203 | 203 |
communes_cache.save() |
204 | 204 | |
... | ... | |
208 | 208 |
resp = client.service.GetTypesDossier(self.get_token(), int(commune['id']), {}) |
209 | 209 |
if resp is None: |
210 | 210 |
continue |
211 |
data_cache, created = CartaDSDataCache.objects.get_or_create(
|
|
211 |
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
|
|
212 | 212 |
data_type='types_dossier', data_parameters={'commune_id': int(commune['id'])} |
213 | 213 |
) |
214 | 214 |
data_cache.data_values = {'data': [{'id': str(x['Key']), 'text': x['Value']} for x in resp]} |
... | ... | |
222 | 222 |
resp = client.service.GetObjetsDemande(self.get_token(), type_dossier_id) |
223 | 223 |
if resp is None: |
224 | 224 |
continue |
225 |
data_cache, created = CartaDSDataCache.objects.get_or_create(
|
|
225 |
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
|
|
226 | 226 |
data_type='objets_demande', data_parameters={'type_dossier_id': type_dossier_id} |
227 | 227 |
) |
228 | 228 |
data_cache.data_values = {'data': [{'id': str(x['Key']), 'text': x['Value']} for x in resp]} |
... | ... | |
252 | 252 |
pass |
253 | 253 |
return u'%(Nom)s' % x |
254 | 254 | |
255 |
data_cache, created = CartaDSDataCache.objects.get_or_create(
|
|
255 |
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
|
|
256 | 256 |
data_type='liste_pdf', |
257 | 257 |
data_parameters={ |
258 | 258 |
'type_dossier_id': type_dossier_id, |
... | ... | |
284 | 284 |
resp = client.service.GetPieces(self.get_token(), type_dossier_id, objet_demande_id) |
285 | 285 |
if resp is None: |
286 | 286 |
continue |
287 |
data_cache, created = CartaDSDataCache.objects.get_or_create(
|
|
287 |
data_cache, dummy = CartaDSDataCache.objects.get_or_create(
|
|
288 | 288 |
data_type='pieces', |
289 | 289 |
data_parameters={ |
290 | 290 |
'type_dossier_id': type_dossier_id, |
... | ... | |
459 | 459 |
}, |
460 | 460 |
) |
461 | 461 |
def pieces(self, request, type_dossier_id, objet_demande_id, tracking_code, demolitions=True): |
462 |
cache, created = CartaDSDataCache.objects.get_or_create(
|
|
462 |
cache, dummy = CartaDSDataCache.objects.get_or_create(
|
|
463 | 463 |
data_type='pieces', |
464 | 464 |
data_parameters={ |
465 | 465 |
'type_dossier_id': type_dossier_id, |
... | ... | |
740 | 740 |
# this cannot be verb DELETE as we have no way to set |
741 | 741 |
# Access-Control-Allow-Methods |
742 | 742 |
signer = Signer(salt='cart@ds_cs') |
743 |
tracking_code = signer.unsign(token) |
|
744 | 743 |
CartaDSFile.objects.filter(id=signer.unsign(file_upload)).delete() |
745 | 744 |
return {'err': 0} |
746 | 745 | |
... | ... | |
1058 | 1057 |
piece.save() |
1059 | 1058 | |
1060 | 1059 |
def get_file_status(self, dossier): |
1061 |
extra = None |
|
1062 | 1060 |
response = {} |
1063 | 1061 |
if dossier.deleted: |
1064 | 1062 |
status_id = 'deleted' |
passerelle/apps/choosit/models.py | ||
---|---|---|
86 | 86 |
else: |
87 | 87 |
try: |
88 | 88 |
output = r.json() |
89 |
except ValueError as e:
|
|
89 |
except ValueError: |
|
90 | 90 |
results.append('Choosit error: bad JSON response') |
91 | 91 |
else: |
92 | 92 |
if not isinstance(output, dict): |
passerelle/apps/csvdatasource/models.py | ||
---|---|---|
182 | 182 |
return result |
183 | 183 | |
184 | 184 |
def cache_data(self): |
185 |
# FIXME: why are those dead variables computed ? |
|
186 |
titles = [t.strip() for t in self.columns_keynames.split(',')] |
|
187 |
indexes = [titles.index(t) for t in titles if t] |
|
188 |
caption = [titles[i] for i in indexes] |
|
189 | 185 |
with transaction.atomic(): |
190 | 186 |
TableRow.objects.filter(resource=self).delete() |
191 | 187 |
for block in batch(enumerate(self.get_rows()), 5000): |
passerelle/apps/esirius/models.py | ||
---|---|---|
211 | 211 |
post_data['user']['address'] = {} |
212 | 212 | |
213 | 213 |
post_data['codeRDV'] = id |
214 |
response = self.request('appointments', method='PUT', json=post_data)
|
|
214 |
self.request('appointments', method='PUT', json=post_data) |
|
215 | 215 |
return {'data': {'id': id, 'updated': True}} |
216 | 216 | |
217 | 217 |
@endpoint( |
... | ... | |
245 | 245 |
}, |
246 | 246 |
) |
247 | 247 |
def delete_appointment(self, request, id): |
248 |
response = self.request('appointments/%s/' % id, method='DELETE')
|
|
248 |
self.request('appointments/%s/' % id, method='DELETE') |
|
249 | 249 |
return {'data': {'id': id, 'deleted': True}} |
passerelle/apps/family/loaders/concerto_fondettes.py | ||
---|---|---|
74 | 74 |
invoice['online_payment'] = False |
75 | 75 |
invoice['no_online_payment_reason'] = 'autobilling' |
76 | 76 | |
77 |
obj, created = Invoice.objects.update_or_create(
|
|
77 |
obj, dummy = Invoice.objects.update_or_create(
|
|
78 | 78 |
resource=self.connector, external_id=row['ID_FAC'], defaults=invoice |
79 | 79 |
) |
80 | 80 |
invoice_filename = '%s_%s.pdf' % ( |
passerelle/apps/family/loaders/concerto_orleans.py | ||
---|---|---|
177 | 177 |
'city', |
178 | 178 |
), |
179 | 179 |
) |
180 |
family, created = Family.objects.update_or_create(
|
|
180 |
family, dummy = Family.objects.update_or_create(
|
|
181 | 181 |
external_id=family_data['external_id'], resource=self.connector, defaults=data |
182 | 182 |
) |
183 | 183 | |
... | ... | |
198 | 198 |
invoice_path = os.path.join(invoices_dir, invoice_filename) |
199 | 199 |
# create invoice object only if associated pdf exists |
200 | 200 |
if os.path.exists(invoice_path): |
201 |
invoice, created = Invoice.objects.update_or_create(
|
|
201 |
invoice, dummy = Invoice.objects.update_or_create(
|
|
202 | 202 |
resource=self.connector, |
203 | 203 |
family=family, |
204 | 204 |
external_id=invoice_data['external_id'], |
passerelle/apps/family/loaders/egee_thonon.py | ||
---|---|---|
49 | 49 |
'no_online_payment_reason': None, |
50 | 50 |
'label': external_id, |
51 | 51 |
} |
52 |
obj, created = Invoice.objects.update_or_create(
|
|
52 |
Invoice.objects.update_or_create( |
|
53 | 53 |
resource=self.connector, external_id=external_id, defaults=invoice |
54 | 54 |
) |
55 | 55 |
external_ids.append(external_id) |
passerelle/apps/family/loaders/opus_fondettes.py | ||
---|---|---|
75 | 75 |
invoice['online_payment'] = False |
76 | 76 |
invoice['no_online_payment_reason'] = 'autobilling' |
77 | 77 | |
78 |
obj, created = Invoice.objects.update_or_create(
|
|
78 |
obj, dummy = Invoice.objects.update_or_create(
|
|
79 | 79 |
resource=self.connector, external_id=row['ID_FAC'], defaults=invoice |
80 | 80 |
) |
81 | 81 |
invoice_filename = '%s_%s.pdf' % ( |
passerelle/apps/family/models.py | ||
---|---|---|
228 | 228 |
('complement', 'address_complement'), |
229 | 229 |
), |
230 | 230 |
) |
231 |
family, created = Family.objects.update_or_create(
|
|
231 |
family, dummy = Family.objects.update_or_create(
|
|
232 | 232 |
external_id=family_data['id'], resource=self, defaults=data |
233 | 233 |
) |
234 | 234 |
for adult in family_data.get('adults') or []: |
passerelle/apps/franceconnect_data/fc.py | ||
---|---|---|
103 | 103 |
self.add('fc_token_endpoint_response', response_content) |
104 | 104 |
self.add('fc_access_token', response_content['access_token']) |
105 | 105 |
self.add('fc_id_token', response_content['id_token']) |
106 |
header, payload, signature = self.fc_id_token.split('.')
|
|
106 |
dummy, payload, dummy = self.fc_id_token.split('.')
|
|
107 | 107 |
self.add('fc_id_token_payload', json.loads(base64url_decode(payload.encode()))) |
108 | 108 |
except Exception as e: |
109 | 109 |
raise FranceConnectError('Error in token endpoint response', sub_exception=repr(e)) |
passerelle/apps/gesbac/models.py | ||
---|---|---|
276 | 276 |
with transaction.atomic(): |
277 | 277 |
form = Form.objects.create(resource=self, form_id=form_id, counter=counter) |
278 | 278 |
break |
279 |
except IntegrityError as e:
|
|
279 |
except IntegrityError: |
|
280 | 280 |
continue |
281 | 281 |
else: |
282 | 282 |
raise APIError('fail: more than 20 demands') |
passerelle/apps/mdel_ddpacs/utils.py | ||
---|---|---|
66 | 66 |
"""Zip directory""" |
67 | 67 |
archname = path + '.zip' |
68 | 68 |
with zipfile.ZipFile(archname, 'w', zipfile.ZIP_DEFLATED) as zipf: |
69 |
for root, dirs, files in os.walk(path):
|
|
69 |
for root, dummy, files in os.walk(path):
|
|
70 | 70 |
for f in files: |
71 | 71 |
fpath = os.path.join(root, f) |
72 | 72 |
zipf.write(fpath, os.path.basename(fpath)) |
passerelle/apps/orange/models.py | ||
---|---|---|
107 | 107 | |
108 | 108 |
access_token = self.get_access_token() |
109 | 109 |
group_id = self.group_id_from_name(access_token) |
110 |
response = self.diffusion(access_token, group_id, destinations, text, sender)
|
|
110 |
self.diffusion(access_token, group_id, destinations, text, sender) |
|
111 | 111 |
return None # credit consumed is unknown |
passerelle/apps/ovh/models.py | ||
---|---|---|
161 | 161 |
else: |
162 | 162 |
try: |
163 | 163 |
result = response.json() |
164 |
except ValueError as e:
|
|
164 |
except ValueError: |
|
165 | 165 |
raise APIError('OVH error: bad JSON response') |
166 | 166 |
try: |
167 | 167 |
response.raise_for_status() |
... | ... | |
257 | 257 |
else: |
258 | 258 |
try: |
259 | 259 |
result = response.json() |
260 |
except ValueError as e:
|
|
260 |
except ValueError: |
|
261 | 261 |
raise APIError('OVH error: bad JSON response') |
262 | 262 |
else: |
263 | 263 |
if not isinstance(result, dict): |
passerelle/apps/ovh/views.py | ||
---|---|---|
33 | 33 |
return connector.get_absolute_url() |
34 | 34 |
try: |
35 | 35 |
result = resp.json() |
36 |
except ValueError as e:
|
|
36 |
except ValueError: |
|
37 | 37 |
messages.error(self.request, _('There has been an error requesting token: bad JSON response.')) |
38 | 38 |
return connector.get_absolute_url() |
39 | 39 |
try: |
40 | 40 |
resp.raise_for_status() |
41 |
except RequestException as e:
|
|
41 |
except RequestException: |
|
42 | 42 |
error_text = result.get('message', result) |
43 | 43 |
messages.error(self.request, _('There has been an error requesting token: %s.') % error_text) |
44 | 44 |
return connector.get_absolute_url() |
passerelle/apps/plone_restapi/models.py | ||
---|---|---|
150 | 150 |
return token |
151 | 151 | |
152 | 152 |
def request(self, path='', method='GET', params=None, json=None): |
153 |
scheme, netloc, base_path, query, fragment = urlsplit(self.service_url)
|
|
153 |
scheme, netloc, base_path, dummy, fragment = urlsplit(self.service_url)
|
|
154 | 154 |
url = urlunsplit((scheme, netloc, base_path + '/%s' % path, '', fragment)) |
155 | 155 |
headers = {'Accept': 'application/json'} |
156 | 156 |
kwargs = {'method': method, 'url': url, 'headers': headers, 'params': params, 'json': json} |
... | ... | |
164 | 164 |
if response.status_code != 204: # No Content |
165 | 165 |
try: |
166 | 166 |
json_response = response.json() |
167 |
except ValueError as e:
|
|
167 |
except ValueError: |
|
168 | 168 |
raise APIError('PloneRestApi: bad JSON response') |
169 | 169 |
try: |
170 | 170 |
response.raise_for_status() |
passerelle/apps/solis/models.py | ||
---|---|---|
289 | 289 |
token = self.apa_token(user_id, code) # invalid credentials raise APIError here |
290 | 290 |
information = self.apa_get_information(information='exportDonneesIndividu', token=token) |
291 | 291 |
text = get_template(self.text_template_name).render(information).strip() |
292 |
link, created = SolisAPALink.objects.update_or_create(
|
|
292 |
dummy, created = SolisAPALink.objects.update_or_create(
|
|
293 | 293 |
resource=self, name_id=name_id, user_id=user_id, defaults={'code': code, 'text': text} |
294 | 294 |
) |
295 | 295 |
return {'data': {'user_id': user_id, 'created': created, 'updated': not created}} |
... | ... | |
580 | 580 |
self.rsa_token(user_id, code, dob) # invalid credentials raise APIError here |
581 | 581 |
information = self.rsa_get_information('individu', user_id, code, dob) |
582 | 582 |
text = get_template(self.text_template_name_rsa).render(information).strip() |
583 |
link, created = SolisRSALink.objects.update_or_create(
|
|
583 |
dummy, created = SolisRSALink.objects.update_or_create(
|
|
584 | 584 |
resource=self, name_id=name_id, user_id=user_id, defaults={'code': code, 'dob': dob, 'text': text} |
585 | 585 |
) |
586 | 586 |
return {'data': {'user_id': user_id, 'text': text, 'created': created, 'updated': not created}} |
passerelle/apps/sp_fr/models.py | ||
---|---|---|
562 | 562 |
def variables(self): |
563 | 563 |
yield 'insee_code' |
564 | 564 |
yield 'email' |
565 |
for path, xsd_type in self.xsd.paths():
|
|
565 |
for path, dummy in self.xsd.paths():
|
|
566 | 566 |
names = [simplify(tag.localname) for tag in path] |
567 | 567 |
yield '_'.join(names) |
568 | 568 |
if hasattr(self, 'variables_%s' % self.procedure): |
passerelle/apps/vivaticket/models.py | ||
---|---|---|
252 | 252 |
else: |
253 | 253 |
internal_code = response.json()['InternalCode'] |
254 | 254 |
# update contact data |
255 |
contact_data = response.json() |
|
256 | 255 |
url = urlparse.urljoin(self.url, 'Contact/Put') |
257 | 256 |
response = self.requests.put( |
258 | 257 |
url, |
passerelle/base/migrations/0008_auto_20181118_0717.py | ||
---|---|---|
13 | 13 |
continue |
14 | 14 |
content_type = ContentType.objects.get_for_model(model) |
15 | 15 |
for instance in model.objects.all(): |
16 |
parameters, created = LoggingParameters.objects.get_or_create(
|
|
16 |
parameters, _ = LoggingParameters.objects.get_or_create(
|
|
17 | 17 |
resource_type=content_type, resource_pk=instance.id |
18 | 18 |
) |
19 | 19 |
parameters.log_level = instance.log_level |
passerelle/base/models.py | ||
---|---|---|
279 | 279 | |
280 | 280 |
def get_endpoints_infos(self): |
281 | 281 |
endpoints = [] |
282 |
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
|
|
282 |
for dummy, method in inspect.getmembers(self, predicate=inspect.ismethod):
|
|
283 | 283 |
if hasattr(method, 'endpoint_info'): |
284 | 284 |
method.endpoint_info.object = self |
285 | 285 |
endpoint_name = method.endpoint_info.name |
... | ... | |
397 | 397 |
app_label, model_name = d['resource_type'].split('.') |
398 | 398 |
try: |
399 | 399 |
model = apps.get_model(app_label, model_name) |
400 |
except LookupError as e:
|
|
400 |
except LookupError: |
|
401 | 401 |
raise BaseResource.UnknownBaseResourceError(app_label) |
402 | 402 |
try: |
403 | 403 |
instance = model.objects.get(slug=d['slug']) |
... | ... | |
632 | 632 |
def handle_job_error(self, job, exc_info): |
633 | 633 |
from passerelle.utils.conversion import exception_to_text |
634 | 634 | |
635 |
(exc_type, exc_value, tb) = exc_info
|
|
635 |
(exc_type, exc_value, dummy) = exc_info
|
|
636 | 636 |
job.status = 'failed' |
637 | 637 |
job.done_timestamp = timezone.now() |
638 | 638 |
job.status_details = { |
... | ... | |
1005 | 1005 |
attr['sourceip'] = sourceip |
1006 | 1006 | |
1007 | 1007 |
if kwargs.get('exc_info'): |
1008 |
(exc_type, exc_value, tb) = sys.exc_info()
|
|
1008 |
(exc_type, exc_value, dummy) = sys.exc_info()
|
|
1009 | 1009 |
attr['extra']['error_summary'] = traceback.format_exception_only(exc_type, exc_value) |
1010 | 1010 | |
1011 | 1011 |
ResourceLog.objects.create(**attr) |
passerelle/contrib/adict/models.py | ||
---|---|---|
39 | 39 |
}, |
40 | 40 |
) |
41 | 41 |
def feature_info(self, request, lat, lon): |
42 |
params = query_args = {'x': lon, 'y': lat, 'srid': '4326'}
|
|
42 |
query_args = {'x': lon, 'y': lat, 'srid': '4326'} |
|
43 | 43 |
query_args['token'] = self.api_token |
44 | 44 |
query_args['sector_type'] = self.sector_type |
45 | 45 |
response = self.requests.get( |
passerelle/contrib/grandlyon_streetsections/models.py | ||
---|---|---|
126 | 126 |
for value in json_loads(sections).get('values'): |
127 | 127 |
if not value.get('codefuv') or not value.get('codetroncon'): |
128 | 128 |
continue |
129 |
section, created = StreetSection.objects.get_or_create(
|
|
129 |
section, dummy = StreetSection.objects.get_or_create(
|
|
130 | 130 |
codefuv=value.get('codefuv'), codetroncon=value.get('codetroncon') |
131 | 131 |
) |
132 | 132 |
for attribute in ('nom', 'nomcommune', 'domanialite', 'codeinsee'): |
passerelle/contrib/greco/models.py | ||
---|---|---|
158 | 158 |
message.attach(part) |
159 | 159 | |
160 | 160 |
message._write_headers = lambda x: None |
161 |
msg_x = message.as_string(unixfrom=False)
|
|
161 |
message.as_string(unixfrom=False) |
|
162 | 162 |
# RFC 2045 defines MIME multipart boundaries: |
163 | 163 |
# * boundary := 0*69<bchars> bcharsnospace |
164 | 164 |
# * dash-boundary := "--" boundary |
passerelle/contrib/solis_apa/integration.py | ||
---|---|---|
57 | 57 |
message['InformationsBancaires'] = get_info_bancaire(fields, wf) |
58 | 58 |
message['Patrimoine'] = get_patrimoine(fields, wf) |
59 | 59 | |
60 |
etablissement, etablissement_date_entree = get_etablissement(fields, wf)
|
|
60 |
etablissement, dummy = get_etablissement(fields, wf)
|
|
61 | 61 | |
62 | 62 |
demande_apa = { |
63 | 63 |
'statutDemande': statut_demande, |
passerelle/contrib/solis_apa/models.py | ||
---|---|---|
61 | 61 |
try: |
62 | 62 |
ret = response.json() |
63 | 63 |
return ret |
64 |
except (ValueError) as e:
|
|
64 |
except (ValueError): |
|
65 | 65 |
raise APIError('Response content is not a valid JSON') |
66 | 66 | |
67 | 67 |
def get_resource_url(self, uri): |
... | ... | |
197 | 197 |
def get_lieux(self, q, commune, departement): |
198 | 198 |
# si commune est un code solis de la forme commune-dep-com |
199 | 199 |
if commune and commune.startswith('commune-'): |
200 |
x, departement, commune = commune.split('-')
|
|
200 |
dummy, departement, commune = commune.split('-')
|
|
201 | 201 |
call = self._conciliation( |
202 | 202 |
conciliation.CONCILIATION_ADRESSE, commune=commune, departement=departement, lieu='%%%s%%' % q |
203 | 203 |
) |
passerelle/contrib/tcl/models.py | ||
---|---|---|
131 | 131 |
response = self.requests.get(url) |
132 | 132 |
response.raise_for_status() |
133 | 133 |
for line_data in response.json()['values']: |
134 |
line, created = Line.objects.get_or_create(
|
|
134 |
line, dummy = Line.objects.get_or_create(
|
|
135 | 135 |
code_titan=line_data['code_titan'], |
136 | 136 |
defaults={'transport_key': key, 'ligne': line_data['ligne']}, |
137 | 137 |
) |
... | ... | |
145 | 145 |
for feature in response.json()['features']: |
146 | 146 |
arret_data = feature['properties'] |
147 | 147 |
arret_data['id_data'] = arret_data.pop('id') |
148 |
stop, created = Stop.objects.get_or_create(id_data=arret_data['id_data'])
|
|
148 |
stop, dummy = Stop.objects.get_or_create(id_data=arret_data['id_data'])
|
|
149 | 149 |
stop.__dict__.update(arret_data) |
150 | 150 |
stop.pmr = bool(stop.pmr == 't') |
151 | 151 |
stop.escalator = bool(stop.escalator == 't') |
passerelle/contrib/toulouse_axel/models.py | ||
---|---|---|
78 | 78 |
def lock(self, request, key, locker): |
79 | 79 |
if not key: |
80 | 80 |
raise APIError('key is empty', err_code='bad-request', http_status=400) |
81 |
lock, created = Lock.objects.get_or_create(resource=self, key=key, defaults={'locker': locker})
|
|
81 |
lock, dummy = Lock.objects.get_or_create(resource=self, key=key, defaults={'locker': locker})
|
|
82 | 82 |
return {'key': key, 'locked': True, 'locker': lock.locker, 'lock_date': lock.lock_date} |
83 | 83 | |
84 | 84 |
@endpoint( |
passerelle/contrib/toulouse_smart/models.py | ||
---|---|---|
475 | 475 |
files = {'media': (self.filename, self.content.open('rb'), self.content_type)} |
476 | 476 |
try: |
477 | 477 |
instance.request(url, method='PUT', files=files) |
478 |
except APIError as e:
|
|
478 |
except APIError: |
|
479 | 479 |
return False |
480 | 480 |
self.content.delete() |
481 | 481 |
return True |
... | ... | |
491 | 491 |
result = JSONField(null=True) |
492 | 492 | |
493 | 493 |
def get_wcs_api(self, base_url): |
494 |
scheme, netloc, path, params, query, fragment = urlparse.urlparse(base_url)
|
|
494 |
scheme, netloc, dummy, dummy, dummy, dummy = urlparse.urlparse(base_url)
|
|
495 | 495 |
services = settings.KNOWN_SERVICES.get('wcs', {}) |
496 | 496 |
service = None |
497 | 497 |
for service in services.values(): |
498 | 498 |
remote_url = service.get('url') |
499 |
r_scheme, r_netloc, r_path, r_params, r_query, r_fragment = urlparse.urlparse(remote_url)
|
|
499 |
r_scheme, r_netloc, dummy, dummy, dummy, dummy = urlparse.urlparse(remote_url)
|
|
500 | 500 |
if r_scheme == scheme and r_netloc == netloc: |
501 | 501 |
break |
502 | 502 |
else: |
passerelle/soap.py | ||
---|---|---|
5 | 5 |
def client_to_jsondict(client): |
6 | 6 |
"""return description of the client, as dict (for json export)""" |
7 | 7 |
res = {} |
8 |
for i, sd in enumerate(client.sd):
|
|
8 |
for sd in client.sd:
|
|
9 | 9 |
d = {} |
10 | 10 |
d['tns'] = sd.wsdl.tns[1] |
11 | 11 |
d['prefixes'] = dict(p for p in sd.prefixes) |
passerelle/views.py | ||
---|---|---|
427 | 427 |
self.init_stuff(request, *args, **kwargs) |
428 | 428 |
self.connector = self.get_object() |
429 | 429 |
self.endpoint = None |
430 |
for name, method in inspect.getmembers(self.connector, inspect.ismethod):
|
|
430 |
for dummy, method in inspect.getmembers(self.connector, inspect.ismethod):
|
|
431 | 431 |
if not hasattr(method, 'endpoint_info'): |
432 | 432 |
continue |
433 | 433 |
if not method.endpoint_info.name == kwargs.get('endpoint'): |
tests/test_airquality.py | ||
---|---|---|
79 | 79 | |
80 | 80 |
def test_airquality_details_unknown_city(app, airquality): |
81 | 81 |
endpoint = tests.utils.generic_endpoint_url('airquality', 'details', slug=airquality.slug) |
82 |
resp = app.get(endpoint + '/fr/paris/', status=404) |
|
82 |
app.get(endpoint + '/fr/paris/', status=404) |
tests/test_api_entreprise.py | ||
---|---|---|
478 | 478 |
assert len(data) == 3 |
479 | 479 |
document = data[0] |
480 | 480 |
assert 'url' in document |
481 |
resp = app.get( |
|
482 |
document['url'], params={'context': 'MSP', 'object': 'demand', 'recipient': 'siret'}, status=200 |
|
483 |
) |
|
481 |
app.get(document['url'], params={'context': 'MSP', 'object': 'demand', 'recipient': 'siret'}, status=200) |
|
484 | 482 |
# try to get document with wrong signature |
485 |
url = document['url'] |
|
486 | 483 |
wrong_url = document['url'] + "wrong/" |
487 |
resp = app.get(wrong_url, status=404)
|
|
484 |
app.get(wrong_url, status=404) |
|
488 | 485 | |
489 | 486 |
# try expired url |
490 | 487 |
freezer.move_to(timezone.now() + timezone.timedelta(days=8)) |
491 |
resp = app.get(document['url'], status=404)
|
|
488 |
app.get(document['url'], status=404) |
|
492 | 489 | |
493 | 490 | |
494 | 491 |
def test_effectifs_annuels_acoss_covid(app, resource, mock_api_entreprise, freezer): |
tests/test_api_particulier.py | ||
---|---|---|
375 | 375 |
) |
376 | 376 |
def test_api_particulier_dont_log_not_found(app, resource, mock, should_log): |
377 | 377 |
with HTTMock(mock): |
378 |
resp = endpoint_get(
|
|
378 |
endpoint_get( |
|
379 | 379 |
'/api-particulier/test/avis-imposition', |
380 | 380 |
app, |
381 | 381 |
resource, |
tests/test_arpege_ecp.py | ||
---|---|---|
81 | 81 |
del hello_response['Data'] |
82 | 82 |
mocked_get.return_value = tests.utils.FakedResponse(content=json.dumps(hello_response), status_code=200) |
83 | 83 |
with pytest.raises(Exception) as error: |
84 |
resp = connector.check_status()
|
|
84 |
connector.check_status() |
|
85 | 85 |
assert str(error.value) == 'Invalid credentials' |
86 | 86 | |
87 | 87 | |
... | ... | |
131 | 131 |
mocked_post.return_value = tests.utils.FakedResponse(content=FAKE_LOGIN_OIDC_RESPONSE, status_code=200) |
132 | 132 |
mocked_get.return_value = tests.utils.FakedResponse(content=FAKE_USER_DEMANDS_RESPONSE, status_code=200) |
133 | 133 |
resp = app.get(endpoint) |
134 |
result = resp.json |
|
135 | 134 |
assert resp.json['data'] |
136 | 135 |
for item in resp.json['data']: |
137 | 136 |
assert item['status'] == 'Deposee' |
tests/test_base_adresse.py | ||
---|---|---|
230 | 230 | |
231 | 231 |
@mock.patch('passerelle.utils.Request.get') |
232 | 232 |
def test_base_adresse_search_qs_lat_lon(mocked_get, app, base_adresse): |
233 |
resp = app.get('/base-adresse/%s/search?q=plop&lat=0&lon=1' % base_adresse.slug)
|
|
233 |
app.get('/base-adresse/%s/search?q=plop&lat=0&lon=1' % base_adresse.slug) |
|
234 | 234 |
assert 'lat=0' in mocked_get.call_args[0][0] |
235 | 235 |
assert 'lon=1' in mocked_get.call_args[0][0] |
236 | 236 | |
... | ... | |
590 | 590 |
def test_base_adresse_cities_region_department(app, base_adresse, miquelon, city): |
591 | 591 |
reg = RegionModel.objects.create(name=u'IdF', code='11', resource=base_adresse) |
592 | 592 |
dep = DepartmentModel.objects.create(name=u'Paris', code='75', region=reg, resource=base_adresse) |
593 |
paris = CityModel.objects.create(
|
|
593 |
CityModel.objects.create( |
|
594 | 594 |
name=u'Paris', |
595 | 595 |
code='75056', |
596 | 596 |
zipcode='75014', |
... | ... | |
670 | 670 | |
671 | 671 |
def test_base_adresse_departments_region(app, base_adresse, department): |
672 | 672 |
reg = RegionModel.objects.create(name=u'IdF', code='11', resource=base_adresse) |
673 |
paris = DepartmentModel.objects.create(name=u'Paris', code='75', region=reg, resource=base_adresse)
|
|
673 |
DepartmentModel.objects.create(name=u'Paris', code='75', region=reg, resource=base_adresse) |
|
674 | 674 | |
675 | 675 |
resp = app.get('/base-adresse/%s/departments?region_code=84' % base_adresse.slug) |
676 | 676 |
result = resp.json['data'] |
... | ... | |
820 | 820 |
@pytest.mark.usefixtures('mock_update_api_geo') |
821 | 821 |
@mock.patch('passerelle.utils.Request.get', side_effect=ConnectionError) |
822 | 822 |
def test_base_adresse_command_update_street_timeout(mocked_get, db, base_adresse): |
823 |
resp = call_command('cron', 'daily')
|
|
823 |
call_command('cron', 'daily') |
|
824 | 824 |
assert mocked_get.call_count == 1 |
825 | 825 |
assert not RegionModel.objects.exists() |
826 | 826 | |
... | ... | |
874 | 874 | |
875 | 875 |
@mock.patch('passerelle.utils.Request.get') |
876 | 876 |
def test_base_adresse_addresses_qs_coordinates(mocked_get, app, base_adresse_coordinates): |
877 |
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse_coordinates.slug)
|
|
877 |
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse_coordinates.slug) |
|
878 | 878 |
assert 'lat=%s' % base_adresse_coordinates.latitude in mocked_get.call_args[0][0] |
879 | 879 |
assert 'lon=%s' % base_adresse_coordinates.longitude in mocked_get.call_args[0][0] |
880 | 880 | |
881 |
resp = app.get('/base-adresse/%s/addresses?q=plop&lat=42&lon=43' % base_adresse_coordinates.slug)
|
|
881 |
app.get('/base-adresse/%s/addresses?q=plop&lat=42&lon=43' % base_adresse_coordinates.slug) |
|
882 | 882 |
assert 'lat=42' in mocked_get.call_args[0][0] |
883 | 883 |
assert 'lon=43' in mocked_get.call_args[0][0] |
884 | 884 | |
... | ... | |
926 | 926 | |
927 | 927 |
@pytest.mark.usefixtures('mock_update_api_geo', 'mock_update_streets') |
928 | 928 |
def test_base_adresse_addresses_clean_cache(app, base_adresse, freezer, mock_api_adresse_data_gouv_fr_search): |
929 |
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
|
|
929 |
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug) |
|
930 | 930 |
assert AddressCacheModel.objects.count() == 1 |
931 | 931 | |
932 | 932 |
freezer.move_to(datetime.timedelta(minutes=30)) |
... | ... | |
937 | 937 |
call_command('cron', 'hourly') |
938 | 938 |
assert AddressCacheModel.objects.count() == 0 |
939 | 939 | |
940 |
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
|
|
940 |
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug) |
|
941 | 941 |
assert AddressCacheModel.objects.count() == 1 |
942 | 942 | |
943 | 943 |
# asking for the address again resets the timestamp |
944 | 944 |
freezer.move_to(datetime.timedelta(hours=1, seconds=1)) |
945 |
resp = app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug)
|
|
945 |
app.get('/base-adresse/%s/addresses?q=plop' % base_adresse.slug) |
|
946 | 946 |
call_command('cron', 'hourly') |
947 | 947 |
assert AddressCacheModel.objects.count() == 1 |
948 | 948 | |
949 | 949 |
freezer.move_to(datetime.timedelta(hours=1, seconds=1)) |
950 |
resp = app.get(
|
|
950 |
app.get( |
|
951 | 951 |
'/base-adresse/%s/addresses?id=%s' |
952 | 952 |
% (base_adresse.slug, '49007_6950_be54bd~47.474633~-0.593775~Rue%20Roger%20Halope%2049000%20Angers') |
953 | 953 |
) |
tests/test_cartads_cs.py | ||
---|---|---|
369 | 369 |
# test_pack |
370 | 370 |
with mock.patch('passerelle.apps.cartads_cs.models.CartaDSCS.soap_client') as client: |
371 | 371 |
client.return_value = mock.Mock(service=FakeService()) |
372 |
with mock.patch('passerelle.apps.cartads_cs.models.FTP') as FTP:
|
|
372 |
with mock.patch('passerelle.apps.cartads_cs.models.FTP'): |
|
373 | 373 |
connector.jobs() |
374 | 374 |
assert Job.objects.filter(method_name='pack', status='completed').count() |
375 | 375 |
assert Job.objects.filter(method_name='send_to_cartads', status='completed').count() |
... | ... | |
416 | 416 |
# test_pack |
417 | 417 |
with mock.patch('passerelle.apps.cartads_cs.models.CartaDSCS.soap_client') as client: |
418 | 418 |
client.return_value = mock.Mock(service=FakeService()) |
419 |
with mock.patch('passerelle.apps.cartads_cs.models.FTP') as FTP:
|
|
419 |
with mock.patch('passerelle.apps.cartads_cs.models.FTP'): |
|
420 | 420 |
connector.jobs() |
421 | 421 |
assert Job.objects.filter(method_name='pack', status='completed').count() |
422 | 422 |
assert Job.objects.filter(method_name='send_to_cartads', status='completed').count() |
... | ... | |
619 | 619 |
def test_list_of_files(connector, app, cached_data): |
620 | 620 |
CartaDSDossier.objects.all().delete() |
621 | 621 |
test_send(connector, app, cached_data) |
622 |
dossier = CartaDSDossier.objects.all()[0] |
|
623 | 622 | |
624 | 623 |
with mock.patch('passerelle.apps.cartads_cs.models.CartaDSCS.soap_client') as client: |
625 | 624 |
client.return_value = mock.Mock(service=FakeService()) |
tests/test_cron.py | ||
---|---|---|
14 | 14 | |
15 | 15 | |
16 | 16 |
def test_cron_error(db, caplog): |
17 |
connector = BaseAdresse.objects.create(slug='base-adresse')
|
|
17 |
BaseAdresse.objects.create(slug='base-adresse') |
|
18 | 18 |
excep = Exception('hello') |
19 | 19 |
with mock.patch( |
20 | 20 |
'passerelle.apps.base_adresse.models.AddressResource.hourly', new=mock.Mock(side_effect=excep) |
tests/test_csv_datasource.py | ||
---|---|---|
152 | 152 | |
153 | 153 | |
154 | 154 |
def test_sheet_name_error(setup, app, filetype, admin_user): |
155 |
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
|
|
155 |
setup('field,,another_field,', filename=filetype, data=get_file_content(filetype)) |
|
156 | 156 |
app = login(app) |
157 | 157 |
resp = app.get('/manage/csvdatasource/test/edit') |
158 | 158 |
edit_form = resp.forms[0] |
... | ... | |
167 | 167 | |
168 | 168 | |
169 | 169 |
def test_unfiltered_data(client, setup, filetype): |
170 |
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
|
|
170 |
_, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
|
|
171 | 171 |
resp = client.get(url) |
172 | 172 |
result = parse_response(resp) |
173 | 173 |
for item in result: |
... | ... | |
176 | 176 | |
177 | 177 | |
178 | 178 |
def test_empty_file(client, setup): |
179 |
csvdata, url = setup(
|
|
179 |
_, url = setup(
|
|
180 | 180 |
'field,,another_field,', filename='data-empty.ods', data=get_file_content('data-empty.ods') |
181 | 181 |
) |
182 | 182 |
resp = client.get(url) |
... | ... | |
185 | 185 | |
186 | 186 | |
187 | 187 |
def test_view_manage_page(setup, app, filetype, admin_user): |
188 |
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
188 |
csvdata, _ = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
189 | 189 |
app = login(app) |
190 | 190 |
app.get(csvdata.get_absolute_url()) |
191 | 191 | |
192 | 192 | |
193 | 193 |
def test_good_filter_data(client, setup, filetype): |
194 | 194 |
filter_criteria = 'Zakia' |
195 |
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
195 |
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
196 | 196 |
filters = {'text': filter_criteria} |
197 | 197 |
resp = client.get(url, filters) |
198 | 198 |
result = parse_response(resp) |
... | ... | |
205 | 205 | |
206 | 206 |
def test_bad_filter_data(client, setup, filetype): |
207 | 207 |
filter_criteria = 'bad' |
208 |
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
208 |
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
209 | 209 |
filters = {'text': filter_criteria} |
210 | 210 |
resp = client.get(url, filters) |
211 | 211 |
result = parse_response(resp) |
... | ... | |
213 | 213 | |
214 | 214 | |
215 | 215 |
def test_useless_filter_data(client, setup, filetype): |
216 |
csvdata, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
|
|
216 |
_, url = setup('id,,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
|
|
217 | 217 |
filters = {'text': 'Ali'} |
218 | 218 |
resp = client.get(url, filters) |
219 | 219 |
result = parse_response(resp) |
... | ... | |
221 | 221 | |
222 | 222 | |
223 | 223 |
def test_columns_keynames_with_spaces(client, setup, filetype): |
224 |
csvdata, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
|
|
224 |
_, url = setup('id , , nom,text , ', filename=filetype, data=get_file_content(filetype))
|
|
225 | 225 |
filters = {'text': 'Yaniss'} |
226 | 226 |
resp = client.get(url, filters) |
227 | 227 |
result = parse_response(resp) |
... | ... | |
229 | 229 | |
230 | 230 | |
231 | 231 |
def test_skipped_header_data(client, setup, filetype): |
232 |
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
|
|
232 |
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype), skip_header=True)
|
|
233 | 233 |
filters = {'q': 'Eliot'} |
234 | 234 |
resp = client.get(url, filters) |
235 | 235 |
result = parse_response(resp) |
... | ... | |
237 | 237 | |
238 | 238 | |
239 | 239 |
def test_data(client, setup, filetype): |
240 |
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
240 |
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
241 | 241 |
filters = {'text': 'Sacha'} |
242 | 242 |
resp = client.get(url, filters) |
243 | 243 |
result = parse_response(resp) |
... | ... | |
246 | 246 | |
247 | 247 |
def test_unicode_filter_data(client, setup, filetype): |
248 | 248 |
filter_criteria = u'Benoît' |
249 |
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
249 |
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
250 | 250 |
filters = {'text': filter_criteria} |
251 | 251 |
resp = client.get(url, filters) |
252 | 252 |
result = parse_response(resp) |
... | ... | |
258 | 258 | |
259 | 259 | |
260 | 260 |
def test_unicode_case_insensitive_filter_data(client, setup, filetype): |
261 |
csvdata, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
261 |
_, url = setup(',id,,text,', filename=filetype, data=get_file_content(filetype))
|
|
262 | 262 |
filter_criteria = u'anaëlle' |
263 | 263 |
filters = {'text': filter_criteria, 'case-insensitive': ''} |
264 | 264 |
resp = client.get(url, filters) |
... | ... | |
271 | 271 | |
272 | 272 | |
273 | 273 |
def test_data_bom(client, setup): |
274 |
csvdata, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
|
|
274 |
_, url = setup('fam,id,, text,sexe ', data=StringIO(data_bom))
|
|
275 | 275 |
filters = {'text': 'Eliot'} |
276 | 276 |
resp = client.get(url, filters) |
277 | 277 |
result = parse_response(resp) |
... | ... | |
279 | 279 | |
280 | 280 | |
281 | 281 |
def test_multi_filter(client, setup, filetype): |
282 |
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
282 |
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
283 | 283 |
filters = {'sexe': 'F'} |
284 | 284 |
resp = client.get(url, filters) |
285 | 285 |
result = parse_response(resp) |
... | ... | |
288 | 288 | |
289 | 289 | |
290 | 290 |
def test_query(client, setup, filetype): |
291 |
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
291 |
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
292 | 292 |
filters = {'q': 'liot'} |
293 | 293 |
resp = client.get(url, filters) |
294 | 294 |
result = parse_response(resp) |
... | ... | |
297 | 297 | |
298 | 298 | |
299 | 299 |
def test_query_insensitive_and_unicode(client, setup, filetype): |
300 |
csvdata, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
300 |
_, url = setup('fam,id,, text,sexe ', filename=filetype, data=get_file_content(filetype))
|
|
301 | 301 |
filters = {'q': 'elIo', 'case-insensitive': ''} |
302 | 302 |
resp = client.get(url, filters) |
303 | 303 |
result = parse_response(resp) |
... | ... | |
311 | 311 | |
312 | 312 | |
313 | 313 |
def test_query_insensitive_and_filter(client, setup, filetype): |
314 |
csvdata, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
|
|
314 |
_, url = setup('fam,id,,text,sexe', filename=filetype, data=get_file_content(filetype))
|
|
315 | 315 |
filters = {'q': 'elIo', 'sexe': 'H', 'case-insensitive': ''} |
316 | 316 |
resp = client.get(url, filters) |
317 | 317 |
result = parse_response(resp) |
... | ... | |
342 | 342 |
def test_on_the_fly_dialect_detection(client, setup): |
343 | 343 |
# fake a connector that was not initialized during .save(), because it's |
344 | 344 |
# been migrated and we didn't do dialect detection at save() time. |
345 |
csvdata, url = setup(data=StringIO(data))
|
|
345 |
_, url = setup(data=StringIO(data))
|
|
346 | 346 |
CsvDataSource.objects.all().update(_dialect_options=None) |
347 | 347 |
resp = client.get(url) |
348 | 348 |
result = json_loads(resp.content) |
... | ... | |
351 | 351 | |
352 | 352 | |
353 | 353 |
def test_missing_columns(client, setup): |
354 |
csvdata, url = setup(data=StringIO(data + 'A;B;C\n'))
|
|
354 |
_, url = setup(data=StringIO(data + 'A;B;C\n'))
|
|
355 | 355 |
resp = client.get(url) |
356 | 356 |
result = json_loads(resp.content) |
357 | 357 |
assert result['err'] == 0 |
... | ... | |
747 | 747 | |
748 | 748 | |
749 | 749 |
def test_download_file(app, setup, filetype, admin_user): |
750 |
csvdata, url = setup('field,,another_field,', filename=filetype, data=get_file_content(filetype))
|
|
750 |
setup('field,,another_field,', filename=filetype, data=get_file_content(filetype)) |
|
751 | 751 |
assert '/login' in app.get('/manage/csvdatasource/test/download/').location |
752 | 752 |
app = login(app) |
753 | 753 |
resp = app.get('/manage/csvdatasource/test/download/', status=200) |
... | ... | |
851 | 851 | |
852 | 852 | |
853 | 853 |
def test_change_csv_command(setup): |
854 |
csv, url = setup(data=StringIO(data))
|
|
854 |
csv, _ = setup(data=StringIO(data))
|
|
855 | 855 |
call_command('change-csv', 'test', os.path.join(TEST_BASE_DIR, 'data-empty.ods')) |
856 | 856 |
csv.refresh_from_db() |
857 | 857 |
assert list(csv.get_rows()) == [] |
tests/test_family.py | ||
---|---|---|
498 | 498 |
test_orleans_data_import_command() |
499 | 499 |
resource = GenericFamily.objects.get() |
500 | 500 |
family = Family.objects.get(external_id='22380') |
501 |
link = FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid1')
|
|
501 |
FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid1') |
|
502 | 502 |
family = Family.objects.get(external_id='1228') |
503 |
link = FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid2')
|
|
503 |
FamilyLink.objects.create(resource=resource, family=family, name_id='testnameid2') |
|
504 | 504 |
links = resource.get_pending_invoices_by_nameid(None) |
505 | 505 |
assert len(links['data']) == 2 |
506 | 506 |
for uuid, invoices in links['data'].items(): |
... | ... | |
510 | 510 | |
511 | 511 |
def test_incorrect_orleans_data(caplog): |
512 | 512 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'family_incorrect_data_orleans.zip') |
513 |
resource = GenericFamily.objects.create(
|
|
513 |
GenericFamily.objects.create( |
|
514 | 514 |
title='test orleans', |
515 | 515 |
slug='test-orleans', |
516 | 516 |
archive=File(open(filepath, 'rb'), 'family_incorrect_data_orleans.zip'), |
tests/test_generic_endpoint.py | ||
---|---|---|
165 | 165 |
arcgis.log_evel = 'DEBUG' |
166 | 166 |
arcgis.base_url = 'https://example.net/' |
167 | 167 |
arcgis.save() |
168 |
resp = app.get(
|
|
168 |
app.get( |
|
169 | 169 |
'/arcgis/test/mapservice-query', |
170 | 170 |
params={'lon': 6.172122, 'lat': 48.673836, 'service': 'test'}, |
171 | 171 |
status=200, |
... | ... | |
934 | 934 | |
935 | 935 | |
936 | 936 |
def test_generic_endpoint_superuser_access(db, app, admin_user, simple_user): |
937 |
connector = MDEL.objects.create(slug='test')
|
|
937 |
MDEL.objects.create(slug='test') |
|
938 | 938 |
filename = os.path.join(os.path.dirname(__file__), 'data', 'mdel', 'formdata.json') |
939 | 939 |
payload = json.load(open(filename)) |
940 | 940 |
tests/test_gesbac.py | ||
---|---|---|
128 | 128 |
'card_demand_purpose': 1, |
129 | 129 |
'cards_quantity': 1, |
130 | 130 |
} |
131 |
for count in range(20):
|
|
131 |
for i in range(20):
|
|
132 | 132 |
response = app.post_json('/gesbac/test/create-demand/', params=payload) |
133 | 133 |
assert response.json['err'] == 0 |
134 | 134 |
tests/test_greco.py | ||
---|---|---|
211 | 211 | |
212 | 212 |
payload = copy.copy(CREATE_PAYLOAD) |
213 | 213 |
del payload['application'] |
214 |
resp = app.post_json(url, params=payload)
|
|
214 |
app.post_json(url, params=payload) |
|
215 | 215 |
assert mocked_post.call_count == 2 |
216 | 216 | |
217 | 217 |
def to_json(root): |
tests/test_import_export.py | ||
---|---|---|
122 | 122 | |
123 | 123 |
def test_export_csvdatasource(app, setup, filetype): |
124 | 124 | |
125 |
csvdata, url = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
|
|
125 |
csvdata, _ = setup('id,whatever,nom,prenom,sexe', filename=filetype, data=get_file_content(filetype))
|
|
126 | 126 |
query = Query(slug='query-1_', resource=csvdata, structure='array') |
127 | 127 |
query.projections = '\n'.join(['id:int(id)', 'prenom:prenom']) |
128 | 128 |
query.save() |
tests/test_iparapheur.py | ||
---|---|---|
136 | 136 |
) |
137 | 137 |
response._content = force_bytes(soap_response) |
138 | 138 |
mocked_post.return_value = response |
139 |
title, ext = filename.split('.')
|
|
139 |
title, _ = filename.split('.')
|
|
140 | 140 |
base64_data = 'VGVzdCBEb2N1bWVudA==' |
141 | 141 |
data = { |
142 | 142 |
'type': typ, |
... | ... | |
469 | 469 |
response = Response() |
470 | 470 |
response.status_code = 502 |
471 | 471 | |
472 |
soap_response = open( |
|
473 |
os.path.join(os.path.dirname(__file__), 'data/iparapheur_get_file_response.xml'), 'rb' |
|
474 |
).read() |
|
475 | 472 |
response._content = '<p>Bad Gateway</p>' |
476 | 473 |
response.raison = 'Bad Gateway' |
477 | 474 |
mocked_post.return_value = response |
tests/test_jsondatastore.py | ||
---|---|---|
138 | 138 |
jsondatastore.text_value_template = '{{foo}}' |
139 | 139 |
jsondatastore.save() |
140 | 140 |
resp = app.post_json('/jsondatastore/foobar/data/create', params={'foo': 'bar'}) |
141 |
uuid = resp.json['id'] |
|
142 | 141 |
resp = app.get('/jsondatastore/foobar/data/') |
143 | 142 |
assert len(resp.json['data']) == 1 |
144 | 143 |
assert resp.json['data'][0]['text'] == 'bar' |
145 | 144 | |
146 | 145 |
# check entries are alphabetically sorted |
147 | 146 |
resp = app.post_json('/jsondatastore/foobar/data/create', params={'foo': 'aaa'}) |
148 |
uuid = resp.json['id'] |
|
149 | 147 |
resp = app.get('/jsondatastore/foobar/data/') |
150 | 148 |
assert len(resp.json['data']) == 2 |
151 | 149 |
assert resp.json['data'][0]['text'] == 'aaa' |
tests/test_manager.py | ||
---|---|---|
529 | 529 |
title='a title', |
530 | 530 |
description='a description', |
531 | 531 |
) |
532 |
csv2 = CsvDataSource.objects.create(
|
|
532 |
CsvDataSource.objects.create( |
|
533 | 533 |
csv_file=File(data, 't.csv'), |
534 | 534 |
columns_keynames='id, text', |
535 | 535 |
slug='test2', |
tests/test_mdel.py | ||
---|---|---|
311 | 311 |
AEC_PAYLOAD = dict(aec_payload) |
312 | 312 |
display_id = AEC_PAYLOAD['display_id'] |
313 | 313 |
AEC_PAYLOAD['fields']['logitud_commentaire_usager'] = 'gentle user comment' |
314 |
resp = app.post_json('/mdel/test/create', params=aec_payload, status=200)
|
|
314 |
app.post_json('/mdel/test/create', params=aec_payload, status=200) |
|
315 | 315 | |
316 | 316 |
# checking that attached files are referenced in -ent-.xml file |
317 | 317 |
basedir = os.path.join(get_resource_base_dir(), 'test', 'inputs', '%s-EtatCivil-0' % display_id) |
... | ... | |
648 | 648 |
from passerelle.utils.jsonresponse import APIError |
649 | 649 | |
650 | 650 |
with pytest.raises(APIError) as error: |
651 |
date = parse_date('2018-02-29')
|
|
651 |
parse_date('2018-02-29') |
|
652 | 652 |
assert 'day is out of range for month' in str(error) |
653 | 653 | |
654 | 654 |
with pytest.raises(APIError) as error: |
655 |
date = parse_date('28-02-2018')
|
|
655 |
parse_date('28-02-2018') |
|
656 | 656 |
for text in ('date', '28-02-2018', 'not iso-formated'): |
657 | 657 |
assert text in str(error) |
658 | 658 |
tests/test_opengis.py | ||
---|---|---|
594 | 594 |
endpoint = tests.utils.generic_endpoint_url('opengis', 'features', slug=connector.slug) |
595 | 595 |
assert endpoint == '/opengis/test/features' |
596 | 596 |
mocked_get.side_effect = server_responses |
597 |
resp = app.get(endpoint, params={'type_names': '...', 'property_name': '...'})
|
|
597 |
app.get(endpoint, params={'type_names': '...', 'property_name': '...'}) |
|
598 | 598 |
assert mocked_get.call_args[1]['params']['request'] == 'GetFeature' |
599 | 599 |
assert mocked_get.call_args[1]['params']['version'] == version |
600 | 600 |
assert typename_label in mocked_get.call_args[1]['params'].keys() |
tests/test_photon.py | ||
---|---|---|
235 | 235 |
photon.latitude = 1.2 |
236 | 236 |
photon.longitude = 2.1 |
237 | 237 |
photon.save() |
238 |
resp = app.get('/photon/%s/addresses?q=plop' % photon.slug)
|
|
238 |
app.get('/photon/%s/addresses?q=plop' % photon.slug) |
|
239 | 239 |
assert 'lat=%s' % photon.latitude in mocked_get.call_args[0][0] |
240 | 240 |
assert 'lon=%s' % photon.longitude in mocked_get.call_args[0][0] |
241 | 241 | |
242 |
resp = app.get('/photon/%s/addresses?q=plop&lat=42&lon=43' % photon.slug)
|
|
242 |
app.get('/photon/%s/addresses?q=plop&lat=42&lon=43' % photon.slug) |
|
243 | 243 |
assert 'lat=42' in mocked_get.call_args[0][0] |
244 | 244 |
assert 'lon=43' in mocked_get.call_args[0][0] |
245 | 245 |
tests/test_plone_restapi.py | ||
---|---|---|
456 | 456 |
def test_create_wrong_payload(app, connector, token): |
457 | 457 |
endpoint = tests.utils.generic_endpoint_url('plone-restapi', 'create', slug=connector.slug) |
458 | 458 |
assert endpoint == '/plone-restapi/my_connector/create' |
459 |
url = connector.service_url + '/braine-l-alleud' |
|
460 | 459 |
payload = 'not json' |
461 | 460 |
resp = app.post(endpoint + '?uri=braine-l-alleud', params=payload, status=400) |
462 | 461 |
assert resp.json['err'] |
... | ... | |
492 | 491 |
def test_update_wrong_payload(app, connector, token): |
493 | 492 |
endpoint = tests.utils.generic_endpoint_url('plone-restapi', 'update', slug=connector.slug) |
494 | 493 |
assert endpoint == '/plone-restapi/my_connector/update' |
495 |
url = connector.service_url + '/braine-l-alleud/dccd85d12cf54b6899dff41e5a56ee7f' |
|
496 | 494 |
query_string = '?uri=braine-l-alleud&uid=dccd85d12cf54b6899dff41e5a56ee7f' |
497 | 495 |
payload = 'not json' |
498 | 496 |
resp = app.post(endpoint + query_string, params=payload, status=400) |
tests/test_proxylogger.py | ||
---|---|---|
22 | 22 | |
23 | 23 |
@pytest.fixture |
24 | 24 |
def connector(): |
25 |
connector, created = Feed.objects.get_or_create(slug='some-slug')
|
|
25 |
connector, _ = Feed.objects.get_or_create(slug='some-slug')
|
|
26 | 26 |
connector.set_log_level('DEBUG') |
27 | 27 |
connector.url = 'http://example.net/' |
28 | 28 |
connector.save() |
... | ... | |
391 | 391 |
raise requests.ConnectionError('timeout') |
392 | 392 | |
393 | 393 |
monkeypatch.setattr(Feed, 'json', json) |
394 |
resp = app.get(endpoint_url, status=500)
|
|
394 |
app.get(endpoint_url, status=500) |
|
395 | 395 |
assert any('Traceback:' in mail.body for mail in mailoutbox) |
396 | 396 | |
397 | 397 |
tests/test_requests.py | ||
---|---|---|
131 | 131 |
settings.LOGGED_RESPONSES_MAX_SIZE = 7 |
132 | 132 |
with HTTMock(http400_mock): |
133 | 133 |
requests = Request(logger=logger) |
134 |
response = requests.post(url, json={'name': 'josh'})
|
|
134 |
requests.post(url, json={'name': 'josh'}) |
|
135 | 135 | |
136 | 136 |
if logger.level == 10: # DEBUG |
137 | 137 |
records = [record for record in caplog.records if record.name == 'requests'] |
... | ... | |
181 | 181 |
logger = logging.getLogger('requests') |
182 | 182 |
logger.setLevel(logging.DEBUG) |
183 | 183 |
requests = Request(logger=logger) |
184 |
response = requests.get('http://example.net/whatever').text
|
|
184 |
requests.get('http://example.net/whatever').text |
|
185 | 185 |
records = [record for record in caplog.records if record.name == 'requests'] |
186 | 186 | |
187 | 187 |
if 'xml' in endpoint_response.headers.get('Content-Type'): |
... | ... | |
267 | 267 |
credentials = {'id': 'id', 'key': 'key', 'algorithm': 'sha256'} |
268 | 268 |
hawk_auth = HawkAuth(**credentials) |
269 | 269 | |
270 |
resp = request.get('http://httpbin.org/get', auth=hawk_auth)
|
|
270 |
request.get('http://httpbin.org/get', auth=hawk_auth) |
|
271 | 271 |
prepared_method = mocked_send.call_args[0][0] |
272 | 272 |
assert 'Authorization' in prepared_method.headers |
273 | 273 |
generated_header = prepared_method.headers['Authorization'] |
... | ... | |
288 | 288 |
assert dict(generated_parts) == dict(expected_parts) |
289 | 289 | |
290 | 290 |
hawk_auth = HawkAuth(ext='extra attribute', **credentials) |
291 |
resp = request.post('http://httpbin.org/post', auth=hawk_auth, json={'key': 'value'})
|
|
291 |
request.post('http://httpbin.org/post', auth=hawk_auth, json={'key': 'value'}) |
|
292 | 292 |
prepared_method = mocked_send.call_args[0][0] |
293 | 293 |
assert 'Authorization' in prepared_method.headers |
294 | 294 |
generated_header = prepared_method.headers['Authorization'] |
tests/test_sms.py | ||
---|---|---|
265 | 265 |
} |
266 | 266 |
with mock.patch.object(OVHSMSGateway, 'send_msg') as send_function: |
267 | 267 |
send_function.return_value = None |
268 |
result = app.post_json(path, params=payload)
|
|
268 |
app.post_json(path, params=payload) |
|
269 | 269 |
connector.jobs() |
270 | 270 |
assert send_function.call_args[1]['text'] == 'a' * connector.max_message_length |
271 | 271 | |
... | ... | |
282 | 282 |
} |
283 | 283 |
with mock.patch.object(OVHSMSGateway, 'send_msg') as send_function: |
284 | 284 |
send_function.return_value = 1 |
285 |
result = app.post_json(path, params=payload)
|
|
285 |
app.post_json(path, params=payload) |
|
286 | 286 |
connector.jobs() |
287 | 287 |
assert SMSLog.objects.filter( |
288 | 288 |
appname=connector.get_connector_slug(), slug=connector.slug, credits=1 |
... | ... | |
290 | 290 | |
291 | 291 |
with mock.patch.object(OVHSMSGateway, 'send_msg') as send_function: |
292 | 292 |
send_function.return_value = 2 |
293 |
result = app.post_json(path, params=payload)
|
|
293 |
app.post_json(path, params=payload) |
|
294 | 294 |
connector.jobs() |
295 | 295 |
assert SMSLog.objects.filter( |
296 | 296 |
appname=connector.get_connector_slug(), slug=connector.slug, credits=2 |
... | ... | |
336 | 336 |
) |
337 | 337 |
with send_patch as send_function: |
338 | 338 |
send_function.return_value = None |
339 |
result = app.post_json(base_path, params=payload)
|
|
339 |
app.post_json(base_path, params=payload) |
|
340 | 340 |
connector.jobs() |
341 | 341 |
assert send_function.call_args[1]['text'] == 'not a spam' |
342 | 342 |
assert send_function.call_args[1]['stop'] == ('nostop' not in path) |
... | ... | |
466 | 466 |
'creditsLeft': 123, |
467 | 467 |
} |
468 | 468 |
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'} |
469 |
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
|
|
469 |
with tests.utils.mock_url(ovh_url, resp, 200): |
|
470 | 470 |
connector.jobs() |
471 | 471 |
connector.refresh_from_db() |
472 | 472 |
assert connector.credit_left == 123 |
... | ... | |
478 | 478 |
resp = { |
479 | 479 |
'creditsLeft': 456, |
480 | 480 |
} |
481 |
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
|
|
481 |
with tests.utils.mock_url(ovh_url, resp, 200): |
|
482 | 482 |
connector.hourly() |
483 | 483 |
assert connector.credit_left == 456 |
484 | 484 | |
... | ... | |
504 | 504 |
freezer.move_to('2019-01-01 00:00:00') |
505 | 505 |
resp = {'creditsLeft': 101} |
506 | 506 |
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'} |
507 |
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
|
|
507 |
with tests.utils.mock_url(ovh_url, resp, 200): |
|
508 | 508 |
connector.hourly() |
509 | 509 |
assert len(mailoutbox) == 0 |
510 | 510 | |
511 | 511 |
resp = {'creditsLeft': 99} |
512 | 512 |
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'} |
513 |
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
|
|
513 |
with tests.utils.mock_url(ovh_url, resp, 200): |
|
514 | 514 |
connector.hourly() |
515 | 515 |
assert len(mailoutbox) == 1 |
516 | 516 | |
... | ... | |
527 | 527 |
freezer.move_to('2019-01-01 12:00:00') |
528 | 528 |
resp = {'creditsLeft': 99} |
529 | 529 |
ovh_url = connector.API_URL % {'serviceName': 'sms-test42'} |
530 |
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
|
|
530 |
with tests.utils.mock_url(ovh_url, resp, 200): |
|
531 | 531 |
connector.hourly() |
532 | 532 |
assert len(mailoutbox) == 0 |
533 | 533 | |
534 | 534 |
freezer.move_to('2019-01-02 01:00:07') |
535 |
with tests.utils.mock_url(ovh_url, resp, 200) as mocked:
|
|
535 |
with tests.utils.mock_url(ovh_url, resp, 200): |
|
536 | 536 |
connector.hourly() |
537 | 537 |
assert len(mailoutbox) == 1 |
538 | 538 | |
... | ... | |
584 | 584 |
ovh_request_token_url = 'https://eu.api.ovh.com/1.0/auth/credential' |
585 | 585 |
ovh_response = {'message': 'Invalid application key'} |
586 | 586 | |
587 |
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401) as mocked:
|
|
587 |
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401): |
|
588 | 588 |
resp = resp.click('request access').follow() |
589 | 589 |
assert 'error requesting token: Invalid application key.' in resp.text |
590 | 590 | |
591 | 591 |
ovh_response = 'not-json' |
592 |
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401) as mocked:
|
|
592 |
with tests.utils.mock_url(ovh_request_token_url, ovh_response, 401): |
|
593 | 593 |
resp = resp.click('request access').follow() |
594 | 594 |
assert 'error requesting token: bad JSON response' in resp.text |
595 | 595 | |
... | ... | |
680 | 680 |
assert len(resp.json['data']['series'][0]['data']) == 0 |
681 | 681 | |
682 | 682 |
freezer.move_to('2021-01-01 12:00') |
683 |
for _ in range(5):
|
|
683 |
for i in range(5):
|
|
684 | 684 |
SMSLog.objects.create(appname='ovh', slug='ovhsmsgateway') |
685 | 685 | |
686 | 686 |
freezer.move_to('2021-02-03 13:00') |
687 |
for _ in range(3):
|
|
687 |
for i in range(3):
|
|
688 | 688 |
SMSLog.objects.create(appname='ovh', slug='ovhsmsgateway') |
689 | 689 | |
690 | 690 |
freezer.move_to('2021-02-06 13:00') |
tests/test_templatetags.py | ||
---|---|---|
33 | 33 |
connector_model = app.get_connector_model() |
34 | 34 |
if connector_model is None: |
35 | 35 |
continue |
36 |
for name, method in inspect.getmembers(connector_model, predicate):
|
|
36 |
for _, method in inspect.getmembers(connector_model, predicate):
|
|
37 | 37 |
if not hasattr(method, 'endpoint_info'): |
38 | 38 |
continue |
39 | 39 |
if method.endpoint_info.post and method.endpoint_info.post.get('request_body', {}).get( |
tests/test_toulouse_smart.py | ||
---|---|---|
599 | 599 |
@mock.patch("django.db.models.fields.UUIDField.get_default", return_value=UUID) |
600 | 600 |
def test_create_intervention_inconsistency_id_error(mocked_uuid4, app, freezer, smart): |
601 | 601 |
freezer.move_to('2021-07-08 00:00:00') |
602 |
resp = app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
|
|
602 |
app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD) |
|
603 | 603 |
wcs_request = smart.wcs_requests.get(uuid=UUID) |
604 | 604 |
assert wcs_request.status == 'registered' |
605 | 605 |
job = Job.objects.get(method_name='create_intervention_job') |
... | ... | |
619 | 619 |
@mock.patch("django.db.models.fields.UUIDField.get_default", return_value=UUID) |
620 | 620 |
def test_create_intervention_content_error(mocked_uuid, app, freezer, smart): |
621 | 621 |
freezer.move_to('2021-07-08 00:00:00') |
622 |
resp = app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
|
|
622 |
app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD) |
|
623 | 623 |
wcs_request = smart.wcs_requests.get(uuid=UUID) |
624 | 624 |
assert wcs_request.status == 'registered' |
625 | 625 |
assert 'invalid json' in wcs_request.result |
... | ... | |
632 | 632 |
@mock.patch("django.db.models.fields.UUIDField.get_default", return_value=UUID) |
633 | 633 |
def test_create_intervention_client_error(mocked_uuid, app, freezer, smart): |
634 | 634 |
freezer.move_to('2021-07-08 00:00:00') |
635 |
resp = app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD)
|
|
635 |
app.post_json(URL + 'create-intervention/', params=CREATE_INTERVENTION_PAYLOAD) |
|
636 | 636 |
wcs_request = smart.wcs_requests.get(uuid=UUID) |
637 | 637 |
assert '400 Client Error' in wcs_request.result |
638 | 638 |
assert wcs_request.tries == 1 |
... | ... | |
739 | 739 |
assert CREATE_INTERVENTION_QUERY[ |
740 | 740 |
'notification_url' |
741 | 741 |
] == 'http://testserver/toulouse-smart/test/update-intervention?uuid=%s' % str(UUID) |
742 |
wcs_request = smart.wcs_requests.get(uuid=UUID)
|
|
742 |
smart.wcs_requests.get(uuid=UUID) |
|
743 | 743 | |
744 | 744 |
mocked_push = mock.patch( |
745 | 745 |
"passerelle.contrib.toulouse_smart.models.SmartRequest.push", |
... | ... | |
989 | 989 |
wcs_request = smart.wcs_requests.get(uuid=UUID) |
990 | 990 |
wcs_request.status = 'failed' |
991 | 991 |
wcs_request.save() |
992 |
wcs_request_file = wcs_request.files.get(**job.parameters) |
|
993 |
path = wcs_request_file.content.path |
|
992 |
wcs_request.files.get(**job.parameters) |
|
994 | 993 | |
995 | 994 |
smart.jobs() |
996 | 995 |
job = Job.objects.get(method_name='add_media_job') |
tests/utils.py | ||
---|---|---|
15 | 15 | |
16 | 16 | |
17 | 17 |
def setup_access_rights(obj): |
18 |
api, created = ApiUser.objects.get_or_create(username='all', keytype='', key='')
|
|
18 |
api, _ = ApiUser.objects.get_or_create(username='all', keytype='', key='')
|
|
19 | 19 |
obj_type = ContentType.objects.get_for_model(obj) |
20 | 20 |
AccessRight.objects.create(codename='can_access', apiuser=api, resource_type=obj_type, resource_pk=obj.pk) |
21 | 21 |
return obj |
22 |
- |