1
|
import base64
|
2
|
import hashlib
|
3
|
import socket
|
4
|
|
5
|
import requests
|
6
|
from OpenSSL import crypto
|
7
|
|
8
|
import dateutil.parser
|
9
|
import M2Crypto.X509 as X509
|
10
|
import rfc3161
|
11
|
from pyasn1.codec.der import decoder, encoder
|
12
|
from pyasn1.error import PyAsn1Error
|
13
|
from pyasn1.type import univ
|
14
|
from pyasn1_modules import rfc2459
|
15
|
|
16
|
__all__ = ('RemoteTimestamper', 'check_timestamp', 'get_hash_oid',
|
17
|
'TimestampingError', 'get_timestamp')
|
18
|
|
19
|
id_attribute_messageDigest = univ.ObjectIdentifier(
|
20
|
(1, 2, 840, 113549, 1, 9, 4,))
|
21
|
|
22
|
|
23
|
def get_hash_oid(hashname):
|
24
|
return rfc3161.__dict__['id_' + hashname]
|
25
|
|
26
|
|
27
|
def get_hash_from_oid(oid):
|
28
|
h = rfc3161.oid_to_hash.get(oid)
|
29
|
if h is None:
|
30
|
raise ValueError('unsupported hash algorithm', oid)
|
31
|
return h
|
32
|
|
33
|
|
34
|
def get_hash_class_from_oid(oid):
|
35
|
h = get_hash_from_oid(oid)
|
36
|
return getattr(hashlib, h)
|
37
|
|
38
|
|
39
|
class TimestampingError(RuntimeError):
|
40
|
pass
|
41
|
|
42
|
|
43
|
def get_timestamp(tst):
|
44
|
try:
|
45
|
if not isinstance(tst, rfc3161.TimeStampToken):
|
46
|
tst, substrate = decoder.decode(
|
47
|
tst, asn1Spec=rfc3161.TimeStampToken())
|
48
|
if substrate:
|
49
|
raise ValueError("extra data after tst")
|
50
|
|
51
|
tstinfo = tst.getComponentByName(
|
52
|
'content').getComponentByPosition(2).getComponentByPosition(1)
|
53
|
tstinfo, substrate = decoder.decode(
|
54
|
tstinfo, asn1Spec=univ.OctetString())
|
55
|
if substrate:
|
56
|
raise ValueError("extra data after tst")
|
57
|
tstinfo, substrate = decoder.decode(
|
58
|
tstinfo, asn1Spec=rfc3161.TSTInfo())
|
59
|
if substrate:
|
60
|
raise ValueError("extra data after tst")
|
61
|
genTime = tstinfo.getComponentByName('genTime')
|
62
|
return dateutil.parser.parse(str(genTime))
|
63
|
except PyAsn1Error, e:
|
64
|
raise ValueError('not a valid TimeStampToken', e)
|
65
|
|
66
|
|
67
|
def check_timestamp(tst, certificate, data=None, digest=None, hashname=None, nonce=None, attributeSet=False):
|
68
|
hashname = hashname or 'sha1'
|
69
|
hashobj = hashlib.new(hashname)
|
70
|
if digest is None:
|
71
|
if not data:
|
72
|
raise ValueError("method requires data or digest argument")
|
73
|
hashobj.update(data)
|
74
|
digest = hashobj.digest()
|
75
|
|
76
|
if not isinstance(tst, rfc3161.TimeStampToken):
|
77
|
tst, substrate = decoder.decode(tst, asn1Spec=rfc3161.TimeStampToken())
|
78
|
if substrate:
|
79
|
raise ValueError("extra data after tst")
|
80
|
signed_data = tst.content
|
81
|
certificate = load_certificate(signed_data, certificate)
|
82
|
if nonce is not None and int(tst.tst_info['nonce']) != int(nonce):
|
83
|
raise ValueError('nonce is different or missing')
|
84
|
# check message imprint with respect to locally computed digest
|
85
|
message_imprint = tst.tst_info.message_imprint
|
86
|
if message_imprint.hash_algorithm[0] != get_hash_oid(hashname) or \
|
87
|
str(message_imprint.hashed_message) != digest:
|
88
|
raise ValueError('Message imprint mismatch')
|
89
|
#
|
90
|
if not len(signed_data['signerInfos']):
|
91
|
raise ValueError('No signature')
|
92
|
# We validate only one signature
|
93
|
signer_info = signed_data['signerInfos'][0]
|
94
|
# check content type
|
95
|
if tst.content['contentInfo']['contentType'] != rfc3161.id_ct_TSTInfo:
|
96
|
raise ValueError("Signed content type is wrong: %s != %s" % (
|
97
|
tst.content['contentInfo']['contentType'], rfc3161.id_ct_TSTInfo))
|
98
|
|
99
|
# check signed data digest
|
100
|
content = str(decoder.decode(str(tst.content['contentInfo']['content']),
|
101
|
asn1Spec=univ.OctetString())[0])
|
102
|
# if there is authenticated attributes, they must contain the message
|
103
|
# digest and they are the signed data otherwise the content is the
|
104
|
# signed data
|
105
|
if len(signer_info['authenticatedAttributes']):
|
106
|
authenticated_attributes = signer_info['authenticatedAttributes']
|
107
|
signer_digest_algorithm = signer_info['digestAlgorithm']['algorithm']
|
108
|
signer_hash_class = get_hash_class_from_oid(signer_digest_algorithm)
|
109
|
signer_hash_name = get_hash_from_oid(signer_digest_algorithm)
|
110
|
content_digest = signer_hash_class(content).digest()
|
111
|
setOfAttributes = univ.SetOf()
|
112
|
for authenticated_attribute in authenticated_attributes:
|
113
|
if authenticated_attribute[0] == id_attribute_messageDigest:
|
114
|
try:
|
115
|
signed_digest = str(decoder.decode(str(authenticated_attribute[1][0]),
|
116
|
asn1Spec=univ.OctetString())[0])
|
117
|
if signed_digest != content_digest:
|
118
|
raise ValueError('Content digest != signed digest')
|
119
|
for i, x in enumerate(authenticated_attributes):
|
120
|
setOfAttributes.setComponentByPosition(i, x)
|
121
|
signed_content = encoder.encode(setOfAttributes)
|
122
|
break
|
123
|
except PyAsn1Error:
|
124
|
raise
|
125
|
pass
|
126
|
else:
|
127
|
raise ValueError('No signed digest')
|
128
|
else:
|
129
|
signed_content = content
|
130
|
# check signature
|
131
|
signature = signer_info['encryptedDigest']
|
132
|
crypto.verify(certificate,str(signature),signed_content,signer_hash_name)
|
133
|
return setOfAttributes if attributeSet else signed_content
|
134
|
|
135
|
|
136
|
def load_certificate(signed_data, certificate=""):
|
137
|
if certificate != "":
|
138
|
try:
|
139
|
certificate = crypto.load_certificate(
|
140
|
crypto.FILETYPE_ASN1,
|
141
|
encoder.encode(signed_data['certificates'][0][0])
|
142
|
)
|
143
|
except:
|
144
|
raise AttributeError("missing certificate")
|
145
|
else:
|
146
|
try:
|
147
|
certificate = crypto.load_certificate(
|
148
|
crypto.FILETYPE_PEM,
|
149
|
certificate
|
150
|
)
|
151
|
except:
|
152
|
certificate = crypto.load_certificate(
|
153
|
crypto.FILETYPE_ASN1,
|
154
|
certificate
|
155
|
)
|
156
|
|
157
|
return certificate
|
158
|
|
159
|
|
160
|
class RemoteTimestamper(object):
|
161
|
|
162
|
def __init__(self, url, certificate=None, capath=None, cafile=None, username=None, password=None, hashname=None, include_tsa_certificate=False, timeout=10):
|
163
|
self.url = url
|
164
|
self.certificate = certificate
|
165
|
self.capath = capath
|
166
|
self.cafile = cafile
|
167
|
self.username = username
|
168
|
self.password = password
|
169
|
self.hashname = hashname or 'sha1'
|
170
|
self.include_tsa_certificate = include_tsa_certificate
|
171
|
self.timeout = timeout
|
172
|
|
173
|
def check_response(self, response, digest, nonce=None):
|
174
|
'''
|
175
|
Check validity of a TimeStampResponse
|
176
|
'''
|
177
|
tst = response.time_stamp_token
|
178
|
return self.check(tst, digest=digest, nonce=nonce)
|
179
|
|
180
|
def check(self, tst, data=None, digest=None, nonce=None):
|
181
|
return check_timestamp(tst, digest=digest, data=data, nonce=nonce,
|
182
|
certificate=self.certificate, hashname=self.hashname)
|
183
|
|
184
|
def timestamp(self, data=None, digest=None, include_tsa_certificate=None, nonce=None):
|
185
|
return self(data=data, digest=digest,
|
186
|
include_tsa_certificate=include_tsa_certificate, nonce=nonce)
|
187
|
|
188
|
def __call__(self, data=None, digest=None, include_tsa_certificate=None, nonce=None):
|
189
|
algorithm_identifier = rfc2459.AlgorithmIdentifier()
|
190
|
algorithm_identifier.setComponentByPosition(
|
191
|
0, get_hash_oid(self.hashname))
|
192
|
message_imprint = rfc3161.MessageImprint()
|
193
|
message_imprint.setComponentByPosition(0, algorithm_identifier)
|
194
|
hashobj = hashlib.new(self.hashname)
|
195
|
if data:
|
196
|
hashobj.update(data)
|
197
|
digest = hashobj.digest()
|
198
|
elif digest:
|
199
|
assert len(digest) == hashobj.digest_size, 'digest length is wrong'
|
200
|
else:
|
201
|
raise ValueError(
|
202
|
'You must pass some data to digest, or the digest')
|
203
|
message_imprint.setComponentByPosition(1, digest)
|
204
|
request = rfc3161.TimeStampReq()
|
205
|
request.setComponentByPosition(0, 'v1')
|
206
|
request.setComponentByPosition(1, message_imprint)
|
207
|
if nonce is not None:
|
208
|
request.setComponentByPosition(3, int(nonce))
|
209
|
request.setComponentByPosition(
|
210
|
4, include_tsa_certificate if include_tsa_certificate is not None else self.include_tsa_certificate)
|
211
|
binary_request = encoder.encode(request)
|
212
|
headers = {'Content-Type': 'application/timestamp-query'}
|
213
|
if self.username != None:
|
214
|
base64string = base64.standard_b64encode(
|
215
|
'%s:%s' % (self.username, self.password))
|
216
|
headers['Authorization'] = "Basic %s" % base64string
|
217
|
try:
|
218
|
response = requests.post(self.url, data=binary_request,
|
219
|
timeout=self.timeout, headers=headers)
|
220
|
except request.RequestException, e:
|
221
|
raise TimestampingError(
|
222
|
'Unable to send the request to %r' % self.url, e)
|
223
|
tst_response, substrate = decoder.decode(
|
224
|
response.content, asn1Spec=rfc3161.TimeStampResp())
|
225
|
if substrate:
|
226
|
raise ValueError('Extra data returned')
|
227
|
self.check_response(tst_response, digest, nonce=nonce)
|
228
|
return encoder.encode(tst_response.time_stamp_token)
|