Projet

Général

Profil

0007-wip-add-wsse-UsernameToken-support.patch

Benjamin Dauvergne, 25 mars 2022 12:17

Télécharger (5,2 ko)

Voir les différences:

Subject: [PATCH 07/11] wip: add wsse:UsernameToken support

 .../apps/soap/migrations/0001_initial.py      |  8 ++++++
 passerelle/apps/soap/models.py                | 13 ++++++++-
 tests/test_soap.py                            | 28 +++++++++++++++++++
 3 files changed, 48 insertions(+), 1 deletion(-)
passerelle/apps/soap/migrations/0001_initial.py
66 66
                    'zeep_xsd_ignore_sequence_order',
67 67
                    models.BooleanField(default=False, verbose_name='Ignore sequence order'),
68 68
                ),
69
                (
70
                    'zeep_wsse_username',
71
                    models.CharField(max_length=256, blank=True, default='', verbose_name='WSSE Username'),
72
                ),
73
                (
74
                    'zeep_wsse_password',
75
                    models.CharField(max_length=256, blank=True, default='', verbose_name='WSSE Password'),
76
                ),
69 77
                (
70 78
                    'users',
71 79
                    models.ManyToManyField(
passerelle/apps/soap/models.py
24 24
from django.forms import ValidationError
25 25
from django.utils.functional import cached_property
26 26
from django.utils.translation import ugettext_lazy as _
27
from zeep.wsse.username import UsernameToken
27 28

  
28 29
from passerelle.base.models import BaseResource, HTTPResource
29 30
from passerelle.utils.api import endpoint
......
39 40
    zeep_xsd_ignore_sequence_order = models.BooleanField(
40 41
        default=True, verbose_name=_('Ignore sequence order')
41 42
    )
43
    zeep_wsse_username = models.CharField(
44
        max_length=256, blank=True, default='', verbose_name=_('WSSE Username')
45
    )
46
    zeep_wsse_password = models.CharField(
47
        max_length=256, blank=True, default='', verbose_name=_('WSSE Password')
48
    )
42 49
    category = _('Business Process Connectors')
43 50

  
44 51
    class Meta:
......
54 61
    def get_manager_form_class(cls, **kwargs):
55 62
        form_class = super().get_manager_form_class(**kwargs)
56 63
        fields = list(form_class.base_fields.items())
57
        form_class.base_fields = collections.OrderedDict(fields[:2] + fields[-3:] + fields[2:-3])
64
        form_class.base_fields = collections.OrderedDict(fields[:3] + fields[-5:] + fields[3:-5])
58 65
        return form_class
59 66

  
60 67
    @cached_property
61 68
    def client(self):
69
        kwargs = {}
70
        if self.zeep_wsse_username:
71
            kwargs['wsse'] = UsernameToken(self.zeep_wsse_username, self.zeep_wsse_password)
62 72
        return self.soap_client(
63 73
            wsdl_url=self.wsdl_url,
64 74
            settings=zeep.Settings(
65 75
                strict=self.zeep_strict, xsd_ignore_sequence_order=self.zeep_xsd_ignore_sequence_order
66 76
            ),
77
            **kwargs,
67 78
        )
68 79

  
69 80
    @endpoint(
tests/test_soap.py
13 13
# You should have received a copy of the GNU Affero General Public License
14 14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
15 15

  
16
import base64
16 17
import urllib.parse
18
import xml.etree.ElementTree as ET
17 19

  
18 20
import pytest
19 21
import utils
......
321 323
    assert '>Bill<' in soap.endpoint_mock.handlers[0].call['requests'][-1].body.decode()
322 324
    assert '>Doe<' in soap.endpoint_mock.handlers[0].call['requests'][-1].body.decode()
323 325
    assert resp.json == {'data': soap.OUTPUT_DATA, 'err': 0}
326

  
327

  
328
@pytest.mark.parametrize('soap', [SOAP12], indirect=True)
329
class TestAuthencation:
330
    def test_basic_auth(self, connector, app, caplog, soap):
331
        connector.basic_auth_username = 'username'
332
        connector.basic_auth_password = 'password'
333
        connector.save()
334

  
335
        app.post_json('/soap/test/method/sayHello/', params=soap.INPUT_DATA)
336
        assert (
337
            base64.b64decode(
338
                soap.endpoint_mock.handlers[0].call['requests'][1].headers['Authorization'].split()[1]
339
            )
340
            == b'username:password'
341
        )
342
        assert b'wsse:UsernameToken' not in soap.endpoint_mock.handlers[0].call['requests'][1].body
343

  
344
    def test_username_token(self, connector, app, caplog, soap):
345
        connector.zeep_wsse_username = 'username'
346
        connector.zeep_wsse_password = 'password'
347
        connector.save()
348

  
349
        app.post_json('/soap/test/method/sayHello/', params=soap.INPUT_DATA)
350
        assert 'Authorization' not in soap.endpoint_mock.handlers[0].call['requests'][1].headers
351
        assert b'wsse:UsernameToken' in soap.endpoint_mock.handlers[0].call['requests'][1].body
324
-