calebasse/calebasse/facturation/transmission_utils.py @ eef0f61d
| 83321fbd | Thomas NOEL | # -*- coding: utf-8 -*-
|
|
| ea764209 | Thomas NOEL | ||
import os
|
|||
import sys
|
|||
| 128aa0a6 | Thomas NOEL | import re
|
|
import base64
|
|||
from datetime import datetime
|
|||
| eef0f61d | Thomas NOEL | import gzip
|
|
import StringIO
|
|||
| 128aa0a6 | Thomas NOEL | ||
| ea764209 | Thomas NOEL | import ldap
|
|
| 669cf6ef | Thomas NOEL | from M2Crypto import X509, SSL, Rand, SMIME, BIO
|
|
| ea764209 | Thomas NOEL | ||
| 87960f8f | Thomas NOEL | MODE_TEST = False
|
|
| 3f445d67 | Thomas NOEL | MODE_COMPRESS = True
|
|
| eef0f61d | Thomas NOEL | MODE_ENCRYPT = True
|
|
| 3f445d67 | Thomas NOEL | ||
| ea764209 | Thomas NOEL | LDAP_HOST = 'ldap://annuaire.gip-cps.fr'
|
|
| 3f445d67 | Thomas NOEL | ||
if MODE_TEST:
|
|||
LDAP_BASEDN = 'o=gip-cps-test,c=fr'
|
|||
CAPATH = '/var/lib/calebasse/test-gip-cps.capath/'
|
|||
else:
|
|||
# production
|
|||
LDAP_BASEDN = 'o=gip-cps,c=fr'
|
|||
CAPATH = '/var/lib/calebasse/gip-cps.capath/'
|
|||
| ea764209 | Thomas NOEL | LDAP_BASEDN_RSS = 'ou=339172288100045,l=Sarthe (72),' + LDAP_BASEDN
|
|
LDAP_X509_ATTR = 'userCertificate;binary'
|
|||
LDAP_CA_ATTRS = {
|
|||
'cert': ('cACertificate;binary', 'CERTIFICATE'),
|
|||
'crl': ('certificateRevocationList;binary', 'X509 CRL'),
|
|||
'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
|
|||
}
|
|||
| 128aa0a6 | Thomas NOEL | ||
RANDFILE = '/var/tmp/randpool.dat'
|
|||
| 3f445d67 | Thomas NOEL | if MODE_TEST:
|
|
MAILPATH = '/var/lib/calebasse/test-mail.out/'
|
|||
MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
|
|||
else:
|
|||
# production
|
|||
MAILPATH = '/var/lib/calebasse/mail.out/'
|
|||
MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
|
|||
| 128aa0a6 | Thomas NOEL | SENDER = 'teletransmission@aps42.org'
|
|
VVVVVV = '100500' # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
|
|||
NUMERO_EMETTEUR = '00000420788606'
|
|||
EXERCICE = NUMERO_EMETTEUR
|
|||
#
|
|||
# get a certificate from gip-cps LDAP
|
|||
#
|
|||
| ea764209 | Thomas NOEL | ||
def get_certificate(large_regime, dest_organism):
|
|||
"""
|
|||
return a M2Crypto.X509 object, containing the certificate of the health center
|
|||
example :
|
|||
x509 = get_certificate('01', '422') # CPAM Saint Etienne
|
|||
print x509.as_text()
|
|||
print x509.as_pem()
|
|||
"""
|
|||
l = ldap.initialize(LDAP_HOST)
|
|||
cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
|
|||
results = l.search_s(LDAP_BASEDN_RSS, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
|
|||
if len(results) > 1:
|
|||
raise LookupError("non unique result for cn=%s" % cn)
|
|||
if len(results) < 1:
|
|||
raise LookupError("no result for cn=%s" % cn)
|
|||
dn = results[0][0]
|
|||
attrs = results[0][1]
|
|||
if LDAP_X509_ATTR not in attrs:
|
|||
raise LookupError("no certificate in dn:%s" % dn)
|
|||
certificates = {}
|
|||
for der in results[0][1][LDAP_X509_ATTR]:
|
|||
x509 = X509.load_cert_der_string(der)
|
|||
serial = x509.get_serial_number()
|
|||
startdate = x509.get_not_after().get_datetime().replace(tzinfo=None)
|
|||
enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
|
|||
now = datetime.utcnow().replace(tzinfo=None)
|
|||
if startdate >= now >= enddate:
|
|||
# TODO : add capath + crl validation
|
|||
# os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
|
|||
certificates[serial] = x509
|
|||
if certificates:
|
|||
return certificates[max(certificates)]
|
|||
return None
|
|||
| 128aa0a6 | Thomas NOEL | #
|
|
# IRIS/B2 mail construction
|
|||
#
|
|||
| 79198d1a | Thomas NOEL | def mime(message):
|
|
| 128aa0a6 | Thomas NOEL | # compress
|
|
| 3f445d67 | Thomas NOEL | if MODE_COMPRESS:
|
|
| eef0f61d | Thomas NOEL | sio = StringIO.StringIO()
|
|
gzf = gzip.GzipFile(fileobj=sio, mode='wb', filename='B2.gz')
|
|||
gzf.write(message)
|
|||
gzf.close()
|
|||
zmessage = sio.getvalue()
|
|||
sio.close()
|
|||
| 3f445d67 | Thomas NOEL | else:
|
|
zmessage = message
|
|||
| 79198d1a | Thomas NOEL | if MODE_TEST:
|
|
content_description = 'IRISTEST/B2'
|
|||
| c9387374 | Thomas NOEL | else:
|
|
| 79198d1a | Thomas NOEL | content_description = 'IRIS/B2'
|
|
if MODE_COMPRESS:
|
|||
content_description += '/Z'
|
|||
return "Content-Type: application/EDI-consent\n" + \
|
|||
"Content-Transfer-Encoding: base64\n" + \
|
|||
"Content-Description: " + content_description + "\n" + \
|
|||
| eef0f61d | Thomas NOEL | "\n" + base64.encodestring(zmessage)
|
|
| 79198d1a | Thomas NOEL | ||
def smime(message, x509):
|
|||
"""
|
|||
output s/mime message (headers+payload), encrypted with x509 certificate
|
|||
"""
|
|||
# encrypt
|
|||
if RANDFILE:
|
|||
Rand.load_file(RANDFILE, -1)
|
|||
s = SMIME.SMIME()
|
|||
sk = X509.X509_Stack()
|
|||
sk.push(x509)
|
|||
s.set_x509_stack(sk)
|
|||
s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
|
|||
bio = BIO.MemoryBuffer(message)
|
|||
pkcs7 = s.encrypt(bio)
|
|||
if RANDFILE:
|
|||
Rand.save_file(RANDFILE)
|
|||
| c236d932 | Thomas NOEL | out = BIO.MemoryBuffer()
|
|
s.write(out, pkcs7)
|
|||
return out.read()
|
|||
| ea764209 | Thomas NOEL | ||
| 48f503d4 | Thomas NOEL | def build_mail(large_regime, dest_organism, b2_filename):
|
|
| 128aa0a6 | Thomas NOEL | """
|
|
| a9f9972f | Thomas NOEL | build a mail to healt center, with b2-compressed+encrypted information
|
|
| 128aa0a6 | Thomas NOEL | """
|
|
| 48f503d4 | Thomas NOEL | b2_fd = open(b2_filename, 'r')
|
|
b2 = b2_fd.read()
|
|||
b2_fd.close()
|
|||
| 79198d1a | Thomas NOEL | stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
|
|
| 128aa0a6 | Thomas NOEL | # count invoice in the b2 file = lines start with "5"
|
|
| 48f503d4 | Thomas NOEL | nb_invoices = [b2[x*128] for x in range(len(b2)/128)].count('5')
|
|
| 79198d1a | Thomas NOEL | subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, stamp, nb_invoices)
|
|
delimiter = '_ENTROUVERT-CALEBASSE-B2-%s' % stamp
|
|||
| 128aa0a6 | Thomas NOEL | mail = {
|
|
'From': SENDER,
|
|||
'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
|
|||
| 79198d1a | Thomas NOEL | 'Message-ID': '<%s@%s>' % (stamp, MESSAGE_ID_RIGHT),
|
|
| 128aa0a6 | Thomas NOEL | 'Subject': subject,
|
|
| 2c226400 | Thomas NOEL | 'Mime-Version': '1.0',
|
|
'Content-Type': 'multipart/mixed; boundary="%s"' % delimiter,
|
|||
| 128aa0a6 | Thomas NOEL | }
|
|
| eef0f61d | Thomas NOEL | mime_part = mime(b2)
|
|
if MODE_ENCRYPT:
|
|||
mime_part = smime(mime(b2), get_certificate(large_regime, dest_organism))
|
|||
| 128aa0a6 | Thomas NOEL | ||
| 48f503d4 | Thomas NOEL | filename = b2_filename + '-mail'
|
|
| 128aa0a6 | Thomas NOEL | fd = open(filename, 'w')
|
|
for k,v in mail.items():
|
|||
fd.write('%s: %s\n' % (k,v))
|
|||
| 2c226400 | Thomas NOEL | fd.write('\n')
|
|
fd.write('--%s\n' % delimiter)
|
|||
| eef0f61d | Thomas NOEL | fd.write(mime_part)
|
|
fd.write('--%s--\n' % delimiter)
|
|||
| 128aa0a6 | Thomas NOEL | fd.close()
|
|
| 59e8fca9 | Thomas NOEL | return filename
|
|
| 128aa0a6 | Thomas NOEL | ||
#
|
|||
# CApath construction
|
|||
#
|
|||
| ea764209 | Thomas NOEL | def der2pem(der, type_='CERTIFICATE'):
|
|
return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
|
|||
(type_, base64.encodestring(der).rstrip(), type_)
|
|||
| 3f445d67 | Thomas NOEL | def build_capath(path=CAPATH):
|
|
| ea764209 | Thomas NOEL | """
|
|
| 128aa0a6 | Thomas NOEL | get all pkiCA from the gip-cps.fr ldap, store them in path
|
|
note: the gip-cps.fr ldap is limited to 10 objects in a response... by chance, there is less than 10 pkiCA ;)
|
|||
| ea764209 | Thomas NOEL | """
|
|
l = ldap.initialize(LDAP_HOST)
|
|||
results = l.search_s(LDAP_BASEDN,ldap.SCOPE_SUBTREE,'(objectclass=pkiCA)')
|
|||
for ca in results:
|
|||
dn = ca[0]
|
|||
for attr in LDAP_CA_ATTRS:
|
|||
if LDAP_CA_ATTRS[attr][0] in ca[1]:
|
|||
| 3f445d67 | Thomas NOEL | n = 0
|
|
| ea764209 | Thomas NOEL | for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
|
|
| 3f445d67 | Thomas NOEL | filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
|
|
| ea764209 | Thomas NOEL | print "create ", filename
|
|
fd = open(filename, 'w')
|
|||
fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
|
|||
fd.close()
|
|||
| 3f445d67 | Thomas NOEL | n += 1
|
|
| 48f503d4 | Thomas NOEL | os.chdir(path)
|
|
os.system('c_rehash .')
|
|||
| ea764209 | Thomas NOEL | ||
| b7f22827 | Thomas NOEL | #
|
|
#
|
|||
#
|
|||
| 48f503d4 | Thomas NOEL | ||
if __name__ == '__main__' and (len(sys.argv) > 1):
|
|||
| c9387374 | Thomas NOEL | action = sys.argv[1]
|
|
if action == 'build_capath':
|
|||
build_capath()
|
|||
if action == 'x509':
|
|||
x509 = get_certificate(sys.argv[2], sys.argv[3])
|
|||
print x509.as_text()
|
|||
print x509.as_pem()
|
|||
| 48f503d4 | Thomas NOEL | if MODE_TEST and (action == 'test'):
|
|
print build_mail('01', '996', 'test.b2')
|