0001-upload_storage-add-frontoffice-backoffice-link-prese.patch
tests/test_upload_storage.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import os |
4 |
import json |
|
4 | 5 |
import mock |
5 | 6 |
import pytest |
6 | 7 | |
... | ... | |
31 | 32 |
pub.write_cfg() |
32 | 33 |
open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(''' |
33 | 34 |
[storage-remote] |
34 |
label = test out storage
|
|
35 |
label = remote storage
|
|
35 | 36 |
class = wcs.qommon.upload_storage.RemoteOpaqueUploadStorage |
36 |
ws = https://crypto.example.net/ws/ |
|
37 |
ws = https://crypto.example.net/ws1/ |
|
38 | ||
39 |
[storage-remote-bo] |
|
40 |
label = remote storage backoffice only |
|
41 |
class = wcs.qommon.upload_storage.RemoteOpaqueUploadStorage |
|
42 |
ws = https://crypto.example.net/ws2/ |
|
43 |
frontoffice_redirect = false |
|
37 | 44 | |
38 | 45 |
[api-secrets] |
39 | 46 |
crypto.example.net = 1234 |
... | ... | |
92 | 99 |
assert formdef.fields[0].storage == formdef.fields[1].storage == 'default' |
93 | 100 | |
94 | 101 |
assert 'remote' in pub.get_site_storages() |
102 | ||
95 | 103 |
formdef.fields[1].storage = 'remote' |
96 | 104 |
formdef.store() |
97 | 105 |
assert formdef.fields[0].storage == 'default' |
98 | 106 |
assert formdef.fields[1].storage == 'remote' |
99 | 107 | |
100 |
wscall.return_value = None, 200, '{"err": 0, "data": {"redirect_url": "https://crypto.example.net/"}}' |
|
108 |
wscall.return_value = None, 200, json.dumps( |
|
109 |
{"err": 0, "data": {"redirect_url": "https://crypto.example.net/"}}) |
|
101 | 110 | |
102 |
image_content = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb').read() |
|
111 |
image_content = open(os.path.join(os.path.dirname(__file__), |
|
112 |
'image-with-gps-data.jpeg'), 'rb').read() |
|
103 | 113 | |
104 | 114 |
upload_0 = Upload('file.jpg', image_content, 'image/jpeg') |
105 | 115 |
upload_1 = Upload('remote.jpg', image_content, 'image/jpeg') |
116 | ||
106 | 117 |
resp = get_app(pub).get('/test/') |
107 | 118 |
resp.forms[0]['f0$file'] = upload_0 |
108 | 119 |
resp.forms[0]['f1$file'] = upload_1 |
... | ... | |
113 | 124 |
resp = resp.follow() |
114 | 125 |
assert 'The form has been recorded' in resp.text |
115 | 126 | |
116 |
assert resp.text.count('thumbnail=1') == 1 # thumbnail only for first file |
|
127 |
assert 'download?f=0&thumbnail=1' in resp.text |
|
128 |
assert 'download?f=1&thumbnail=1' not in resp.text # no thumbnail for remote storage |
|
129 |
assert 'href="download?f=0"' in resp.text |
|
130 |
assert 'href="download?f=1"' in resp.text |
|
117 | 131 | |
118 | 132 |
resp = resp.click('remote.jpg') |
119 | 133 |
assert resp.location.startswith('https://crypto.example.net/') |
120 | 134 |
assert '&signature=' in resp.location |
121 | 135 | |
136 |
# no links, via webservice |
|
137 |
wscall.return_value = None, 200, json.dumps( |
|
138 |
{"err": 0, "data": { |
|
139 |
"redirect_url": "https://crypto.example.net/", |
|
140 |
"backoffice_redirect_url": None, |
|
141 |
"frontoffice_redirect_url": None, |
|
142 |
}}) |
|
143 |
resp = get_app(pub).get('/test/') |
|
144 |
resp.forms[0]['f0$file'] = upload_0 |
|
145 |
resp.forms[0]['f1$file'] = upload_1 |
|
146 |
resp = resp.forms[0].submit('submit') |
|
147 |
resp = resp.forms[0].submit('submit') |
|
148 |
resp = resp.follow() |
|
149 |
assert 'The form has been recorded' in resp.text |
|
150 |
assert 'download?f=0&thumbnail=1' in resp.text |
|
151 |
assert 'download?f=1&thumbnail=1' not in resp.text |
|
152 |
assert 'href="download?f=0"' in resp.text |
|
153 |
assert 'href="download?f=1"' not in resp.text # no link on frontoffice |
|
122 | 154 |
admin_app = login(get_app(pub), username='admin', password='admin') |
155 |
resp = admin_app.get('/backoffice/management/test/2/') |
|
156 |
assert 'download?f=0&thumbnail=1' in resp.text |
|
157 |
assert 'download?f=1&thumbnail=1' not in resp.text |
|
158 |
assert 'href="download?f=0"' in resp.text |
|
159 |
assert 'href="download?f=1"' not in resp.text # no link on backoffice |
|
160 |
admin_app.get('/backoffice/management/test/2/download?f=1', status=404) # cannot access |
|
161 | ||
162 |
# link only on backoffice, via site-options |
|
163 |
formdef.fields[1].storage = 'remote-bo' |
|
164 |
formdef.store() |
|
165 |
wscall.return_value = None, 200, json.dumps( |
|
166 |
{"err": 0, "data": {"redirect_url": "https://crypto.example.net/"}}) |
|
167 |
resp = get_app(pub).get('/test/') |
|
168 |
resp.forms[0]['f0$file'] = upload_0 |
|
169 |
resp.forms[0]['f1$file'] = upload_1 |
|
170 |
resp = resp.forms[0].submit('submit') |
|
171 |
resp = resp.forms[0].submit('submit') |
|
172 |
resp = resp.follow() |
|
173 |
assert 'The form has been recorded' in resp.text |
|
174 |
assert 'download?f=0&thumbnail=1' in resp.text |
|
175 |
assert 'download?f=1&thumbnail=1' not in resp.text # no thumbnail for remote storage |
|
176 |
assert 'href="download?f=0"' in resp.text |
|
177 |
assert 'href="download?f=1"' not in resp.text # no link on frontoffice |
|
178 |
# go to backoffice |
|
179 |
resp = admin_app.get('/backoffice/management/test/3/') |
|
180 |
assert 'download?f=0&thumbnail=1' in resp.text |
|
181 |
assert 'download?f=1&thumbnail=1' not in resp.text # no thumbnail for remote storage |
|
182 |
assert 'href="download?f=0"' in resp.text |
|
183 |
assert 'href="download?f=1"' in resp.text # link is present on backoffice |
|
184 | ||
185 |
# api access (json export) |
|
123 | 186 |
resp = admin_app.get('/api/forms/test/1/', status=200) |
124 | 187 |
assert resp.json['fields']['file']['content'].startswith('/9j/4AAQSkZJRg') |
125 | 188 |
assert resp.json['fields']['remote_file']['content'] == '' |
wcs/fields.py | ||
---|---|---|
1038 | 1038 |
return self.get_view_value(value, include_image_thumbnail=False) |
1039 | 1039 | |
1040 | 1040 |
def get_view_value(self, value, include_image_thumbnail=True): |
1041 |
show_link = True |
|
1042 |
if value.has_redirect_url(): |
|
1043 |
is_in_backoffice = bool(get_request() and get_request().is_in_backoffice()) |
|
1044 |
show_link = bool(value.get_redirect_url(backoffice=is_in_backoffice)) |
|
1041 | 1045 |
t = TemplateIO(html=True) |
1042 | 1046 |
t += htmltext('<div class="file-field">') |
1043 |
t += htmltext('<a download="%s" href="[download]?f=%s">') % (value.base_filename, self.id) |
|
1047 |
if show_link: |
|
1048 |
t += htmltext('<a download="%s" href="[download]?f=%s">') % (value.base_filename, self.id) |
|
1044 | 1049 |
if include_image_thumbnail and value.can_thumbnail(): |
1045 | 1050 |
t += htmltext('<img alt="" src="[download]?f=%s&thumbnail=1"/>') % self.id |
1046 | 1051 |
t += htmltext('<span>%s</span>') % value |
1047 |
t += htmltext('</a></div>') |
|
1052 |
if show_link: |
|
1053 |
t += htmltext('</a>') |
|
1054 |
t += htmltext('</div>') |
|
1048 | 1055 |
return t.getvalue() |
1049 | 1056 | |
1050 | 1057 |
def get_csv_value(self, value, **kwargs): |
wcs/forms/common.py | ||
---|---|---|
67 | 67 |
if component and component != file.base_filename: |
68 | 68 |
raise errors.TraversalError() |
69 | 69 | |
70 |
if file.has_redirect_url(): |
|
71 |
redirect_url = file.get_redirect_url(backoffice=get_request().is_in_backoffice()) |
|
72 |
if not redirect_url: |
|
73 |
raise errors.TraversalError() |
|
74 |
redirect_url = sign_url_auto_orig(redirect_url) |
|
75 |
return redirect(redirect_url) |
|
76 | ||
70 | 77 |
response = get_response() |
71 | 78 | |
72 | 79 |
if self.thumbnails: |
... | ... | |
81 | 88 |
else: |
82 | 89 |
raise errors.TraversalError() |
83 | 90 | |
84 |
if hasattr(file, 'storage_attrs'): # remote storage |
|
85 |
if not file.storage_attrs.get('redirect_url'): |
|
86 |
raise errors.TraversalError() |
|
87 |
redirect_url = sign_url_auto_orig(file.storage_attrs['redirect_url']) |
|
88 |
return redirect(redirect_url) |
|
89 | ||
90 | 91 |
if file.content_type: |
91 | 92 |
response.set_content_type(file.content_type) |
92 | 93 |
else: |
... | ... | |
683 | 684 |
if not hasattr(file, 'content_type'): |
684 | 685 |
raise errors.TraversalError() |
685 | 686 | |
687 |
if file.has_redirect_url(): |
|
688 |
redirect_url = file.get_redirect_url(backoffice=get_request().is_in_backoffice()) |
|
689 |
if not redirect_url: |
|
690 |
raise errors.TraversalError() |
|
691 |
redirect_url = sign_url_auto_orig(redirect_url) |
|
692 |
return redirect(redirect_url) |
|
693 | ||
686 | 694 |
file_url = 'files/%s/' % fn |
687 | 695 | |
688 | 696 |
if get_request().form.get('thumbnail') == '1': |
... | ... | |
691 | 699 |
else: |
692 | 700 |
raise errors.TraversalError() |
693 | 701 | |
694 |
if hasattr(file, 'storage_attrs'): # remote storage |
|
695 |
if not file.storage_attrs.get('redirect_url'): |
|
696 |
raise errors.TraversalError() |
|
697 |
redirect_url = sign_url_auto_orig(file.storage_attrs['redirect_url']) |
|
698 |
return redirect(redirect_url) |
|
699 | ||
700 | 702 |
if getattr(file, 'base_filename'): |
701 | 703 |
file_url += file.base_filename |
702 | 704 |
return redirect(file_url) |
wcs/qommon/upload_storage.py | ||
---|---|---|
80 | 80 |
def can_thumbnail(self): |
81 | 81 |
return get_storage_object(getattr(self, 'storage', None)).can_thumbnail(self) |
82 | 82 | |
83 |
def has_redirect_url(self): |
|
84 |
return get_storage_object(getattr(self, 'storage', None)).has_redirect_url(self) |
|
85 | ||
86 |
def get_redirect_url(self, backoffice=False): |
|
87 |
return get_storage_object(getattr(self, 'storage', None)).get_redirect_url(self, |
|
88 |
backoffice=backoffice) |
|
89 | ||
83 | 90 | |
84 | 91 |
class UploadStorageError(Exception): |
85 | 92 |
pass |
... | ... | |
125 | 132 |
def can_thumbnail(self, upload): |
126 | 133 |
return can_thumbnail(upload.content_type) |
127 | 134 | |
135 |
def has_redirect_url(self, upload): |
|
136 |
return False |
|
137 | ||
138 |
def get_redirect_url(self, upload, backoffice=False): |
|
139 |
raise NotImplementedError('no get_redirect_url on UploadStorage object') |
|
140 | ||
128 | 141 | |
129 | 142 |
class RemoteOpaqueUploadStorage(object): |
130 |
def __init__(self, ws, **kwargs): |
|
143 |
def __init__(self, ws, frontoffice_redirect='true', backoffice_redirect='true', **kwargs):
|
|
131 | 144 |
self.ws = ws |
145 |
self.frontoffice_redirect = bool(frontoffice_redirect == 'true') |
|
146 |
self.backoffice_redirect = bool(backoffice_redirect == 'true') |
|
132 | 147 | |
133 | 148 |
def save_tempfile(self, upload): |
134 | 149 |
if getattr(upload, 'storage_attrs', None): |
... | ... | |
190 | 205 |
def can_thumbnail(self, upload): |
191 | 206 |
return False |
192 | 207 | |
208 |
def has_redirect_url(self, upload): |
|
209 |
return True |
|
210 | ||
211 |
def get_redirect_url(self, upload, backoffice=False): |
|
212 |
if backoffice: |
|
213 |
if not self.backoffice_redirect: |
|
214 |
return None |
|
215 |
if 'backoffice_redirect_url' in upload.storage_attrs: |
|
216 |
return upload.storage_attrs['backoffice_redirect_url'] |
|
217 |
else: |
|
218 |
if not self.frontoffice_redirect: |
|
219 |
return None |
|
220 |
if 'frontoffice_redirect_url' in upload.storage_attrs: |
|
221 |
return upload.storage_attrs['frontoffice_redirect_url'] |
|
222 |
return upload.storage_attrs.get('redirect_url') |
|
223 | ||
193 | 224 | |
194 | 225 |
def get_storage_object(storage): |
195 | 226 |
if not storage or storage == 'default': |
wcs/workflows.py | ||
---|---|---|
223 | 223 |
return {'attachments': AttachmentsSubstitutionProxy(formdata), |
224 | 224 |
'form_attachments': AttachmentsSubstitutionProxy(formdata)} |
225 | 225 | |
226 |
# mimic PicklableUpload methods: |
|
227 | ||
228 |
def can_thumbnail(self): |
|
229 |
return True |
|
230 | ||
231 |
def has_redirect_url(self): |
|
232 |
return False |
|
233 | ||
234 |
def get_redirect_url(self, upload, backoffice=False): |
|
235 |
raise NotImplementedError('no get_redirect_url on AttachmentEvolutionPart object') |
|
236 | ||
226 | 237 | |
227 | 238 |
class DuplicateGlobalActionNameError(Exception): |
228 | 239 |
pass |
229 |
- |