Project

General

Profile

Download (6.62 KB) Statistics
| Branch: | Tag: | Revision:
83321fbd Thomas NOEL
# -*- coding: utf-8 -*-
ea764209 Thomas NOEL
import os
import sys
128aa0a6 Thomas NOEL
import re
import base64
from datetime import datetime
eef0f61d Thomas NOEL
import gzip
import StringIO
128aa0a6 Thomas NOEL
ea764209 Thomas NOEL
import ldap
669cf6ef Thomas NOEL
from M2Crypto import X509, SSL, Rand, SMIME, BIO
ea764209 Thomas NOEL
87960f8f Thomas NOEL
MODE_TEST = False
3f445d67 Thomas NOEL
MODE_COMPRESS = True
eef0f61d Thomas NOEL
MODE_ENCRYPT = True
3f445d67 Thomas NOEL
ea764209 Thomas NOEL
LDAP_HOST = 'ldap://annuaire.gip-cps.fr'
3f445d67 Thomas NOEL
if MODE_TEST:
LDAP_BASEDN = 'o=gip-cps-test,c=fr'
CAPATH = '/var/lib/calebasse/test-gip-cps.capath/'
else:
# production
LDAP_BASEDN = 'o=gip-cps,c=fr'
CAPATH = '/var/lib/calebasse/gip-cps.capath/'

ea764209 Thomas NOEL
LDAP_BASEDN_RSS = 'ou=339172288100045,l=Sarthe (72),' + LDAP_BASEDN
LDAP_X509_ATTR = 'userCertificate;binary'
LDAP_CA_ATTRS = {
'cert': ('cACertificate;binary', 'CERTIFICATE'),
'crl': ('certificateRevocationList;binary', 'X509 CRL'),
'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
}
128aa0a6 Thomas NOEL
RANDFILE = '/var/tmp/randpool.dat'

3f445d67 Thomas NOEL
if MODE_TEST:
MAILPATH = '/var/lib/calebasse/test-mail.out/'
MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
else:
# production
MAILPATH = '/var/lib/calebasse/mail.out/'
MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
128aa0a6 Thomas NOEL
SENDER = 'teletransmission@aps42.org'
VVVVVV = '100500' # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
NUMERO_EMETTEUR = '00000420788606'
EXERCICE = NUMERO_EMETTEUR

#
# get a certificate from gip-cps LDAP
#
ea764209 Thomas NOEL
def get_certificate(large_regime, dest_organism):
"""
return a M2Crypto.X509 object, containing the certificate of the health center

example :
x509 = get_certificate('01', '422') # CPAM Saint Etienne
print x509.as_text()
print x509.as_pem()
"""
l = ldap.initialize(LDAP_HOST)
cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
results = l.search_s(LDAP_BASEDN_RSS, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
if len(results) > 1:
raise LookupError("non unique result for cn=%s" % cn)
if len(results) < 1:
raise LookupError("no result for cn=%s" % cn)
dn = results[0][0]
attrs = results[0][1]
if LDAP_X509_ATTR not in attrs:
raise LookupError("no certificate in dn:%s" % dn)
certificates = {}
for der in results[0][1][LDAP_X509_ATTR]:
x509 = X509.load_cert_der_string(der)
serial = x509.get_serial_number()
startdate = x509.get_not_after().get_datetime().replace(tzinfo=None)
enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
now = datetime.utcnow().replace(tzinfo=None)
if startdate >= now >= enddate:
# TODO : add capath + crl validation
# os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
certificates[serial] = x509
if certificates:
return certificates[max(certificates)]
return None

128aa0a6 Thomas NOEL
#
# IRIS/B2 mail construction
#

79198d1a Thomas NOEL
def mime(message):
128aa0a6 Thomas NOEL
# compress
3f445d67 Thomas NOEL
if MODE_COMPRESS:
eef0f61d Thomas NOEL
sio = StringIO.StringIO()
gzf = gzip.GzipFile(fileobj=sio, mode='wb', filename='B2.gz')
gzf.write(message)
gzf.close()
zmessage = sio.getvalue()
sio.close()
3f445d67 Thomas NOEL
else:
zmessage = message
79198d1a Thomas NOEL
if MODE_TEST:
content_description = 'IRISTEST/B2'
c9387374 Thomas NOEL
else:
79198d1a Thomas NOEL
content_description = 'IRIS/B2'
if MODE_COMPRESS:
content_description += '/Z'
return "Content-Type: application/EDI-consent\n" + \
"Content-Transfer-Encoding: base64\n" + \
"Content-Description: " + content_description + "\n" + \
eef0f61d Thomas NOEL
"\n" + base64.encodestring(zmessage)
79198d1a Thomas NOEL
def smime(message, x509):
"""
output s/mime message (headers+payload), encrypted with x509 certificate
"""
# encrypt
if RANDFILE:
Rand.load_file(RANDFILE, -1)
s = SMIME.SMIME()
sk = X509.X509_Stack()
sk.push(x509)
s.set_x509_stack(sk)
s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
bio = BIO.MemoryBuffer(message)
pkcs7 = s.encrypt(bio)
if RANDFILE:
Rand.save_file(RANDFILE)
c236d932 Thomas NOEL
out = BIO.MemoryBuffer()
s.write(out, pkcs7)
return out.read()
ea764209 Thomas NOEL
48f503d4 Thomas NOEL
def build_mail(large_regime, dest_organism, b2_filename):
128aa0a6 Thomas NOEL
"""
a9f9972f Thomas NOEL
build a mail to healt center, with b2-compressed+encrypted information
128aa0a6 Thomas NOEL
"""
48f503d4 Thomas NOEL
b2_fd = open(b2_filename, 'r')
b2 = b2_fd.read()
b2_fd.close()

79198d1a Thomas NOEL
stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
128aa0a6 Thomas NOEL
# count invoice in the b2 file = lines start with "5"
48f503d4 Thomas NOEL
nb_invoices = [b2[x*128] for x in range(len(b2)/128)].count('5')

79198d1a Thomas NOEL
subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, stamp, nb_invoices)
delimiter = '_ENTROUVERT-CALEBASSE-B2-%s' % stamp
128aa0a6 Thomas NOEL
mail = {
'From': SENDER,
'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
79198d1a Thomas NOEL
'Message-ID': '<%s@%s>' % (stamp, MESSAGE_ID_RIGHT),
128aa0a6 Thomas NOEL
'Subject': subject,
2c226400 Thomas NOEL
'Mime-Version': '1.0',
'Content-Type': 'multipart/mixed; boundary="%s"' % delimiter,
128aa0a6 Thomas NOEL
}
eef0f61d Thomas NOEL
mime_part = mime(b2)
if MODE_ENCRYPT:
mime_part = smime(mime(b2), get_certificate(large_regime, dest_organism))
128aa0a6 Thomas NOEL
48f503d4 Thomas NOEL
filename = b2_filename + '-mail'
128aa0a6 Thomas NOEL
fd = open(filename, 'w')
for k,v in mail.items():
fd.write('%s: %s\n' % (k,v))
2c226400 Thomas NOEL
fd.write('\n')
fd.write('--%s\n' % delimiter)
eef0f61d Thomas NOEL
fd.write(mime_part)
fd.write('--%s--\n' % delimiter)
128aa0a6 Thomas NOEL
fd.close()
59e8fca9 Thomas NOEL
return filename
128aa0a6 Thomas NOEL
#
# CApath construction
#

ea764209 Thomas NOEL
def der2pem(der, type_='CERTIFICATE'):
return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
(type_, base64.encodestring(der).rstrip(), type_)

3f445d67 Thomas NOEL
def build_capath(path=CAPATH):
ea764209 Thomas NOEL
"""
128aa0a6 Thomas NOEL
get all pkiCA from the gip-cps.fr ldap, store them in path
note: the gip-cps.fr ldap is limited to 10 objects in a response... by chance, there is less than 10 pkiCA ;)
ea764209 Thomas NOEL
"""
l = ldap.initialize(LDAP_HOST)
results = l.search_s(LDAP_BASEDN,ldap.SCOPE_SUBTREE,'(objectclass=pkiCA)')
for ca in results:
dn = ca[0]
for attr in LDAP_CA_ATTRS:
if LDAP_CA_ATTRS[attr][0] in ca[1]:
3f445d67 Thomas NOEL
n = 0
ea764209 Thomas NOEL
for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
3f445d67 Thomas NOEL
filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
ea764209 Thomas NOEL
print "create ", filename
fd = open(filename, 'w')
fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
fd.close()
3f445d67 Thomas NOEL
n += 1
48f503d4 Thomas NOEL
os.chdir(path)
os.system('c_rehash .')

ea764209 Thomas NOEL
b7f22827 Thomas NOEL
#
#
#

48f503d4 Thomas NOEL
if __name__ == '__main__' and (len(sys.argv) > 1):
c9387374 Thomas NOEL
action = sys.argv[1]
if action == 'build_capath':
build_capath()
if action == 'x509':
x509 = get_certificate(sys.argv[2], sys.argv[3])
print x509.as_text()
print x509.as_pem()
48f503d4 Thomas NOEL
if MODE_TEST and (action == 'test'):
print build_mail('01', '996', 'test.b2')