From 332a21916e9f021f0250b3afc7b6ca8d27d4d678 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 1 Dec 2015 11:06:09 +0100 Subject: [PATCH 1/5] add a common soap module based on suds and requests (#9163) The Soap class contains all initialization behaviours needed to create suds Client objects initialized through a service model or through parameters. --- passerelle/soap.py | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 passerelle/soap.py diff --git a/passerelle/soap.py b/passerelle/soap.py new file mode 100644 index 0000000..ca3e73c --- /dev/null +++ b/passerelle/soap.py @@ -0,0 +1,119 @@ +import os.path +import StringIO +import urlparse + +import requests +from suds.client import Client +from suds.transport.http import HttpAuthenticated +from suds.transport import Reply + + +class Transport(HttpAuthenticated): + def __init__(self, username=None, password=None, certificate=None, private_key=None, + verify=True, domains=None, **kwargs): + assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a ' + 'certificate') + assert username is not None or not password, 'a username is mandatory with a password' + self.username = username + self.password = password + self.certificate = certificate + self.private_key = private_key + self.verify = verify + self.domains = domains + HttpAuthenticated.__init__(self, **kwargs) + + def get_requests_kwargs(self, request): + kwargs = {} + parsed = urlparse.urlparse(request.url) + domain = '%s://%s' % (parsed.scheme, parsed.netloc) + if not self.domains or domain in self.domains: + if self.username is not None: + kwargs['auth'] = (self.username, self.password) + if self.certificate is not None: + kwargs['cert'] = (self.certificate, self.private_key) + kwargs['verify'] = self.verify + return kwargs + + def open(self, request): + # only use our custom handler to fetch service resources, not schemas + # from other namespaces + print self.get_requests_kwargs(request) + resp = requests.get(request.url, headers=request.headers, + **self.get_requests_kwargs(request)) + return StringIO.StringIO(resp.content) + + def send(self, request): + resp = requests.post(request.url, data=request.message, headers=request.headers, + **self.get_requests_kwargs(request)) + return Reply(resp.status_code, resp.headers, resp.content) + + +class Soap(object): + url = None + path = None + username = None + password = None + certificate = None + private_key = None + domains = None + plugins = None + cache = None + + def __init__(self, url=None, path=None, username=None, password=None, certificate=None, + private_key=None, verify=None, instance=None, plugins=None, cache=None): + assert not (bool(certificate) ^ bool(private_key)), ('a private key is mandatory with a ' + 'certificate') + assert username is not None or not password, 'a username is mandatory with a password' + self.domains = set(self.domains) if self.domains else set() + self.plugins = self.plugins or [] + if url is not None: + self.url = url + if path is not None: + self.path = None + if username is not None: + self.username = username + self.password = password + if certificate is not None: + self.certificate = certificate + self.private_key = private_key + if verify is not None: + self.verify = verify + if instance is not None: + self.init_from_instance(instance) + if plugins is not None: + self.plugins.extend(plugins) + if cache is not None: + self.cache = cache + self.update_domains() + + def update_domains(self): + if self.url is not None: + parsed_url = urlparse.urlparse(self.url) + self.domains.add('%s://%s' % (parsed_url.scheme, parsed_url.netloc)) + + def init_from_instance(self, instance): + if hasattr(instance, 'wsdl_url'): + self.url = instance.wsdl_url + if hasattr(instance, 'username'): + self.username = instance.username + self.password = instance.password + if hasattr(instance, 'keystore'): + self.certificate = instance.keystore.path + self.private_key = self.certificate + if hasattr(instance, 'verify_cert'): + self.verify = instance.verify_cert + + def get_client(self): + url = self.url + if self.path: + url = 'file://' + os.path.abspath(self.path) + return Client(url, + transport=Transport( + username=self.username, + password=self.password, + certificate=self.certificate, + private_key=self.private_key, + verify=self.verify, + domains=self.domains), + plugins=self.plugins, + cache=self.cache) -- 2.1.4