Projet

Général

Profil

Bug #12835 » api.py

using pyOpenSSL - Kiril Grancharov, 05 août 2016 10:02

 
1
import base64
2
import hashlib
3
import socket
4

    
5
import requests
6
from OpenSSL import crypto
7

    
8
import dateutil.parser
9
import M2Crypto.X509 as X509
10
import rfc3161
11
from pyasn1.codec.der import decoder, encoder
12
from pyasn1.error import PyAsn1Error
13
from pyasn1.type import univ
14
from pyasn1_modules import rfc2459
15

    
16
__all__ = ('RemoteTimestamper', 'check_timestamp', 'get_hash_oid',
17
           'TimestampingError', 'get_timestamp')
18

    
19
id_attribute_messageDigest = univ.ObjectIdentifier(
20
    (1, 2, 840, 113549, 1, 9, 4,))
21

    
22

    
23
def get_hash_oid(hashname):
24
    return rfc3161.__dict__['id_' + hashname]
25

    
26

    
27
def get_hash_from_oid(oid):
28
    h = rfc3161.oid_to_hash.get(oid)
29
    if h is None:
30
        raise ValueError('unsupported hash algorithm', oid)
31
    return h
32

    
33

    
34
def get_hash_class_from_oid(oid):
35
    h = get_hash_from_oid(oid)
36
    return getattr(hashlib, h)
37

    
38

    
39
class TimestampingError(RuntimeError):
40
    pass
41

    
42

    
43
def get_timestamp(tst):
44
    try:
45
        if not isinstance(tst, rfc3161.TimeStampToken):
46
            tst, substrate = decoder.decode(
47
                tst, asn1Spec=rfc3161.TimeStampToken())
48
            if substrate:
49
                raise ValueError("extra data after tst")
50

    
51
        tstinfo = tst.getComponentByName(
52
            'content').getComponentByPosition(2).getComponentByPosition(1)
53
        tstinfo, substrate = decoder.decode(
54
            tstinfo, asn1Spec=univ.OctetString())
55
        if substrate:
56
            raise ValueError("extra data after tst")
57
        tstinfo, substrate = decoder.decode(
58
            tstinfo, asn1Spec=rfc3161.TSTInfo())
59
        if substrate:
60
            raise ValueError("extra data after tst")
61
        genTime = tstinfo.getComponentByName('genTime')
62
        return dateutil.parser.parse(str(genTime))
63
    except PyAsn1Error, e:
64
        raise ValueError('not a valid TimeStampToken', e)
65

    
66

    
67
def check_timestamp(tst, certificate, data=None, digest=None, hashname=None, nonce=None, attributeSet=False):
68
    hashname = hashname or 'sha1'
69
    hashobj = hashlib.new(hashname)
70
    if digest is None:
71
        if not data:
72
            raise ValueError("method requires data or digest argument")
73
        hashobj.update(data)
74
        digest = hashobj.digest()
75

    
76
    if not isinstance(tst, rfc3161.TimeStampToken):
77
        tst, substrate = decoder.decode(tst, asn1Spec=rfc3161.TimeStampToken())
78
        if substrate:
79
            raise ValueError("extra data after tst")
80
    signed_data = tst.content
81
    certificate = load_certificate(signed_data, certificate)
82
    if nonce is not None and int(tst.tst_info['nonce']) != int(nonce):
83
        raise ValueError('nonce is different or missing')
84
    # check message imprint with respect to locally computed digest
85
    message_imprint = tst.tst_info.message_imprint
86
    if message_imprint.hash_algorithm[0] != get_hash_oid(hashname) or \
87
            str(message_imprint.hashed_message) != digest:
88
        raise ValueError('Message imprint mismatch')
89
    #
90
    if not len(signed_data['signerInfos']):
91
        raise ValueError('No signature')
92
    # We validate only one signature
93
    signer_info = signed_data['signerInfos'][0]
94
    # check content type
95
    if tst.content['contentInfo']['contentType'] != rfc3161.id_ct_TSTInfo:
96
        raise ValueError("Signed content type is wrong: %s != %s" % (
97
            tst.content['contentInfo']['contentType'], rfc3161.id_ct_TSTInfo))
98

    
99
    # check signed data digest
100
    content = str(decoder.decode(str(tst.content['contentInfo']['content']),
101
                                 asn1Spec=univ.OctetString())[0])
102
    # if there is authenticated attributes, they must contain the message
103
    # digest and they are the signed data otherwise the content is the
104
    # signed data
105
    if len(signer_info['authenticatedAttributes']):
106
        authenticated_attributes = signer_info['authenticatedAttributes']
107
        signer_digest_algorithm = signer_info['digestAlgorithm']['algorithm']
108
        signer_hash_class = get_hash_class_from_oid(signer_digest_algorithm)
109
        signer_hash_name = get_hash_from_oid(signer_digest_algorithm)
110
        content_digest = signer_hash_class(content).digest()
111
        setOfAttributes = univ.SetOf()
112
        for authenticated_attribute in authenticated_attributes:
113
            if authenticated_attribute[0] == id_attribute_messageDigest:
114
                try:
115
                    signed_digest = str(decoder.decode(str(authenticated_attribute[1][0]),
116
                                                       asn1Spec=univ.OctetString())[0])
117
                    if signed_digest != content_digest:
118
                        raise ValueError('Content digest != signed digest')
119
                    for i, x in enumerate(authenticated_attributes):
120
                        setOfAttributes.setComponentByPosition(i, x)
121
                    signed_content = encoder.encode(setOfAttributes)
122
                    break
123
                except PyAsn1Error:
124
                    raise
125
                    pass
126
        else:
127
            raise ValueError('No signed digest')
128
    else:
129
        signed_content = content
130
    # check signature
131
    signature = signer_info['encryptedDigest']
132
    crypto.verify(certificate,str(signature),signed_content,signer_hash_name)
133
    return setOfAttributes if attributeSet else signed_content
134

    
135

    
136
def load_certificate(signed_data, certificate=""):
137
    if certificate != "":
138
        try:
139
            certificate = crypto.load_certificate(
140
                crypto.FILETYPE_ASN1,
141
                encoder.encode(signed_data['certificates'][0][0])
142
            )
143
        except:
144
            raise AttributeError("missing certificate")
145
    else:
146
        try:
147
            certificate = crypto.load_certificate(
148
                crypto.FILETYPE_PEM,
149
                certificate
150
            )
151
        except:
152
            certificate = crypto.load_certificate(
153
                crypto.FILETYPE_ASN1,
154
                certificate
155
            )
156

    
157
    return certificate
158

    
159

    
160
class RemoteTimestamper(object):
161

    
162
    def __init__(self, url, certificate=None, capath=None, cafile=None, username=None, password=None, hashname=None, include_tsa_certificate=False, timeout=10):
163
        self.url = url
164
        self.certificate = certificate
165
        self.capath = capath
166
        self.cafile = cafile
167
        self.username = username
168
        self.password = password
169
        self.hashname = hashname or 'sha1'
170
        self.include_tsa_certificate = include_tsa_certificate
171
        self.timeout = timeout
172

    
173
    def check_response(self, response, digest, nonce=None):
174
        '''
175
           Check validity of a TimeStampResponse
176
        '''
177
        tst = response.time_stamp_token
178
        return self.check(tst, digest=digest, nonce=nonce)
179

    
180
    def check(self, tst, data=None, digest=None, nonce=None):
181
        return check_timestamp(tst, digest=digest, data=data, nonce=nonce,
182
                               certificate=self.certificate, hashname=self.hashname)
183

    
184
    def timestamp(self, data=None, digest=None, include_tsa_certificate=None, nonce=None):
185
        return self(data=data, digest=digest,
186
                    include_tsa_certificate=include_tsa_certificate, nonce=nonce)
187

    
188
    def __call__(self, data=None, digest=None, include_tsa_certificate=None, nonce=None):
189
        algorithm_identifier = rfc2459.AlgorithmIdentifier()
190
        algorithm_identifier.setComponentByPosition(
191
            0, get_hash_oid(self.hashname))
192
        message_imprint = rfc3161.MessageImprint()
193
        message_imprint.setComponentByPosition(0, algorithm_identifier)
194
        hashobj = hashlib.new(self.hashname)
195
        if data:
196
            hashobj.update(data)
197
            digest = hashobj.digest()
198
        elif digest:
199
            assert len(digest) == hashobj.digest_size, 'digest length is wrong'
200
        else:
201
            raise ValueError(
202
                'You must pass some data to digest, or the digest')
203
        message_imprint.setComponentByPosition(1, digest)
204
        request = rfc3161.TimeStampReq()
205
        request.setComponentByPosition(0, 'v1')
206
        request.setComponentByPosition(1, message_imprint)
207
        if nonce is not None:
208
            request.setComponentByPosition(3, int(nonce))
209
        request.setComponentByPosition(
210
            4, include_tsa_certificate if include_tsa_certificate is not None else self.include_tsa_certificate)
211
        binary_request = encoder.encode(request)
212
        headers = {'Content-Type': 'application/timestamp-query'}
213
        if self.username != None:
214
            base64string = base64.standard_b64encode(
215
                '%s:%s' % (self.username, self.password))
216
            headers['Authorization'] = "Basic %s" % base64string
217
        try:
218
            response = requests.post(self.url, data=binary_request,
219
                                     timeout=self.timeout, headers=headers)
220
        except request.RequestException, e:
221
            raise TimestampingError(
222
                'Unable to send the request to %r' % self.url, e)
223
        tst_response, substrate = decoder.decode(
224
            response.content, asn1Spec=rfc3161.TimeStampResp())
225
        if substrate:
226
            raise ValueError('Extra data returned')
227
        self.check_response(tst_response, digest, nonce=nonce)
228
        return encoder.encode(tst_response.time_stamp_token)