Project

General

Profile

Download (6.64 KB) Statistics
| Branch: | Tag: | Revision:

calebasse / calebasse / facturation / transmission_utils.py @ 99a62b84

1
# -*- coding: utf-8 -*-
2

    
3
import os
4
import sys
5
import re
6
import base64
7
from datetime import datetime
8
import gzip
9
import StringIO
10

    
11
import ldap
12
from M2Crypto import X509, SSL, Rand, SMIME, BIO
13

    
14
MODE_TEST = False
15
MODE_COMPRESS = True
16
MODE_ENCRYPT = True
17

    
18
LDAP_HOST = 'ldap://annuaire.sesam-vitale.fr'
19

    
20
LDAP_BASEDN_O = 'o=sesam-vitale,c=fr'
21
LDAP_BASEDN = 'ou=AC-FACTURATION,ou=AC-SESAM-VITALE-2034,' + LDAP_BASEDN_O
22
CAPATH = '/var/lib/calebasse/sesam-vitale.capath/'
23

    
24
LDAP_X509_ATTR = 'userCertificate;binary'
25
LDAP_CA_ATTRS = {
26
        'cert': ('cACertificate;binary', 'CERTIFICATE'),
27
        'crl': ('certificateRevocationList;binary', 'X509 CRL'),
28
        'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
29
    }
30

    
31
RANDFILE = '/var/tmp/randpool.dat'
32

    
33
MAILPATH = '/var/lib/calebasse/mail.out/'
34
MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
35

    
36
if MODE_TEST:
37
    LDAP_BASEDN = 'ou=AC-FACTURATION-TEST,ou=AC-SESAM-VITALE-TEST-2034,' + LDAP_BASEDN_O
38
    CAPATH = '/var/lib/calebasse/sesam-vitale-test.capath/'
39
    MAILPATH = '/var/lib/calebasse/test-mail.out/'
40
    MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
41

    
42
SENDER = 'teletransmission@aps42.org'
43
VVVVVV = '100500'  # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
44
NUMERO_EMETTEUR = '00000420788606'
45
EXERCICE = NUMERO_EMETTEUR
46

    
47
#
48
# get a certificate from LDAP
49
#
50

    
51
def get_certificate(large_regime, dest_organism):
52
    """
53
    return a M2Crypto.X509 object, containing the certificate of the health center
54

    
55
    example :
56
      x509 = get_certificate('01', '422') # CPAM Saint Etienne
57
      print x509.as_text()
58
      print x509.as_pem()
59
    """
60
    l = ldap.initialize(LDAP_HOST)
61
    cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
62
    results = l.search_s(LDAP_BASEDN, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
63
    if len(results) > 1:
64
        raise LookupError("non unique result for cn=%s" % cn)
65
    if len(results) < 1:
66
        raise LookupError("no result for cn=%s" % cn)
67
    dn = results[0][0]
68
    attrs = results[0][1]
69
    if LDAP_X509_ATTR not in attrs:
70
        raise LookupError("no certificate in dn:%s" % dn)
71
    certificates = {}
72
    for der in results[0][1][LDAP_X509_ATTR]:
73
        x509 = X509.load_cert_der_string(der)
74
        serial = x509.get_serial_number()
75
        startdate =  x509.get_not_after().get_datetime().replace(tzinfo=None)
76
        enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
77
        now = datetime.utcnow().replace(tzinfo=None)
78
        if startdate >= now >= enddate:
79
            # TODO : add capath + crl validation
80
            # os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
81
            certificates[serial] = x509
82
    if certificates:
83
        return certificates[max(certificates)]
84
    return None
85

    
86
#
87
# IRIS/B2 mail construction
88
#
89

    
90
def mime(message):
91
    # compress
92
    if MODE_COMPRESS:
93
        sio = StringIO.StringIO()
94
        gzf = gzip.GzipFile(fileobj=sio, mode='wb', filename='B2.gz')
95
        gzf.write(message)
96
        gzf.close()
97
        zmessage = sio.getvalue()
98
        sio.close()
99
    else:
100
        zmessage = message
101
    if MODE_TEST:
102
        content_description = 'IRISTEST/B2'
103
    else:
104
        content_description = 'IRIS/B2'
105
    if MODE_COMPRESS:
106
        content_description += '/Z'
107
    return "Content-Type: application/EDI-consent\n" + \
108
            "Content-Transfer-Encoding: base64\n" + \
109
            "Content-Description: " + content_description + "\n" + \
110
            "\n" + base64.encodestring(zmessage)
111

    
112
def smime(message, x509):
113
    """
114
    output s/mime message (headers+payload), encrypted with x509 certificate
115
    """
116
    # encrypt
117
    if RANDFILE:
118
        Rand.load_file(RANDFILE, -1)
119
    s = SMIME.SMIME()
120
    sk = X509.X509_Stack()
121
    sk.push(x509)
122
    s.set_x509_stack(sk)
123
    s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
124
    bio = BIO.MemoryBuffer(message)
125
    pkcs7 = s.encrypt(bio)
126
    if RANDFILE:
127
        Rand.save_file(RANDFILE)
128
    out = BIO.MemoryBuffer()
129
    s.write(out, pkcs7)
130
    return out.read()
131

    
132
def build_mail(large_regime, dest_organism, b2_filename):
133
    """
134
    build a mail to healt center, with b2-compressed+encrypted information
135
    """
136
    b2_fd = open(b2_filename, 'r')
137
    b2 = b2_fd.read()
138
    b2_fd.close()
139

    
140
    utcnow = datetime.utcnow()
141
    stamp = utcnow.strftime('%Y%m%d%H%M%S') + utcnow.strftime('%f')[:5]
142
    # count invoice in the b2 file = lines start with "5"
143
    nb_invoices = [b2[x*128] for x in range(len(b2)/128)].count('5')
144

    
145
    subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, stamp, nb_invoices)
146
    delimiter = '_ENTROUVERT-CALEBASSE-B2-%s' % stamp
147
    mail = {
148
            'From': SENDER,
149
            'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
150
            'Message-ID': '<%s@%s>' % (stamp, MESSAGE_ID_RIGHT),
151
            'Subject': subject,
152
            'Mime-Version': '1.0',
153
            'Content-Type': 'multipart/mixed; boundary="%s"' % delimiter,
154
            }
155
    mime_part = mime(b2)
156
    if MODE_ENCRYPT:
157
        mime_part = smime(mime(b2), get_certificate(large_regime, dest_organism))
158

    
159
    fd = StringIO.StringIO()
160
    for k,v in mail.items():
161
        fd.write('%s: %s\n' % (k,v))
162
    fd.write('\n')
163
    fd.write('--%s\n' % delimiter)
164
    fd.write(mime_part)
165
    fd.write('--%s--\n' % delimiter)
166
    ret = fd.getvalue()
167
    fd.close()
168
    return ret
169

    
170
#
171
# CApath construction
172
#
173

    
174
def der2pem(der, type_='CERTIFICATE'):
175
    return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
176
        (type_, base64.encodestring(der).rstrip(), type_)
177

    
178
def build_capath(path=CAPATH):
179
    """
180
    get all pkiCA from the ldap, store them in path
181
    note: the sesam-vitale ldap is limited to 10 objects in a response...  by chance, there is less than 10 pkiCA ;)
182
    """
183
    l = ldap.initialize(LDAP_HOST)
184
    results = l.search_s(LDAP_BASEDN, ldap.SCOPE_SUBTREE, '(objectclass=pkiCA)')
185
    for ca in results:
186
        dn = ca[0]
187
        for attr in LDAP_CA_ATTRS:
188
            if LDAP_CA_ATTRS[attr][0] in ca[1]:
189
                n = 0
190
                for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
191
                    filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
192
                    print "create ", filename
193
                    fd = open(filename, 'w')
194
                    fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
195
                    fd.close()
196
                    n += 1
197
    os.chdir(path)
198
    os.system('c_rehash .')
199

    
200

    
201
#
202
#
203
#
204

    
205

    
206
if __name__ == '__main__' and (len(sys.argv) > 1):
207
    action = sys.argv[1]
208
    if action == 'build_capath':
209
        build_capath()
210
    if action == 'x509':
211
        x509 = get_certificate(sys.argv[2], sys.argv[3])
212
        print x509.as_text()
213
        print x509.as_pem()
214
    if MODE_TEST and (action == 'test'):
215
        print build_mail('01', '996', 'test.b2')
216

    
(14-14/16)