1
|
# -*- coding: utf-8 -*-
|
2
|
|
3
|
import os
|
4
|
import sys
|
5
|
import re
|
6
|
import base64
|
7
|
from datetime import datetime
|
8
|
import gzip
|
9
|
import StringIO
|
10
|
|
11
|
import ldap
|
12
|
from M2Crypto import X509, SSL, Rand, SMIME, BIO
|
13
|
|
14
|
MODE_TEST = False
|
15
|
MODE_COMPRESS = True
|
16
|
MODE_ENCRYPT = True
|
17
|
|
18
|
LDAP_HOST = 'ldap://annuaire.sesam-vitale.fr'
|
19
|
|
20
|
LDAP_BASEDN_O = 'o=sesam-vitale,c=fr'
|
21
|
LDAP_BASEDN = 'ou=AC-FACTURATION,ou=AC-SESAM-VITALE-2034,' + LDAP_BASEDN_O
|
22
|
CAPATH = '/var/lib/calebasse/sesam-vitale.capath/'
|
23
|
|
24
|
LDAP_X509_ATTR = 'userCertificate;binary'
|
25
|
LDAP_CA_ATTRS = {
|
26
|
'cert': ('cACertificate;binary', 'CERTIFICATE'),
|
27
|
'crl': ('certificateRevocationList;binary', 'X509 CRL'),
|
28
|
'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
|
29
|
}
|
30
|
|
31
|
RANDFILE = '/var/tmp/randpool.dat'
|
32
|
|
33
|
MAILPATH = '/var/lib/calebasse/mail.out/'
|
34
|
MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
|
35
|
|
36
|
if MODE_TEST:
|
37
|
LDAP_BASEDN = 'ou=AC-FACTURATION-TEST,ou=AC-SESAM-VITALE-TEST-2034,' + LDAP_BASEDN_O
|
38
|
CAPATH = '/var/lib/calebasse/sesam-vitale-test.capath/'
|
39
|
MAILPATH = '/var/lib/calebasse/test-mail.out/'
|
40
|
MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
|
41
|
|
42
|
SENDER = 'teletransmission@aps42.org'
|
43
|
VVVVVV = '100500' # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
|
44
|
NUMERO_EMETTEUR = '00000420788606'
|
45
|
EXERCICE = NUMERO_EMETTEUR
|
46
|
|
47
|
#
|
48
|
# get a certificate from LDAP
|
49
|
#
|
50
|
|
51
|
def get_certificate(large_regime, dest_organism):
|
52
|
"""
|
53
|
return a M2Crypto.X509 object, containing the certificate of the health center
|
54
|
|
55
|
example :
|
56
|
x509 = get_certificate('01', '422') # CPAM Saint Etienne
|
57
|
print x509.as_text()
|
58
|
print x509.as_pem()
|
59
|
"""
|
60
|
l = ldap.initialize(LDAP_HOST)
|
61
|
cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
|
62
|
results = l.search_s(LDAP_BASEDN, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
|
63
|
if len(results) > 1:
|
64
|
raise LookupError("non unique result for cn=%s" % cn)
|
65
|
if len(results) < 1:
|
66
|
raise LookupError("no result for cn=%s" % cn)
|
67
|
dn = results[0][0]
|
68
|
attrs = results[0][1]
|
69
|
if LDAP_X509_ATTR not in attrs:
|
70
|
raise LookupError("no certificate in dn:%s" % dn)
|
71
|
certificates = {}
|
72
|
for der in results[0][1][LDAP_X509_ATTR]:
|
73
|
x509 = X509.load_cert_der_string(der)
|
74
|
serial = x509.get_serial_number()
|
75
|
startdate = x509.get_not_after().get_datetime().replace(tzinfo=None)
|
76
|
enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
|
77
|
now = datetime.utcnow().replace(tzinfo=None)
|
78
|
if startdate >= now >= enddate:
|
79
|
# TODO : add capath + crl validation
|
80
|
# os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
|
81
|
certificates[serial] = x509
|
82
|
if certificates:
|
83
|
return certificates[max(certificates)]
|
84
|
return None
|
85
|
|
86
|
#
|
87
|
# IRIS/B2 mail construction
|
88
|
#
|
89
|
|
90
|
def mime(message):
|
91
|
# compress
|
92
|
if MODE_COMPRESS:
|
93
|
sio = StringIO.StringIO()
|
94
|
gzf = gzip.GzipFile(fileobj=sio, mode='wb', filename='B2.gz')
|
95
|
gzf.write(message)
|
96
|
gzf.close()
|
97
|
zmessage = sio.getvalue()
|
98
|
sio.close()
|
99
|
else:
|
100
|
zmessage = message
|
101
|
if MODE_TEST:
|
102
|
content_description = 'IRISTEST/B2'
|
103
|
else:
|
104
|
content_description = 'IRIS/B2'
|
105
|
if MODE_COMPRESS:
|
106
|
content_description += '/Z'
|
107
|
return "Content-Type: application/EDI-consent\n" + \
|
108
|
"Content-Transfer-Encoding: base64\n" + \
|
109
|
"Content-Description: " + content_description + "\n" + \
|
110
|
"\n" + base64.encodestring(zmessage)
|
111
|
|
112
|
def smime(message, x509):
|
113
|
"""
|
114
|
output s/mime message (headers+payload), encrypted with x509 certificate
|
115
|
"""
|
116
|
# encrypt
|
117
|
if RANDFILE:
|
118
|
Rand.load_file(RANDFILE, -1)
|
119
|
s = SMIME.SMIME()
|
120
|
sk = X509.X509_Stack()
|
121
|
sk.push(x509)
|
122
|
s.set_x509_stack(sk)
|
123
|
s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
|
124
|
bio = BIO.MemoryBuffer(message)
|
125
|
pkcs7 = s.encrypt(bio)
|
126
|
if RANDFILE:
|
127
|
Rand.save_file(RANDFILE)
|
128
|
out = BIO.MemoryBuffer()
|
129
|
s.write(out, pkcs7)
|
130
|
return out.read()
|
131
|
|
132
|
def build_mail(large_regime, dest_organism, b2_filename):
|
133
|
"""
|
134
|
build a mail to healt center, with b2-compressed+encrypted information
|
135
|
"""
|
136
|
b2_fd = open(b2_filename, 'r')
|
137
|
b2 = b2_fd.read()
|
138
|
b2_fd.close()
|
139
|
|
140
|
utcnow = datetime.utcnow()
|
141
|
stamp = utcnow.strftime('%Y%m%d%H%M%S') + utcnow.strftime('%f')[:5]
|
142
|
# count invoice in the b2 file = lines start with "5"
|
143
|
nb_invoices = [b2[x*128] for x in range(len(b2)/128)].count('5')
|
144
|
|
145
|
subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, stamp, nb_invoices)
|
146
|
delimiter = '_ENTROUVERT-CALEBASSE-B2-%s' % stamp
|
147
|
mail = {
|
148
|
'From': SENDER,
|
149
|
'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
|
150
|
'Message-ID': '<%s@%s>' % (stamp, MESSAGE_ID_RIGHT),
|
151
|
'Subject': subject,
|
152
|
'Mime-Version': '1.0',
|
153
|
'Content-Type': 'multipart/mixed; boundary="%s"' % delimiter,
|
154
|
}
|
155
|
mime_part = mime(b2)
|
156
|
if MODE_ENCRYPT:
|
157
|
mime_part = smime(mime(b2), get_certificate(large_regime, dest_organism))
|
158
|
|
159
|
fd = StringIO.StringIO()
|
160
|
for k,v in mail.items():
|
161
|
fd.write('%s: %s\n' % (k,v))
|
162
|
fd.write('\n')
|
163
|
fd.write('--%s\n' % delimiter)
|
164
|
fd.write(mime_part)
|
165
|
fd.write('--%s--\n' % delimiter)
|
166
|
ret = fd.getvalue()
|
167
|
fd.close()
|
168
|
return ret
|
169
|
|
170
|
#
|
171
|
# CApath construction
|
172
|
#
|
173
|
|
174
|
def der2pem(der, type_='CERTIFICATE'):
|
175
|
return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
|
176
|
(type_, base64.encodestring(der).rstrip(), type_)
|
177
|
|
178
|
def build_capath(path=CAPATH):
|
179
|
"""
|
180
|
get all pkiCA from the ldap, store them in path
|
181
|
note: the sesam-vitale ldap is limited to 10 objects in a response... by chance, there is less than 10 pkiCA ;)
|
182
|
"""
|
183
|
l = ldap.initialize(LDAP_HOST)
|
184
|
results = l.search_s(LDAP_BASEDN, ldap.SCOPE_SUBTREE, '(objectclass=pkiCA)')
|
185
|
for ca in results:
|
186
|
dn = ca[0]
|
187
|
for attr in LDAP_CA_ATTRS:
|
188
|
if LDAP_CA_ATTRS[attr][0] in ca[1]:
|
189
|
n = 0
|
190
|
for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
|
191
|
filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
|
192
|
print "create ", filename
|
193
|
fd = open(filename, 'w')
|
194
|
fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
|
195
|
fd.close()
|
196
|
n += 1
|
197
|
os.chdir(path)
|
198
|
os.system('c_rehash .')
|
199
|
|
200
|
|
201
|
#
|
202
|
#
|
203
|
#
|
204
|
|
205
|
|
206
|
if __name__ == '__main__' and (len(sys.argv) > 1):
|
207
|
action = sys.argv[1]
|
208
|
if action == 'build_capath':
|
209
|
build_capath()
|
210
|
if action == 'x509':
|
211
|
x509 = get_certificate(sys.argv[2], sys.argv[3])
|
212
|
print x509.as_text()
|
213
|
print x509.as_pem()
|
214
|
if MODE_TEST and (action == 'test'):
|
215
|
print build_mail('01', '996', 'test.b2')
|
216
|
|