Project

General

Profile

Download (6.76 KB) Statistics
| Branch: | Tag: | Revision:
# -*- coding: utf-8 -*-

import os
import sys
import re
import base64
from datetime import datetime
import zlib

import ldap
from M2Crypto import X509, SSL, Rand, SMIME, BIO

MODE_TEST = False
MODE_COMPRESS = True

LDAP_HOST = 'ldap://annuaire.gip-cps.fr'

if MODE_TEST:
LDAP_BASEDN = 'o=gip-cps-test,c=fr'
CAPATH = '/var/lib/calebasse/test-gip-cps.capath/'
else:
# production
LDAP_BASEDN = 'o=gip-cps,c=fr'
CAPATH = '/var/lib/calebasse/gip-cps.capath/'

LDAP_BASEDN_RSS = 'ou=339172288100045,l=Sarthe (72),' + LDAP_BASEDN
LDAP_X509_ATTR = 'userCertificate;binary'
LDAP_CA_ATTRS = {
'cert': ('cACertificate;binary', 'CERTIFICATE'),
'crl': ('certificateRevocationList;binary', 'X509 CRL'),
'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
}

RANDFILE = '/var/tmp/randpool.dat'

if MODE_TEST:
MAILPATH = '/var/lib/calebasse/test-mail.out/'
MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
else:
# production
MAILPATH = '/var/lib/calebasse/mail.out/'
MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
SENDER = 'teletransmission@aps42.org'
VVVVVV = '100500' # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
NUMERO_EMETTEUR = '00000420788606'
EXERCICE = NUMERO_EMETTEUR

#
# get a certificate from gip-cps LDAP
#

def get_certificate(large_regime, dest_organism):
"""
return a M2Crypto.X509 object, containing the certificate of the health center

example :
x509 = get_certificate('01', '422') # CPAM Saint Etienne
print x509.as_text()
print x509.as_pem()
"""
l = ldap.initialize(LDAP_HOST)
cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
results = l.search_s(LDAP_BASEDN_RSS, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
if len(results) > 1:
raise LookupError("non unique result for cn=%s" % cn)
if len(results) < 1:
raise LookupError("no result for cn=%s" % cn)
dn = results[0][0]
attrs = results[0][1]
if LDAP_X509_ATTR not in attrs:
raise LookupError("no certificate in dn:%s" % dn)
certificates = {}
for der in results[0][1][LDAP_X509_ATTR]:
x509 = X509.load_cert_der_string(der)
serial = x509.get_serial_number()
startdate = x509.get_not_after().get_datetime().replace(tzinfo=None)
enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
now = datetime.utcnow().replace(tzinfo=None)
if startdate >= now >= enddate:
# TODO : add capath + crl validation
# os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
certificates[serial] = x509
if certificates:
return certificates[max(certificates)]
return None

#
# IRIS/B2 mail construction
#

def mime(message):
# compress
if MODE_COMPRESS:
zmessage = zlib.compress(message)
else:
zmessage = message
if MODE_TEST:
content_description = 'IRISTEST/B2'
else:
content_description = 'IRIS/B2'
if MODE_COMPRESS:
content_description += '/Z'
return "Content-Type: application/EDI-consent\n" + \
"Content-Transfer-Encoding: base64\n" + \
"Content-Description: " + content_description + "\n" + \
"\n" + base64.encodestring(zmessage).rstrip()

def smime(message, x509):
"""
output s/mime message (headers+payload), encrypted with x509 certificate
"""
# encrypt
if RANDFILE:
Rand.load_file(RANDFILE, -1)
s = SMIME.SMIME()
sk = X509.X509_Stack()
sk.push(x509)
s.set_x509_stack(sk)
s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
bio = BIO.MemoryBuffer(message)
pkcs7 = s.encrypt(bio)
if RANDFILE:
Rand.save_file(RANDFILE)

# hack: get only the encrypted part of the m2crypto output
out_bio = BIO.MemoryBuffer()
pkcs7.write(out_bio)
out = out_bio.read()
out = out[len('-----BEGIN PKCS7-----')+1:-(len('-----END PKCS7-----')+1)]
return 'Content-Type=application/pkcs7-mime; smime-type=enveloped-data; name="smime.p7m"\n' + \
'Content-Transfer-Encoding: base64\n' + \
'Content-Disposition: attachment; filename="smime.p7m"\n\n' + out

def build_mail(large_regime, dest_organism, b2_filename):
"""
build a mail to healt center, with b2-compressed+encrypted information
"""
b2_fd = open(b2_filename, 'r')
b2 = b2_fd.read()
b2_fd.close()

stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
# count invoice in the b2 file = lines start with "5"
nb_invoices = [b2[x*128] for x in range(len(b2)/128)].count('5')

subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, stamp, nb_invoices)
delimiter = '_ENTROUVERT-CALEBASSE-B2-%s' % stamp
mail = {
'From': SENDER,
'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
'Message-ID': '<%s@%s>' % (stamp, MESSAGE_ID_RIGHT),
'Subject': subject,
'Mime-Version': '1.0',
'Content-Type': 'multipart/mixed; boundary="%s"' % delimiter,
}
smime_part = smime(mime(b2), get_certificate(large_regime, dest_organism))

filename = b2_filename + '-mail'
fd = open(filename, 'w')
for k,v in mail.items():
fd.write('%s: %s\n' % (k,v))
fd.write('\n')
fd.write('--%s\n' % delimiter)
fd.write(smime_part)
fd.write('--%s\n' % delimiter)
fd.close()
return filename

#
# CApath construction
#

def der2pem(der, type_='CERTIFICATE'):
return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
(type_, base64.encodestring(der).rstrip(), type_)

def build_capath(path=CAPATH):
"""
get all pkiCA from the gip-cps.fr ldap, store them in path
note: the gip-cps.fr ldap is limited to 10 objects in a response... by chance, there is less than 10 pkiCA ;)
"""
l = ldap.initialize(LDAP_HOST)
results = l.search_s(LDAP_BASEDN,ldap.SCOPE_SUBTREE,'(objectclass=pkiCA)')
for ca in results:
dn = ca[0]
for attr in LDAP_CA_ATTRS:
if LDAP_CA_ATTRS[attr][0] in ca[1]:
n = 0
for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
print "create ", filename
fd = open(filename, 'w')
fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
fd.close()
n += 1
os.chdir(path)
os.system('c_rehash .')


#
#
#


if __name__ == '__main__' and (len(sys.argv) > 1):
action = sys.argv[1]
if action == 'build_capath':
build_capath()
if action == 'x509':
x509 = get_certificate(sys.argv[2], sys.argv[3])
print x509.as_text()
print x509.as_pem()
if MODE_TEST and (action == 'test'):
print build_mail('01', '996', 'test.b2')

(12-12/14)