0001-add-cryptor-connector-39431.patch
passerelle/apps/cryptor/migrations/0001_initial.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# Generated by Django 1.11.18 on 2020-02-21 14:54 |
|
3 |
from __future__ import unicode_literals |
|
4 | ||
5 |
from django.db import migrations, models |
|
6 |
import django.db.models.deletion |
|
7 |
import passerelle.apps.cryptor.models |
|
8 |
import uuid |
|
9 | ||
10 | ||
11 |
class Migration(migrations.Migration):
    """Initial schema of the cryptor connector.

    Creates the ``CryptedFile`` and ``Cryptor`` models, then adds the
    ``resource`` foreign key from ``CryptedFile`` to ``Cryptor``.
    """

    initial = True

    dependencies = [
        ('base', '0016_auto_20191002_1443'),
    ]

    operations = [
        migrations.CreateModel(
            name='CryptedFile',
            fields=[
                ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
                ('filename', models.CharField(max_length=512)),
                ('content_type', models.CharField(max_length=128)),
                ('creation_timestamp', models.DateTimeField(auto_now_add=True)),
            ],
        ),
        migrations.CreateModel(
            name='Cryptor',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('title', models.CharField(max_length=50, verbose_name='Title')),
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
                ('description', models.TextField(verbose_name='Description')),
                ('public_key', models.TextField(blank=True, validators=[passerelle.apps.cryptor.models.validate_rsa_key], verbose_name='Encrypt RSA public key (PEM format)')),
                ('private_key', models.TextField(blank=True, validators=[passerelle.apps.cryptor.models.validate_rsa_key], verbose_name='Decrypt RSA private key (PEM format)')),
                ('redirect_url_base', models.URLField(blank=True, help_text='Base URL for redirect, empty for local', max_length=256, verbose_name='Base URL of decrypt system')),
                ('users', models.ManyToManyField(blank=True, related_name='_cryptor_users_+', related_query_name='+', to='base.ApiUser')),
            ],
            options={
                # Must match Cryptor.Meta.verbose_name in models.py, otherwise
                # makemigrations detects a state change and generates an
                # extra AlterModelOptions migration.
                'verbose_name': 'Encrypt / Decrypt',
            },
        ),
        # The foreign key is added last because Cryptor is created after
        # CryptedFile in this migration.
        migrations.AddField(
            model_name='cryptedfile',
            name='resource',
            field=models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, to='cryptor.Cryptor'),
        ),
    ]
passerelle/apps/cryptor/models.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2020 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import contextlib |
|
18 |
import base64 |
|
19 |
import binascii |
|
20 |
import json |
|
21 |
import os |
|
22 |
import tempfile |
|
23 |
from uuid import uuid4 |
|
24 | ||
25 |
from Cryptodome.PublicKey import RSA |
|
26 |
from Cryptodome.Random import get_random_bytes |
|
27 |
from Cryptodome.Cipher import AES, PKCS1_OAEP |
|
28 | ||
29 |
from django.core.exceptions import ValidationError |
|
30 |
from django.core.files.storage import default_storage |
|
31 |
from django.db import models |
|
32 |
from django.http import HttpResponse |
|
33 |
from django.utils.six.moves.urllib_parse import urljoin |
|
34 |
from django.utils.translation import ugettext_lazy as _ |
|
35 | ||
36 |
from passerelle.base.models import BaseResource |
|
37 |
from passerelle.utils.api import endpoint |
|
38 |
from passerelle.utils.files import atomic_write |
|
39 |
from passerelle.utils.jsonresponse import APIError |
|
40 | ||
41 | ||
42 |
# JSON schema (draft-04) of the file-encrypt request body: a single "file"
# object whose three members are all mandatory strings.
FILE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "File to encrypt",
    "description": "",
    "type": "object",
    "required": ["file"],
    "properties": {
        "file": {
            "type": "object",
            "required": ["filename", "content_type", "content"],
            "properties": {
                member: {"type": "string"}
                for member in ("filename", "content_type", "content")
            },
        },
    },
}
|
60 | ||
61 | ||
62 |
# encrypt and decrypt are borrowed from |
|
63 |
# https://www.pycryptodome.org/en/latest/src/examples.html#encrypt-data-with-rsa |
|
64 | ||
65 |
def write_encrypt(out_file, data, key_pem):
    """Encrypt ``data`` for the RSA key ``key_pem`` and write it to ``out_file``.

    A random 16-byte session key encrypts the data with AES in EAX mode;
    the session key itself is encrypted with RSA (PKCS#1 OAEP).

    Written in this order: encrypted session key, AES nonce,
    authentication tag, ciphertext.
    """
    rsa_key = RSA.import_key(key_pem)
    aes_key = get_random_bytes(16)
    # Wrap the AES key so that only the private key holder can recover it.
    wrapped_key = PKCS1_OAEP.new(rsa_key).encrypt(aes_key)
    aes = AES.new(aes_key, AES.MODE_EAX)
    body, mac = aes.encrypt_and_digest(data)
    for part in (wrapped_key, aes.nonce, mac, body):
        out_file.write(part)
|
79 | ||
80 | ||
81 |
def read_decrypt(in_file, key_pem):
    """Read an encrypted payload from ``in_file`` and return the decrypted data.

    Expected layout, matching what ``write_encrypt`` writes: an encrypted
    session key as long as the RSA key (``size_in_bytes()``), a 16-byte
    nonce, a 16-byte tag, then the ciphertext up to end of file.
    """
    rsa_key = RSA.import_key(key_pem)
    wrapped_key = in_file.read(rsa_key.size_in_bytes())
    eax_nonce = in_file.read(16)
    mac = in_file.read(16)
    body = in_file.read()
    # Recover the AES session key with the RSA private key.
    aes_key = PKCS1_OAEP.new(rsa_key).decrypt(wrapped_key)
    # Decrypt the data and check it against the stored tag.
    return AES.new(aes_key, AES.MODE_EAX, eax_nonce).decrypt_and_verify(body, mac)
|
95 | ||
96 | ||
97 |
def makedir(dir_name):
    """Create ``dir_name`` (and missing parents) unless it already exists.

    When the default storage defines ``directory_permissions_mode``, the
    directories are created with that mode, with the process umask
    temporarily cleared so the mode is applied as given.

    Raises OSError when the directory cannot be created. A directory
    created concurrently by another process is not an error.
    """
    if os.path.exists(dir_name):
        return
    mode = default_storage.directory_permissions_mode
    # Clear the umask only when an explicit mode has to be honoured.
    previous_umask = os.umask(0) if mode else None
    try:
        if mode:
            os.makedirs(dir_name, mode=mode)
        else:
            os.makedirs(dir_name)
    except OSError:
        # Tolerate the race where the directory appeared between the
        # exists() check and makedirs(); re-raise any real failure
        # (permissions, read-only filesystem, ...).
        if not os.path.isdir(dir_name):
            raise
    finally:
        if previous_umask is not None:
            os.umask(previous_umask)
|
109 | ||
110 | ||
111 |
def validate_rsa_key(key):
    """Model field validator: check that ``key`` can be imported as an RSA key.

    Raises ValidationError, carrying the parser's message, when
    ``RSA.import_key`` rejects the value.
    """
    try:
        RSA.import_key(key)
    except (ValueError, IndexError, TypeError) as ex:
        # import_key is documented to raise any of these three on
        # unparsable input; all of them mean "not a usable key".
        raise ValidationError(_('invalid RSA key (%s)') % ex)
|
116 | ||
117 | ||
118 |
class Cryptor(BaseResource):
    """Connector that stores posted files encrypted on disk and serves them
    back decrypted.

    ``file-encrypt`` needs ``public_key`` and ``file-decrypt`` needs
    ``private_key``; both fields may be blank, in which case the matching
    endpoint raises an APIError.
    """
    public_key = models.TextField(blank=True,
                                  verbose_name=_('Encrypt RSA public key (PEM format)'),
                                  validators=[validate_rsa_key])
    private_key = models.TextField(blank=True,
                                   verbose_name=_('Decrypt RSA private key (PEM format)'),
                                   validators=[validate_rsa_key])
    redirect_url_base = models.URLField(max_length=256, blank=True,
                                        verbose_name=_('Base URL of decrypt system'),
                                        help_text=_('Base URL for redirect, empty for local'))

    category = _('Misc')

    class Meta:
        verbose_name = _('Encrypt / Decrypt')

    def get_redirect_url_base_display(self):
        """Return a label for ``redirect_url_base`` that never reveals its value."""
        if self.redirect_url_base:
            return _('defined')  # don't show it, can be sensitive
        return _('this file-decrypt endpoint')

    def get_filename(self, uuid, create=False):
        """Return the storage path of the encrypted content for ``uuid``.

        The file lives under
        ``<storage>/<connector slug>/<slug>/<uuid[0:2]>/<uuid[2:4]>/<uuid>``.
        When ``create`` is true, the containing directory is created.
        """
        dirname = os.path.join(default_storage.path(self.get_connector_slug()),
                               self.slug, uuid[0:2], uuid[2:4])
        if create:
            makedir(dirname)
        filename = os.path.join(dirname, uuid)
        return filename

    @endpoint(name='file-encrypt', perm='can_encrypt',
              description=_('Encrypt a file'),
              post={
                  'description': _('File to encrypt'),
                  'request_body': {'schema': {'application/json': FILE_SCHEMA}}
              })
    def file_encrypt(self, request, post_data):
        """Encrypt the posted file and store it with a JSON metadata file.

        ``post_data`` follows FILE_SCHEMA; ``file.content`` is base64.
        Returns ``{'data': metadata}`` where metadata holds the filename,
        content type, creation timestamp and the URL to decrypt the file.

        Raises APIError when no public key is configured, or (HTTP 400)
        when the content is not valid base64.
        """
        if not self.public_key:
            raise APIError('missing public key')
        try:
            data = base64.b64decode(post_data['file']['content'])
        except (TypeError, ValueError, binascii.Error):
            # Python 2 raises TypeError; Python 3 raises binascii.Error (a
            # ValueError) on bad padding and a plain ValueError on
            # non-ASCII input.
            raise APIError('invalid base64 string', http_status=400)

        filename = post_data['file']['filename']
        content_type = post_data['file']['content_type']
        cfile = CryptedFile(resource=self, filename=filename, content_type=content_type)
        cfile.save()

        uuid = str(cfile.uuid)

        if self.redirect_url_base:
            redirect_url_base = self.redirect_url_base
        else:
            # No external decrypt system: point at this connector's own
            # file-decrypt endpoint.
            redirect_url_base = request.build_absolute_uri('%sfile-decrypt/' % (
                self.get_absolute_url(),))
        redirect_url = urljoin(redirect_url_base, uuid)

        content_filename = self.get_filename(uuid, create=True)
        metadata_filename = '%s.json' % content_filename
        metadata = {
            'filename': cfile.filename,
            'content_type': cfile.content_type,
            'creation_timestamp': cfile.creation_timestamp.isoformat(),
            'redirect_url': redirect_url,
        }

        tmp_dir = os.path.join(default_storage.path(self.get_connector_slug()), self.slug, 'tmp')
        # Content is written before metadata: file_decrypt uses the presence
        # of the metadata file to decide that a uuid is known.
        with atomic_write(content_filename, dir=tmp_dir) as fd:
            write_encrypt(fd, data, self.public_key)
        with atomic_write(metadata_filename, dir=tmp_dir, mode='w') as fd:
            json.dump(metadata, fd, indent=2)

        return {'data': metadata}

    @endpoint(name='file-decrypt', perm='can_decrypt',
              description=_('Decrypt a file'),
              pattern=r'(?P<uuid>[\w-]+)$',
              example_pattern='{uuid}/',
              parameters={
                  'uuid': {
                      'description': _('File identifier'),
                      'example_value': '12345678-abcd-4321-abcd-123456789012',
                  },
              })
    def file_decrypt(self, request, uuid):
        """Return the decrypted file identified by ``uuid`` as an HTTP response.

        The response uses the content type and filename stored in the
        metadata file written by ``file_encrypt``.

        Raises APIError when no private key is configured, or (HTTP 404)
        when no metadata file exists for ``uuid``.
        """
        if not self.private_key:
            raise APIError('missing private key')
        content_filename = self.get_filename(uuid, create=False)
        metadata_filename = '%s.json' % content_filename
        if not os.path.exists(metadata_filename):
            raise APIError('unknown uuid', http_status=404)

        # Context managers make sure both files are closed, including when
        # decryption or JSON parsing fails.
        with open(content_filename, 'rb') as content_fd:
            content = read_decrypt(content_fd, self.private_key)

        with open(metadata_filename, 'r') as metadata_fd:
            metadata = json.load(metadata_fd)
        filename = metadata.get('filename')
        content_type = metadata.get('content_type')

        response = HttpResponse(content_type=content_type)
        response['Content-Disposition'] = 'inline; filename="%s"' % filename
        response.write(content)
        return response
|
221 | ||
222 | ||
223 |
class CryptedFile(models.Model):
    """Database record of a file encrypted through a Cryptor connector."""
    # Connector that encrypted the file; PROTECT is set as the on_delete rule.
    resource = models.ForeignKey(Cryptor, on_delete=models.PROTECT)
    # Random identifier, also used by Cryptor.get_filename() to build the
    # on-disk path of the encrypted content.
    uuid = models.UUIDField(primary_key=True, default=uuid4, editable=False)
    # Original name of the file, as sent by the client.
    filename = models.CharField(max_length=512, blank=False)
    # Content type of the file, as sent by the client.
    content_type = models.CharField(max_length=128)
    # Set once, when the record is created.
    creation_timestamp = models.DateTimeField(auto_now_add=True)
passerelle/settings.py | ||
---|---|---|
134 | 134 |
'passerelle.apps.cityweb', |
135 | 135 |
'passerelle.apps.clicrdv', |
136 | 136 |
'passerelle.apps.cmis', |
137 |
'passerelle.apps.cryptor', |
|
137 | 138 |
'passerelle.apps.csvdatasource', |
138 | 139 |
'passerelle.apps.family', |
139 | 140 |
'passerelle.apps.feeds', |
setup.py | ||
---|---|---|
104 | 104 |
'jsonschema < 3.1', |
105 | 105 |
'zeep >= 3.2', |
106 | 106 |
'pycrypto', |
107 |
'pycryptodomex', |
|
107 | 108 |
'unidecode', |
108 | 109 |
'paramiko', |
109 | 110 |
'pdfrw', |
tests/test_cryptor.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 | ||
3 |
import base64 |
|
4 |
import pytest |
|
5 | ||
6 |
from django.contrib.contenttypes.models import ContentType |
|
7 |
from django.core.exceptions import ValidationError |
|
8 |
from django.utils.encoding import force_text |
|
9 | ||
10 |
from passerelle.apps.cryptor.models import Cryptor, CryptedFile |
|
11 |
from passerelle.base.models import ApiUser, AccessRight |
|
12 | ||
13 |
import utils |
|
14 | ||
15 |
# Test-only key material. PUBLIC_KEY is a PEM certificate (not a bare public
# key); it is passed as Cryptor.public_key by the fixture below.
# NOTE(review): presumably the certificate matching PRIVATE_KEY - the
# encrypt/decrypt round-trip test relies on that.
PUBLIC_KEY = '''-----BEGIN CERTIFICATE-----
MIICyjCCAbKgAwIBAgIUQQzM2eFYF+LpUR3t2euAjZAwLCEwDQYJKoZIhvcNAQEL
BQAwDzENMAsGA1UEAwwEemVwbzAeFw0xOTA2MjIyMjMxMTVaFw0yOTA2MTkyMjMx
MTVaMA8xDTALBgNVBAMMBHplcG8wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC8BM3xDylze0bOm76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd
2bW5eL6TCUT3gLEgegGYGPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWN
yPhc02EUqYlz1FBBYRgyYHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9Z
bHyA6s0C8goUFZG7kvasFRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6
E5jAHbPR7I4nRBCJuxJEbfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZR
g+Uh5V4ZLCzP7sSHcRN2ftWZqAr7AgMBAAGjHjAcMAkGA1UdEwQCMAAwDwYDVR0R
BAgwBoIEemVwbzANBgkqhkiG9w0BAQsFAAOCAQEANo54TMbOR5Isd4lix87EM0N+
8kxovCLin/szK4+fGfnr0fCUswkhoZ3y6xnmXFX4S2IGLU8bTl+eQVg04VM/7Gg2
LvBTtiBmGESbiSaC1fS+DbPBjp1NBpfwbiQFEuQfRMS6ejeF1YMS8Oy9PpeujHDT
4cX0kPH2GkqOGtpAdKoOD5XzT3yu5IHv7/AWpl8LZ5hr3f1RbzfIzJ19oC5NDXIR
d1XsjuCCHFbCyfnDmuZuQaGCTCm9f4Z8Ynum6hmSSzNvy3YJLRKXEYYLB3+l2V+t
SCzQiVzqDeKZnAChvJRditvcKdG36TPHREyCPgkpUTmi0gEjDZDPyBmXhHXHYA==
-----END CERTIFICATE-----
'''
# Private key in PEM form, passed as Cryptor.private_key by the fixture below.
PRIVATE_KEY = '''-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC8BM3xDylze0bO
m76IjidyhmFqJlnRvcpbeZVTM7r3qYOHqXFG7/GZL4yd2bW5eL6TCUT3gLEgegGY
GPwCkGPd1cq9h+2M7zvolGToRCvrBpxH5Vu6iRkEYyWNyPhc02EUqYlz1FBBYRgy
YHQ4jy0vsPH55g536OKLI4rVykszjwD9p0Kh+T2I1D9ZbHyA6s0C8goUFZG7kvas
FRIXTTPgUBGSnEN/VPSD5vM94Oj5K4t6P9GHd32Jo2O6E5jAHbPR7I4nRBCJuxJE
bfpmsaOuMkGQ5rMulk6LXvRZiT9UDCDem1k6uF6tJkZRg+Uh5V4ZLCzP7sSHcRN2
ftWZqAr7AgMBAAECggEAUQsdHhA0BNQZdEtLuJ7VwBbOfKvlQXQ2enGQ/RkqOUC3
Mk3GRxZ8JFSLnyrNmxHBy61OLgUp1F7iuwXh8tT8Rw21YzbpHTutrhXw3PEtoRPr
X04s2N3pi6uU72W2MITorrhZSDU3FsdcX7KVxh9pEcqKsvYIPIWEyQbb/EVDXwhC
K4TAsmmhsGaU/BB0WHkbzU5KlZqYQHfnoj5pmTLoeYj81Z8D5T9fceImcyuWLl8t
OHscbAjNpkS1X2vYNwMLhCAM7YhE63RjFo2G5fxx64wdgs+mllR0PCc3Nli+JPWE
LQ/KmVMPY6JH506WbZkEVgb9Gfuj6yASpu5zJxf3MQKBgQDwktbzKmK3zwSNwLdx
zZy4AnQb2PNKuryYts9R+nKqWnE5vJwekBIV9vsASK0Aoh4UWP9cUjQV3gZH3HIR
9xP8nJwpOLU7c/wPP81HUl7nf68MVP5OzemHY/78fYR8T3Jf/uFdW3AOQrCBjGIV
Zbjb5SisqCS2ODc7DSuqzliBMwKBgQDIEz7nhXyMilTlgiWw7iRm9RKWykVSjEuR
gxHAqPOkg/HxacfDYs/O+qteSdW/V7zzZ2QHMyr52BaL5sEz0lC7W3O1nfTS94VM
YK+MakRBImC5uZvpea+30vJbVLODnKskhsWF8aCqLi5KJvkR5aE0OySKXBur7Czq
X2/gK6jfGQKBgAbU1J/BE16O1V1FHLBxm0KqZyunRHlZxiM8BbUZPIpT2SU/kttX
UfwnsEb4yVjcQahoQpAXkX0RefIuc1rJPlsNA240Owk+KOkx8Z1V3HYMbScXfsU0
Ga6Li2EWG14AT4okTbf98bel8ycqmlprMg2kezwz5h76h674l8XY6DB7AoGBAKJL
Aka5gBtchpsZJEvOENc3Spnof60DQrVJVZgrNF+p7BMA1FsIhzsFGQdF603n9MyY
fIpelijOgROA3g2UN4qTF1wmQhbzUzxuXVgQR0dyhHWDOxZ7b+8z/QXawjcrWaQq
coVBSCtjhIb/8B/1XftJUk2tg4DE9nYzbkOwBq7ZAoGAPF1KSY9TzY7g8gmQzYuY
+CCHM3mR9PjSHhJS5VWTwGfw3zZpLptwxzmJAoi1DyrJVhBP17ADVitYK4GquiSB
z5aZ2AnUBc/xueO2ixL3ROOXYAeakrRAQ38G13ibYe2dQpv6/CTsZJOttnCErn54
3k2Y/kDV+c1uNbzyPiK2qaM=
-----END PRIVATE KEY-----
'''
|
62 | ||
63 | ||
64 |
@pytest.fixture
def cryptor(db):
    """Return a saved Cryptor connector with slug 'test' and both test keys."""
    key_pair = {'private_key': PRIVATE_KEY, 'public_key': PUBLIC_KEY}
    return Cryptor.objects.create(slug='test', **key_pair)
|
69 | ||
70 | ||
71 |
def test_cryptor_bad_keys(db):
    """An unparsable value in either key field fails model validation."""
    expected_messages = ['invalid RSA key (RSA key format is not supported)']
    cases = (('bad1', 'private_key'), ('bad2', 'public_key'))
    for slug, key_field in cases:
        resource = Cryptor(slug=slug, title='t', description='d', **{key_field: 'badkey'})
        with pytest.raises(ValidationError) as excinfo:
            resource.full_clean()
        assert excinfo.value.messages == expected_messages
|
80 | ||
81 | ||
82 |
def test_cryptor_restricted_access(app, cryptor):
    """Without access rights both endpoints answer 403; wrong verbs answer 405."""

    def assert_permission_denied(response):
        assert response.json['err'] == 1
        assert 'PermissionDenied' in response.json['err_class']

    # file-encrypt only accepts POST
    encrypt_url = utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
    assert encrypt_url == '/cryptor/test/file-encrypt'
    app.get(encrypt_url, status=405)
    assert_permission_denied(app.post_json(encrypt_url, params={"foo": "bar"}, status=403))

    # file-decrypt only accepts GET
    decrypt_url = utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug) + '/uuid'
    assert decrypt_url == '/cryptor/test/file-decrypt/uuid'
    app.post_json(decrypt_url, params={"foo": "bar"}, status=405)
    assert_permission_denied(app.get(decrypt_url, status=403))
|
96 | ||
97 | ||
98 |
def test_cryptor_bad_requests(app, cryptor):
    """Malformed encrypt payloads answer 400; an unknown uuid answers 404."""
    # full opened access
    open_user = ApiUser.objects.create(username='all', keytype='', key='')
    resource_type = ContentType.objects.get_for_model(cryptor)
    for codename in ('can_encrypt', 'can_decrypt'):
        AccessRight.objects.create(codename=codename, apiuser=open_user,
                                   resource_type=resource_type,
                                   resource_pk=cryptor.pk)

    encrypt_url = utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
    bad_payloads = [
        'error',
        {"foo": "bar"},
        ["not", "a", "dict"],
        {"file": {"filename": "f", "content_type": "ct"}},
        {"file": {"filename": "f", "content_type": "ct", "content": None}},
        {"file": {"filename": "f", "content_type": "ct", "content": "NotBase64"}},
    ]
    for payload in bad_payloads:
        response = app.post_json(encrypt_url, params=payload, status=400)
        assert response.json['err'] == 1

    decrypt_url = utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
    app.get(decrypt_url + '/bad-uuid', status=404)
|
121 | ||
122 | ||
123 |
def test_cryptor_encrypt_decrypt(app, cryptor):
    """Round-trip a file through file-encrypt and file-decrypt.

    Also checks the redirect URL with and without ``redirect_url_base``,
    and that each endpoint reports an error once its key is removed.
    """
    api = ApiUser.objects.create(username='all', keytype='', key='')
    obj_type = ContentType.objects.get_for_model(cryptor)
    AccessRight.objects.create(codename='can_encrypt', apiuser=api, resource_type=obj_type,
                               resource_pk=cryptor.pk)
    AccessRight.objects.create(codename='can_decrypt', apiuser=api, resource_type=obj_type,
                               resource_pk=cryptor.pk)

    # encrypt
    endpoint = utils.generic_endpoint_url('cryptor', 'file-encrypt', slug=cryptor.slug)
    content = force_text(base64.b64encode(b'this is foo and bar'))
    payload = {"file": {"filename": "foo.txt", "content_type": "text/plain", "content": content}}

    resp = app.post_json(endpoint, params=payload, status=200)
    assert resp.json['err'] == 0
    assert CryptedFile.objects.count() == 1
    cfile = CryptedFile.objects.first()
    assert resp.json['data']['redirect_url'].endswith(
        '/cryptor/%s/file-decrypt/%s' % (cryptor.slug, cfile.uuid))
    cfile.delete()

    # encrypt with another redirect url
    cryptor.redirect_url_base = 'https://foo/bar/'
    cryptor.save()
    resp = app.post_json(endpoint, params=payload, status=200)
    assert resp.json['err'] == 0
    assert CryptedFile.objects.count() == 1
    cfile = CryptedFile.objects.first()
    assert resp.json['data']['redirect_url'] == 'https://foo/bar/%s' % cfile.uuid

    # remove public key = cannot encrypt
    cryptor.public_key = ''
    cryptor.save()
    resp = app.post_json(endpoint, params=payload, status=200)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'missing public key'

    # decrypt
    endpoint = utils.generic_endpoint_url('cryptor', 'file-decrypt', slug=cryptor.slug)
    endpoint = endpoint + '/' + str(cfile.uuid)
    resp = app.get(endpoint, status=200)
    assert resp.content_type == 'text/plain'
    assert resp.content == b'this is foo and bar'
    assert resp.headers['Content-Disposition'] == 'inline; filename="foo.txt"'

    # remove all files does not remove data+metadata(json) files: decrypt still works
    CryptedFile.objects.all().delete()
    assert CryptedFile.objects.count() == 0
    resp = app.get(endpoint, status=200)
    assert resp.content_type == 'text/plain'
    assert resp.content == b'this is foo and bar'
    assert resp.headers['Content-Disposition'] == 'inline; filename="foo.txt"'

    # remove private key = cannot decrypt
    cryptor.private_key = ''
    cryptor.save()
    # endpoint already ends with the file uuid: reuse it as is instead of
    # appending the uuid a second time.
    resp = app.get(endpoint, status=200)
    assert resp.json['err'] == 1
    assert resp.json['err_desc'] == 'missing private key'
|
0 |
- |