Project

General

Profile

Download (6.65 KB) Statistics
| Branch: | Tag: | Revision:

calebasse / calebasse / facturation / transmission_utils.py @ 3cfe29c8

1
# -*- coding: utf-8 -*-
2

    
3
import os
4
import sys
5
import re
6
import base64
7
from datetime import datetime
8
import gzip
9
import StringIO
10

    
11
import ldap
12
from M2Crypto import X509, SSL, Rand, SMIME, BIO
13

    
14
MODE_TEST = False
15
MODE_COMPRESS = True
16
MODE_ENCRYPT = True
17

    
18
LDAP_HOST = 'ldap://annuaire.gip-cps.fr'
19

    
20
if MODE_TEST:
21
    LDAP_BASEDN = 'o=gip-cps-test,c=fr'
22
    CAPATH = '/var/lib/calebasse/test-gip-cps.capath/'
23
else:
24
    # production
25
    LDAP_BASEDN = 'o=gip-cps,c=fr'
26
    CAPATH = '/var/lib/calebasse/gip-cps.capath/'
27

    
28
LDAP_BASEDN_RSS = 'ou=339172288100045,l=Sarthe (72),' + LDAP_BASEDN
29
LDAP_X509_ATTR = 'userCertificate;binary'
30
LDAP_CA_ATTRS = {
31
        'cert': ('cACertificate;binary', 'CERTIFICATE'),
32
        'crl': ('certificateRevocationList;binary', 'X509 CRL'),
33
        'delta-crl': ('deltaRevocationList;binary', 'X509 CRL'),
34
    }
35

    
36
RANDFILE = '/var/tmp/randpool.dat'
37

    
38
if MODE_TEST:
39
    MAILPATH = '/var/lib/calebasse/test-mail.out/'
40
    MESSAGE_ID_RIGHT = 'teletransmission-test.aps42.org'
41
else:
42
    # production
43
    MAILPATH = '/var/lib/calebasse/mail.out/'
44
    MESSAGE_ID_RIGHT = 'teletransmission.aps42.org'
45
SENDER = 'teletransmission@aps42.org'
46
VVVVVV = '100500'  # ETS-DT-001-TransportsFlux_SpecsTechCommune_v1.1.pdf
47
NUMERO_EMETTEUR = '00000420788606'
48
EXERCICE = NUMERO_EMETTEUR
49

    
50
#
51
# get a certificate from gip-cps LDAP
52
#
53

    
54
def get_certificate(large_regime, dest_organism):
55
    """
56
    return a M2Crypto.X509 object, containing the certificate of the health center
57

    
58
    example :
59
      x509 = get_certificate('01', '422') # CPAM Saint Etienne
60
      print x509.as_text()
61
      print x509.as_pem()
62
    """
63
    l = ldap.initialize(LDAP_HOST)
64
    cn = large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr'
65
    results = l.search_s(LDAP_BASEDN_RSS, ldap.SCOPE_SUBTREE, '(cn=' + cn + ')')
66
    if len(results) > 1:
67
        raise LookupError("non unique result for cn=%s" % cn)
68
    if len(results) < 1:
69
        raise LookupError("no result for cn=%s" % cn)
70
    dn = results[0][0]
71
    attrs = results[0][1]
72
    if LDAP_X509_ATTR not in attrs:
73
        raise LookupError("no certificate in dn:%s" % dn)
74
    certificates = {}
75
    for der in results[0][1][LDAP_X509_ATTR]:
76
        x509 = X509.load_cert_der_string(der)
77
        serial = x509.get_serial_number()
78
        startdate =  x509.get_not_after().get_datetime().replace(tzinfo=None)
79
        enddate = x509.get_not_before().get_datetime().replace(tzinfo=None)
80
        now = datetime.utcnow().replace(tzinfo=None)
81
        if startdate >= now >= enddate:
82
            # TODO : add capath + crl validation
83
            # os.execute(openssl verify -CApath CAPATH -crl_check cert.pem)
84
            certificates[serial] = x509
85
    if certificates:
86
        return certificates[max(certificates)]
87
    return None
88

    
89
#
90
# IRIS/B2 mail construction
91
#
92

    
93
def mime(message):
94
    # compress
95
    if MODE_COMPRESS:
96
        sio = StringIO.StringIO()
97
        gzf = gzip.GzipFile(fileobj=sio, mode='wb', filename='B2.gz')
98
        gzf.write(message)
99
        gzf.close()
100
        zmessage = sio.getvalue()
101
        sio.close()
102
    else:
103
        zmessage = message
104
    if MODE_TEST:
105
        content_description = 'IRISTEST/B2'
106
    else:
107
        content_description = 'IRIS/B2'
108
    if MODE_COMPRESS:
109
        content_description += '/Z'
110
    return "Content-Type: application/EDI-consent\n" + \
111
            "Content-Transfer-Encoding: base64\n" + \
112
            "Content-Description: " + content_description + "\n" + \
113
            "\n" + base64.encodestring(zmessage)
114

    
115
def smime(message, x509):
116
    """
117
    output s/mime message (headers+payload), encrypted with x509 certificate
118
    """
119
    # encrypt
120
    if RANDFILE:
121
        Rand.load_file(RANDFILE, -1)
122
    s = SMIME.SMIME()
123
    sk = X509.X509_Stack()
124
    sk.push(x509)
125
    s.set_x509_stack(sk)
126
    s.set_cipher(SMIME.Cipher('des_ede3_cbc'))
127
    bio = BIO.MemoryBuffer(message)
128
    pkcs7 = s.encrypt(bio)
129
    if RANDFILE:
130
        Rand.save_file(RANDFILE)
131
    out = BIO.MemoryBuffer()
132
    s.write(out, pkcs7)
133
    return out.read()
134

    
135
def build_mail(large_regime, dest_organism, b2_filename):
136
    """
137
    build a mail to healt center, with b2-compressed+encrypted information
138
    """
139
    b2_fd = open(b2_filename, 'r')
140
    b2 = b2_fd.read()
141
    b2_fd.close()
142

    
143
    utcnow = datetime.utcnow()
144
    stamp = utcnow.strftime('%Y%m%d%H%M%S') + utcnow.strftime('%f')[:5]
145
    # count invoice in the b2 file = lines start with "5"
146
    nb_invoices = [b2[x*128] for x in range(len(b2)/128)].count('5')
147

    
148
    subject = 'IR%s/%s/%s/%0.5d' % (VVVVVV, EXERCICE, stamp, nb_invoices)
149
    delimiter = '_ENTROUVERT-CALEBASSE-B2-%s' % stamp
150
    mail = {
151
            'From': SENDER,
152
            'To': large_regime + dest_organism + '@' + dest_organism + '.' + large_regime + '.rss.fr',
153
            'Message-ID': '<%s@%s>' % (stamp, MESSAGE_ID_RIGHT),
154
            'Subject': subject,
155
            'Mime-Version': '1.0',
156
            'Content-Type': 'multipart/mixed; boundary="%s"' % delimiter,
157
            }
158
    mime_part = mime(b2)
159
    if MODE_ENCRYPT:
160
        mime_part = smime(mime(b2), get_certificate(large_regime, dest_organism))
161

    
162
    fd = StringIO.StringIO()
163
    for k,v in mail.items():
164
        fd.write('%s: %s\n' % (k,v))
165
    fd.write('\n')
166
    fd.write('--%s\n' % delimiter)
167
    fd.write(mime_part)
168
    fd.write('--%s--\n' % delimiter)
169
    ret = fd.getvalue()
170
    fd.close()
171
    return ret
172

    
173
#
174
# CApath construction
175
#
176

    
177
def der2pem(der, type_='CERTIFICATE'):
178
    return "-----BEGIN %s-----\n%s\n-----END %s-----\n" % \
179
        (type_, base64.encodestring(der).rstrip(), type_)
180

    
181
def build_capath(path=CAPATH):
182
    """
183
    get all pkiCA from the gip-cps.fr ldap, store them in path
184
    note: the gip-cps.fr ldap is limited to 10 objects in a response... by chance, there is less than 10 pkiCA ;)
185
    """
186
    l = ldap.initialize(LDAP_HOST)
187
    results = l.search_s(LDAP_BASEDN,ldap.SCOPE_SUBTREE,'(objectclass=pkiCA)')
188
    for ca in results:
189
        dn = ca[0]
190
        for attr in LDAP_CA_ATTRS:
191
            if LDAP_CA_ATTRS[attr][0] in ca[1]:
192
                n = 0
193
                for der in ca[1][LDAP_CA_ATTRS[attr][0]]:
194
                    filename = os.path.join(path, '%s.%d.%s.pem' % (dn, n, attr))
195
                    print "create ", filename
196
                    fd = open(filename, 'w')
197
                    fd.write(der2pem(der, LDAP_CA_ATTRS[attr][1]))
198
                    fd.close()
199
                    n += 1
200
    os.chdir(path)
201
    os.system('c_rehash .')
202

    
203

    
204
#
205
#
206
#
207

    
208

    
209
if __name__ == '__main__' and (len(sys.argv) > 1):
210
    action = sys.argv[1]
211
    if action == 'build_capath':
212
        build_capath()
213
    if action == 'x509':
214
        x509 = get_certificate(sys.argv[2], sys.argv[3])
215
        print x509.as_text()
216
        print x509.as_pem()
217
    if MODE_TEST and (action == 'test'):
218
        print build_mail('01', '996', 'test.b2')
219

    
(14-14/16)