Projet

Général

Profil

0001-atos_genesys-make-search-endpoint-a-datasource-of-co.patch

Benjamin Dauvergne, 29 mai 2019 13:04

Télécharger (12,6 ko)

Voir les différences:

Subject: [PATCH] atos_genesys: make search endpoint a datasource of contact
 information (#33492)

The datasource is only provisioned if the search finds one and only one
dossier; otherwise it's empty. We also return whether the person is
already paired for the given NameID.
 passerelle/apps/atos_genesys/models.py | 109 ++++++++++++++++++---
 tests/test_atos_genesys.py             | 125 ++++++++++++++++++++++---
 2 files changed, 205 insertions(+), 29 deletions(-)
passerelle/apps/atos_genesys/models.py
269 269
        d['DROITS'] = droits
270 270
        # create CIVILITE
271 271
        for identification in d.get('IDENTIFICATION', []):
272
            sexe = identification['SEXE']
273
            identification['CIVILITE'] = {'M': u'Monsieur', 'F': u'Madame'}.get(sexe)
272
            sexe = identification.get('SEXE', '')
273
            identification['CIVILITE'] = {'M': u'Monsieur', 'F': u'Madame'}.get(sexe, '')
274 274
        return d
275 275

  
276 276
    @endpoint(name='dossiers',
......
361 361
                      'example_value': '1987-10-23',
362 362
                  }
363 363
              })
364
    def search(self, request, first_name, last_name, date_of_birth):
364
    def search(self, request, first_name, last_name, date_of_birth, NameID=None):
365 365
        try:
366
            date_of_birth = datetime.datetime.strptime(date_of_birth, '%Y-%m-%d')
366
            date_of_birth = datetime.datetime.strptime(date_of_birth, '%Y-%m-%d').date()
367 367
        except (ValueError, TypeError):
368 368
            raise APIError('invalid date_of_birth: %r' % date_of_birth)
369 369
        beneficiaires = self.call_cherche_beneficiaire(
370 370
            prenom=first_name,
371 371
            nom=last_name,
372 372
            dob=date_of_birth)
373
        data = []
374
        dossiers = []
375
        # get dossiers of found beneficiaries
373 376
        for beneficiaire in beneficiaires:
374
            ref_per = beneficiaire.get('REF_PER')
375
            if not ref_per:
377
            id_per = beneficiaire.get('ID_PER')
378
            if not id_per:
379
                self.logger.warning('no ID_PER')
380
                continue
381
            try:
382
                dob = beneficiaire['DATE_NAISSANCE']
383
            except KeyError:
384
                self.logger.warning('id_per %s: no DATE_NAISSANCE', id_per)
385
                continue
386
            try:
387
                dob = datetime.datetime.strptime(dob, '%d/%m/%Y').date()
388
            except (ValueError, TypeError):
389
                self.logger.warning('id_per %s: invalid DATE_NAISSANCE', id_per)
390
                continue
391
            if dob != date_of_birth:
392
                self.logger.debug('ignoring id_per %s different dob %s != %s', id_per, dob, date_of_birth)
393
                continue
394
            dossier = self.call_select_usager(id_per)
395
            try:
396
                identification = dossier['IDENTIFICATION'][0]
397
            except KeyError:
398
                self.logger.debug('id_per %s: dossier is empty', id_per)
399
                continue
400
            if not identification['ID_PER'] == id_per:
401
                self.logger.warning('id_per %s: ID_PER differs', id_per)
376 402
                continue
377
            dossier = self.call_select_usager_by_ref(ref_per)
378
            identification = dossier['IDENTIFICATION'][0]
379
            beneficiaire['ID_PER'] = identification['ID_PER']
380
            beneficiaire['TEL_FIXE'] = identification.get('TEL_FIXE', '')
381
            beneficiaire['TEL_MOBILE'] = identification.get('TEL_MOBILE', '')
382
            beneficiaire['MAIL'] = identification.get('MAIL', '')
383
        return {'data': beneficiaires}
403
            dossiers.append(dossier)
404

  
405
        # there must be only one
406
        if len(dossiers) == 0:
407
            raise APIError('not-found')
408
        if len(dossiers) > 1:
409
            raise APIError('too-many')
410

  
411
        # get contact informations
412
        identification = dossiers[0]['IDENTIFICATION'][0]
413
        id_per = identification['ID_PER']
414
        nom = identification.get('NOM', '')
415
        prenom = identification.get('PRENOM', '')
416
        nom_naissance = identification.get('NOM_NAISSANCE', '')
417
        tel1 = ''.join(c for c in identification.get('TEL_MOBILE', '') if c.isdigit())
418
        tel2 = ''.join(c for c in identification.get('TEL_FIXE', '') if c.isdigit())
419
        email = identification.get('MAIL', '').strip()
420
        if tel1 and tel1[:2] in ('06', '07'):
421
            data.append({
422
                'id': 'tel1',
423
                'text': 'par SMS vers ' + tel1[:2] + '*****' + tel1[-3:],
424
                'phone': tel1,
425

  
426
                'id_per': id_per,
427
                'nom': nom,
428
                'prenom': prenom,
429
                'nom_naissance': nom_naissance,
430
            })
431
        if tel2 and tel2[:2] in ('06', '07'):
432
            data.append({
433
                'id': 'tel2',
434
                'text': 'par SMS vers ' + tel2[:2] + '*****' + tel2[-3:],
435
                'phone': tel2,
436

  
437
                'id_per': id_per,
438
                'nom': nom,
439
                'prenom': prenom,
440
                'nom_naissance': nom_naissance,
441
            })
442
        if email:
443
            data.append({
444
                'id': 'email1',
445
                'text': 'par courriel vers ' + email[:2] + '***@***' + email[-3:],
446
                'email': email,
447

  
448
                'id_per': id_per,
449
                'nom': nom,
450
                'prenom': prenom,
451
                'nom_naissance': nom_naissance,
452
            })
453
        if len(data) == 0:
454
            self.logger.debug('id_per %s: no contact information, ignored', id_per)
455
            raise APIError('no-contacts')
456
        try:
457
            link = NameID and Link.objects.get(resource=self, name_id=NameID, id_per=id_per)
458
        except Link.DoesNotExist:
459
            link = None
460
        return {
461
            'data': data,
462
            'already_paired': link is not None,
463
            'link_id': link and link.id,
464
        }
384 465

  
385 466
    @endpoint(name='link-by-id-per',
386 467
              methods=['post'],
387 468
              description=_('Create link with an extranet account'),
388 469
              perm='can_access',
389 470
              parameters={
390
                  'NameID':{
471
                  'NameID': {
391 472
                      'description': _('Publik NameID'),
392 473
                      'example_value': 'xyz24d934',
393 474
                  },
tests/test_atos_genesys.py
231 231
    assert rlc() == 2
232 232
    assert f.calls == 2
233 233

  
234
RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
235
  <ROW num="1">
236
    <NOMPER>John</NOMPER>
237
    <PRENOMPER>Doe</PRENOMPER>
238
    <DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
239
    <REF_PER>951858</REF_PER>
240
    <LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
241
    <PAY_NAIS>FRANCE</PAY_NAIS>
242
    <ID_PER>1234</ID_PER>
243
  </ROW>
244
</ROWSET>
245
</return>'''
234 246

  
235
def test_ws_search(app, genesys):
236
    url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
237

  
238
    RESPONSE_SEARCH = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
247
RESPONSE_SEARCH_TOO_MANY = '''<?xml version="1.0" encoding="UTF-8"?><return><ROWSET>
239 248
  <ROW num="1">
240 249
    <NOMPER>John</NOMPER>
241 250
    <PRENOMPER>Doe</PRENOMPER>
......
243 252
    <REF_PER>951858</REF_PER>
244 253
    <LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
245 254
    <PAY_NAIS>FRANCE</PAY_NAIS>
255
    <ID_PER>1234</ID_PER>
256
  </ROW>
257
  <ROW num="2">
258
    <NOMPER>Johnny</NOMPER>
259
    <PRENOMPER>Doe</PRENOMPER>
260
    <DATE_NAISSANCE>01/01/1925</DATE_NAISSANCE>
261
    <REF_PER>951858</REF_PER>
262
    <LIEU_NAIS>ANTIBES (006)</LIEU_NAIS>
263
    <PAY_NAIS>FRANCE</PAY_NAIS>
264
    <ID_PER>1234</ID_PER>
246 265
  </ROW>
247 266
</ROWSET>
248 267
</return>'''
249 268

  
269
RESPONSE_SELECT_USAGER_NO_CONTACTS = '''<?xml version="1.0"?>
270
    <return><ROWSET>
271
      <ROW num="1">
272
        <IDENTIFICATION>
273
          <IDENTIFICATION_ROW num="1">
274
            <ID_PER>1234</ID_PER>
275
          </IDENTIFICATION_ROW>
276
        </IDENTIFICATION>
277
      </ROW>
278
    </ROWSET></return>'''
279

  
280

  
281
def test_ws_search(app, genesys):
282
    url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
283

  
250 284
    with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
251
        with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsagerByRef',
285
        with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
252 286
                            RESPONSE_SELECT_USAGER):
253 287
            response = app.get(url + '?' + urlencode({
254 288
                'first_name': 'John',
......
256 290
                'date_of_birth': '1925-01-01',
257 291
            }))
258 292
    assert response.json['err'] == 0
259
    assert len(response.json['data']) == 1
260
    data = response.json['data'][0]
261
    assert data['REF_PER'] == '951858'
262
    assert data['ID_PER'] == '1234'
263
    assert data['NOMPER'] == 'John'
264
    assert data['PRENOMPER'] == 'Doe'
265
    assert data['DATE_NAISSANCE'] == '01/01/1925'
266
    assert data['TEL_FIXE'] == '06.44.44.44.44'
267
    assert data['TEL_MOBILE'] == '06.55.55.55.55'
268
    assert data['MAIL'] == 'test@sirus.fr'
293
    assert response.json['already_paired'] is False
294
    assert response.json['link_id'] is None
295
    assert len(response.json['data']) == 3
296
    data = response.json['data']
297
    assert data == [
298
        {
299
            u'id': u'tel1',
300
            u'id_per': u'1234',
301
            u'nom': u'DOE',
302
            u'nom_naissance': u'TEST',
303
            u'phone': u'0655555555',
304
            u'prenom': u'John',
305
            u'text': u'par SMS vers 06*****555'},
306
        {
307
            u'id': u'tel2',
308
            u'id_per': u'1234',
309
            u'nom': u'DOE',
310
            u'nom_naissance': u'TEST',
311
            u'phone': u'0644444444',
312
            u'prenom': u'John',
313
            u'text': u'par SMS vers 06*****444'
314
        },
315
        {
316
            u'email': u'test@sirus.fr',
317
            u'id': u'email1',
318
            u'id_per': u'1234',
319
            u'nom': u'DOE',
320
            u'nom_naissance': u'TEST',
321
            u'prenom': u'John',
322
            u'text': u'par courriel vers te***@***.fr'
323
        }
324
    ]
325

  
326
    with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
327
        with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
328
                            RESPONSE_SELECT_USAGER):
329
            response = app.get(url + '?' + urlencode({
330
                'first_name': 'John',
331
                'last_name': 'Doe',
332
                'date_of_birth': '1925-01-02',
333
            }))
334
            assert response.json['err'] == 1
335
            assert response.json['err_desc'] == 'not-found'
336

  
337
    with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire',
338
                        RESPONSE_SEARCH):
339
        with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
340
                            RESPONSE_SELECT_USAGER_NO_CONTACTS):
341
            response = app.get(url + '?' + urlencode({
342
                'first_name': 'John',
343
                'last_name': 'Doe',
344
                'date_of_birth': '1925-01-01',
345
            }))
346
            assert response.json['err'] == 1
347
            assert response.json['err_desc'] == 'no-contacts'
269 348

  
270 349

  
271 350
def test_ws_link_by_id_per(app, genesys):
......
285 364
    data = response.json
286 365
    assert data['new']
287 366
    assert data['link_id'] == link.pk
367

  
368
    url = utils.generic_endpoint_url('atos-genesys', 'search', slug=genesys.slug)
369

  
370
    with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/chercheBeneficiaire', RESPONSE_SEARCH):
371
        with utils.mock_url(FAKE_URL + 'WSUsagerPublik/services/PublikService/selectUsager',
372
                            RESPONSE_SELECT_USAGER):
373
            response = app.get(url + '?' + urlencode({
374
                'first_name': 'John',
375
                'last_name': 'Doe',
376
                'date_of_birth': '1925-01-01',
377
                'NameID': 'zob',
378
            }))
379
    assert response.json['err'] == 0
380
    assert response.json['already_paired'] is True
381
    assert response.json['link_id'] == link.id
382
    assert len(response.json['data']) == 3
288
-