Project

General

Profile

Download (6.27 KB) Statistics
| Branch: | Tag: | Revision:
#!/usr/bin/env python

import os
import sys
import re
import base64
from datetime import datetime
import zlib

import ldap
from M2Crypto import X509, SSL, Rand, SMIME, BIO


MODE_TEST = True
MODE_COMPRESS = True
MODE_ENCRYPT = True

LDAP_HOST = 'ldap://annuaire.gip-cps.fr'

if MODE_TEST:
LDAP_BASEDN = 'o=gip-cps-test,c=fr'
CAPATH = '/var/lib/calebasse/test-gip-cps.capath/'
else:
# production
LDAP_BASEDN = 'o=gip-cps,c=fr'
CAPATH = '/var/lib/calebasse/gip-cps.capath/'

LDAP_BASEDN_RSS = 'ou=339172288100045,l=Sarthe (72),' + LDAP_BASEDN
LDAP_X509_ATTR = 'userCertificate;binary'
LDAP_CA_ATTRS = {
'cert': ('cACertificate;binary', 'CERTIFICATE'),
'crl': ('certificateRevocationList;binary', 'X509 CRL'),
'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
}

RANDFILE = '/var/tmp/randpool.dat'

if MODE_TEST:
MAILPATH = '/var/lib/calebasse/test-mail.out/'
MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
else:
# production
MAILPATH = '/var/lib/calebasse/mail.out/'
MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
SENDER = 'teletransmission@aps42.org'
VVVVVV = '100500' # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
NUMERO_EMETTEUR = '00000420788606'
EXERCICE = NUMERO_EMETTEUR

#
# get a certificate from gip-cps LDAP
#

def get_certificate(large_regime, dest_organism):
"""
return a M2Crypto.X509 object, containing the certificate of the health center

example :
x509 = get_certificate('01', '422') # CPAM Saint Etienne
print x509.as_text()
print x509.as_pem()
"""
l = ldap.initialize(LDAP_HOST)
cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
results = l.search_s(LDAP_BASEDN_RSS, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
if len(results) > 1:
raise LookupError("non unique result for cn=%s" % cn)
if len(results) < 1:
raise LookupError("no result for cn=%s" % cn)
dn = results[0][0]
attrs = results[0][1]
if LDAP_X509_ATTR not in attrs:
raise LookupError("no certificate in dn:%s" % dn)
certificates = {}
for der in results[0][1][LDAP_X509_ATTR]:
x509 = X509.load_cert_der_string(der)
serial = x509.get_serial_number()
startdate = x509.get_not_after().get_datetime().replace(tzinfo=None)
enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
now = datetime.utcnow().replace(tzinfo=None)
if startdate >= now >= enddate:
# TODO : add capath + crl validation
# os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
certificates[serial] = x509
if certificates:
return certificates[max(certificates)]
return None

#
# IRIS/B2 mail construction
#

def smime_payload(message, x509):
"""
output s/mime message (headers+payload), compressed & encrypted with x509 certificate
"""
# compress
if MODE_COMPRESS:
zmessage = zlib.compress(message)
else:
zmessage = message
if MODE_ENCRYPT:
# encrypt
if RANDFILE:
Rand.load_file(RANDFILE, -1)
s = SMIME.SMIME()
sk = X509.X509_Stack()
sk.push(x509)
s.set_x509_stack(sk)
s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
bio = BIO.MemoryBuffer(zmessage)
pkcs7 = s.encrypt(bio)
out = BIO.MemoryBuffer()
s.write(out, pkcs7)
if RANDFILE:
Rand.save_file(RANDFILE)
return out.read()
else:
# FIXME
raise NotImplementedError('MODE_ENCRYPT=False is not implemented')

def build_mail(large_regime, dest_organism, b2):
"""
build a mail to healt center, with b2-compressed+encrypted information
"""
now = datetime.utcnow().strftime('%Y%m%d%H%M%S')
n = 0
while n < 99999:
n += 1
message_id = '%s%0.5d' % (now, n)
filename = os.path.join(MAILPATH, message_id)
if not os.path.exists(filename):
break
else:
raise Exception('too much mails with prefix %s in %s' % (now, MAILPATH))
# count invoice in the b2 file = lines start with "5"
nb_invoices = len(re.findall('^5', b2, re.MULTILINE))
subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, message_id, nb_invoices)
mail = {
'From': SENDER,
'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
'Message-ID': '<%s@%s>' % (message_id, MESSAGE_ID_RIGHT),
'Subject': subject,
}
if MODE_TEST:
mail['Content-Description'] = 'IRISTEST/B2'
else:
mail['Content-Description'] = 'IRIS/B2'
if MODE_COMPRESS:
mail['Content-Description'] += '/Z'
x509 = get_certificate(large_regime, dest_organism)
smime = smime_payload(b2, x509)

fd = open(filename, 'w')
for k,v in mail.items():
fd.write('%s: %s\n' % (k,v))
fd.write(smime)
fd.close()
return filename

#
# CApath construction
#

def der2pem(der, type_='CERTIFICATE'):
return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
(type_, base64.encodestring(der).rstrip(), type_)

def build_capath(path=CAPATH):
"""
get all pkiCA from the gip-cps.fr ldap, store them in path
note: the gip-cps.fr ldap is limited to 10 objects in a response... by chance, there is less than 10 pkiCA ;)
"""
l = ldap.initialize(LDAP_HOST)
results = l.search_s(LDAP_BASEDN,ldap.SCOPE_SUBTREE,'(objectclass=pkiCA)')
for ca in results:
dn = ca[0]
for attr in LDAP_CA_ATTRS:
if LDAP_CA_ATTRS[attr][0] in ca[1]:
n = 0
for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
print "create ", filename
fd = open(filename, 'w')
fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
fd.close()
n += 1

#
#
#

if __name__ == '__main__':
action = sys.argv[1]
if action == 'build_capath':
build_capath()
if action == 'x509':
x509 = get_certificate(sys.argv[2], sys.argv[3])
print x509.as_text()
print x509.as_pem()
# stupid tests...
# print build_mail('01','996','000....b2-power')
# print build_mail('01','997','000....b2-power')

(9-9/11)