From b548fb2043b19872b0b986da45fc0d383a8bff9b Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Wed, 23 Mar 2022 08:12:31 +0100 Subject: [PATCH 6/9] wip: add wsse:UsernameToken support --- .../apps/soap/migrations/0001_initial.py | 8 ++++++ passerelle/apps/soap/models.py | 13 ++++++++- tests/test_soap.py | 28 +++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/passerelle/apps/soap/migrations/0001_initial.py b/passerelle/apps/soap/migrations/0001_initial.py index 74938ed4..6285d7a7 100644 --- a/passerelle/apps/soap/migrations/0001_initial.py +++ b/passerelle/apps/soap/migrations/0001_initial.py @@ -66,6 +66,14 @@ class Migration(migrations.Migration): 'zeep_xsd_ignore_sequence_order', models.BooleanField(default=False, verbose_name='Ignore sequence order'), ), + ( + 'zeep_wsse_username', + models.CharField(max_length=256, blank=True, default='', verbose_name='WSSE Username'), + ), + ( + 'zeep_wsse_password', + models.CharField(max_length=256, blank=True, default='', verbose_name='WSSE Password'), + ), ( 'users', models.ManyToManyField( diff --git a/passerelle/apps/soap/models.py b/passerelle/apps/soap/models.py index ad713ad6..e653a434 100644 --- a/passerelle/apps/soap/models.py +++ b/passerelle/apps/soap/models.py @@ -24,6 +24,7 @@ from django.db import models from django.forms import ValidationError from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _ +from zeep.wsse.username import UsernameToken from passerelle.base.models import BaseResource, HTTPResource from passerelle.utils.api import endpoint @@ -39,6 +40,12 @@ class SOAPConnector(BaseResource, HTTPResource): zeep_xsd_ignore_sequence_order = models.BooleanField( default=True, verbose_name=_('Ignore sequence order') ) + zeep_wsse_username = models.CharField( + max_length=256, blank=True, default='', verbose_name=_('WSSE Username') + ) + zeep_wsse_password = models.CharField( + max_length=256, blank=True, default='', verbose_name=_('WSSE Password') + ) category = _('Business Process Connectors') class Meta: @@ -54,16 +61,20 @@ class SOAPConnector(BaseResource, HTTPResource): def get_manager_form_class(cls, **kwargs): form_class = super().get_manager_form_class(**kwargs) fields = list(form_class.base_fields.items()) - form_class.base_fields = collections.OrderedDict(fields[:2] + fields[-3:] + fields[2:-3]) + form_class.base_fields = collections.OrderedDict(fields[:3] + fields[-5:] + fields[3:-5]) return form_class @cached_property def client(self): + kwargs = {} + if self.zeep_wsse_username: + kwargs['wsse'] = UsernameToken(self.zeep_wsse_username, self.zeep_wsse_password) return self.soap_client( wsdl_url=self.wsdl_url, settings=zeep.Settings( strict=self.zeep_strict, xsd_ignore_sequence_order=self.zeep_xsd_ignore_sequence_order ), + **kwargs, ) @endpoint( diff --git a/tests/test_soap.py b/tests/test_soap.py index ebcb2592..6695f5a6 100644 --- a/tests/test_soap.py +++ b/tests/test_soap.py @@ -13,7 +13,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import base64 import urllib.parse +import xml.etree.ElementTree as ET import pytest import utils @@ -321,3 +323,29 @@ def test_say_hello_method_ok_post_json(connector, app, caplog, soap): assert '>Bill<' in soap.endpoint_mock.handlers[0].call['requests'][-1].body.decode() assert '>Doe<' in soap.endpoint_mock.handlers[0].call['requests'][-1].body.decode() assert resp.json == {'data': soap.OUTPUT_DATA, 'err': 0} + + +@pytest.mark.parametrize('soap', [SOAP12], indirect=True) +class TestAuthencation: + def test_basic_auth(self, connector, app, caplog, soap): + connector.basic_auth_username = 'username' + connector.basic_auth_password = 'password' + connector.save() + + app.post_json('/soap/test/method/sayHello/', params=soap.INPUT_DATA) + assert ( + base64.b64decode( + soap.endpoint_mock.handlers[0].call['requests'][1].headers['Authorization'].split()[1] + ) + == b'username:password' + ) + assert b'wsse:UsernameToken' not in soap.endpoint_mock.handlers[0].call['requests'][1].body + + def test_username_token(self, connector, app, caplog, soap): + connector.zeep_wsse_username = 'username' + connector.zeep_wsse_password = 'password' + connector.save() + + app.post_json('/soap/test/method/sayHello/', params=soap.INPUT_DATA) + assert 'Authorization' not in soap.endpoint_mock.handlers[0].call['requests'][1].headers + assert b'wsse:UsernameToken' in soap.endpoint_mock.handlers[0].call['requests'][1].body -- 2.35.1