0001-misc-accept-non-local-storage-as-attachments-in-evol.patch
tests/test_upload_storage.py | ||
---|---|---|
11 | 11 |
from wcs.formdef import FormDef |
12 | 12 |
from wcs.categories import Category |
13 | 13 |
from wcs import fields |
14 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
|
14 | 15 | |
15 | 16 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
16 | 17 | |
... | ... | |
221 | 222 |
'redirect_url': 'https://crypto.example.net/', |
222 | 223 |
'file_size': 1834 |
223 | 224 |
} |
225 | ||
226 | ||
227 |
@mock.patch('wcs.wscalls.call_webservice') |
|
228 |
def test_remoteopaque_in_attachmentevolutionpart(wscall, pub): |
|
229 |
create_user_and_admin(pub) |
|
230 |
formdef = create_formdef() |
|
231 |
formdef.fields[1].storage = 'remote-bo' |
|
232 |
formdef.store() |
|
233 |
formdef.data_class().wipe() |
|
234 | ||
235 |
wscall.return_value = None, 200, json.dumps( |
|
236 |
{"err": 0, "data": {"redirect_url": "https://crypto.example.net/"}}) |
|
237 | ||
238 |
image_content = open(os.path.join(os.path.dirname(__file__), |
|
239 |
'image-with-gps-data.jpeg'), 'rb').read() |
|
240 |
upload_0 = Upload('local-file.jpg', image_content, 'image/jpeg') |
|
241 |
upload_1 = Upload('remote-file.jpg', image_content, 'image/jpeg') |
|
242 | ||
243 |
user_app = login(get_app(pub), username='foo', password='foo') |
|
244 |
admin_app = login(get_app(pub), username='admin', password='admin') |
|
245 | ||
246 |
resp = user_app.get('/test/') |
|
247 |
resp.forms[0]['f0$file'] = upload_0 |
|
248 |
resp.forms[0]['f1$file'] = upload_1 |
|
249 |
resp = resp.forms[0].submit('submit') |
|
250 |
assert 'Check values then click submit.' in resp.text |
|
251 |
resp = resp.forms[0].submit('submit') |
|
252 |
assert resp.status_int == 302 |
|
253 |
resp = resp.follow() |
|
254 |
assert 'The form has been recorded' in resp.text |
|
255 | ||
256 |
# register a comment = create an AttachmentEvolutionPart |
|
257 |
formdata = formdef.data_class().select()[0] |
|
258 |
item = RegisterCommenterWorkflowStatusItem() |
|
259 |
item.attachments = ['form_var_file_raw', 'form_var_remote_file_raw'] |
|
260 |
item.comment = 'text in form history' |
|
261 |
item.perform(formdata) |
|
262 | ||
263 |
# links on frontoffice: no link to remote file |
|
264 |
resp = user_app.get('/test/%s/' % formdata.id) |
|
265 |
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=') == 1 |
|
266 |
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=uuid-') == 0 |
|
267 |
# links on backoffice: links to local and remote file |
|
268 |
resp = admin_app.get('/backoffice/management/test/%s/' % formdata.id) |
|
269 |
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=') == 2 |
|
270 |
assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=uuid-') == 1 |
|
271 | ||
272 |
local_file = formdata.evolution[-1].parts[0] |
|
273 |
local_file_id = os.path.basename(local_file.filename) |
|
274 |
remote_file = formdata.evolution[-1].parts[1] |
|
275 |
remote_file_id = remote_file.filename |
|
276 |
assert not local_file_id.startswith('uuid-') |
|
277 |
assert remote_file_id.startswith('uuid-') |
|
278 | ||
279 |
# click on remote file in frontoffice: redirect... but forbidden |
|
280 |
resp = user_app.get('/test/%s/attachment?f=%s' % (formdata.id, remote_file_id)) |
|
281 |
assert resp.status_int == 302 |
|
282 |
resp = resp.follow(status=404) |
|
283 |
# click in backoffice, redirect to decryption system |
|
284 |
resp = admin_app.get('/backoffice/management/test/%s/attachment?f=%s' % (formdata.id, remote_file_id)) |
|
285 |
assert resp.status_int == 302 |
|
286 |
resp = resp.follow() |
|
287 |
assert resp.location.startswith('https://crypto.example.net/') |
|
288 |
assert '&signature=' in resp.location |
wcs/qommon/upload_storage.py | ||
---|---|---|
41 | 41 |
def get_file_pointer(self): |
42 | 42 |
if 'fp' in self.__dict__ and self.__dict__.get('fp') is not None: |
43 | 43 |
return self.__dict__.get('fp') |
44 |
elif hasattr(self, 'qfilename'):
|
|
44 |
elif getattr(self, 'qfilename', None):
|
|
45 | 45 |
basedir = os.path.join(get_publisher().app_dir, 'uploads') |
46 | 46 |
self.fp = open(os.path.join(basedir, self.qfilename), 'rb') |
47 | 47 |
return self.fp |
wcs/workflows.py | ||
---|---|---|
39 | 39 |
from quixote.html import htmltext |
40 | 40 |
from .qommon import errors |
41 | 41 |
from .qommon.template import Template, TemplateError |
42 |
from .qommon.upload_storage import PicklableUpload, get_storage_object |
|
42 | 43 | |
43 | 44 |
from .conditions import Condition |
44 | 45 |
from .roles import Role, logged_users_role, get_user_roles |
... | ... | |
117 | 118 | |
118 | 119 |
@property |
119 | 120 |
def content(self): |
120 |
return self.attachment_evolution_part.get_file_pointer().read() |
|
121 |
fp = self.attachment_evolution_part.get_file_pointer() |
|
122 |
if fp: |
|
123 |
return fp.read() |
|
124 |
return b'' |
|
121 | 125 | |
122 | 126 |
@property |
123 | 127 |
def b64_content(self): |
... | ... | |
168 | 172 |
content_type = None |
169 | 173 |
charset = None |
170 | 174 |
varname = None |
175 |
storage = None |
|
176 |
storage_attrs = None |
|
171 | 177 | |
172 | 178 |
def __init__(self, base_filename, fp, orig_filename=None, content_type=None, |
173 |
charset=None, varname=None): |
|
179 |
charset=None, varname=None, storage=None, storage_attrs=None):
|
|
174 | 180 |
self.base_filename = base_filename |
175 | 181 |
self.orig_filename = orig_filename or base_filename |
176 | 182 |
self.content_type = content_type |
177 | 183 |
self.charset = charset |
178 | 184 |
self.fp = fp |
179 | 185 |
self.varname = varname |
186 |
self.storage = storage |
|
187 |
self.storage_attrs = storage_attrs |
|
180 | 188 | |
181 | 189 |
@classmethod |
182 | 190 |
def from_upload(cls, upload, varname=None): |
183 | 191 |
return AttachmentEvolutionPart( |
184 | 192 |
upload.base_filename, |
185 |
upload.fp,
|
|
193 |
getattr(upload, 'fp', None),
|
|
186 | 194 |
upload.orig_filename, |
187 | 195 |
upload.content_type, |
188 | 196 |
upload.charset, |
189 |
varname=varname) |
|
197 |
varname=varname, |
|
198 |
storage=getattr(upload, 'storage', None), |
|
199 |
storage_attrs=getattr(upload, 'storage_attrs', None)) |
|
190 | 200 | |
191 | 201 |
def get_file_pointer(self): |
202 |
if self.filename.startswith('uuid-'): |
|
203 |
return None |
|
192 | 204 |
return open(self.filename, 'rb') |
193 | 205 | |
194 | 206 |
def __getstate__(self): |
195 | 207 |
odict = self.__dict__.copy() |
196 |
if not 'fp' in odict: |
|
208 |
if not odict.get('fp'): |
|
209 |
if 'filename' not in odict: |
|
210 |
# we need a filename as an identifier: create one from nothing |
|
211 |
# instead of file_digest(self.fp) (see below) |
|
212 |
odict['filename'] = 'uuid-%s' % uuid.uuid4() |
|
213 |
self.filename = odict['filename'] |
|
197 | 214 |
return odict |
198 | 215 | |
199 | 216 |
del odict['fp'] |
... | ... | |
201 | 218 |
if not os.path.exists(dirname): |
202 | 219 |
os.mkdir(dirname) |
203 | 220 | |
204 |
if not 'filename' in odict: |
|
221 |
# there is no filename, or it was a temporary one: create it |
|
222 |
if not 'filename' in odict or odict['filename'].startswith('uuid-'): |
|
205 | 223 |
filename = file_digest(self.fp) |
206 | 224 |
dirname = os.path.join(dirname, filename[:4]) |
207 | 225 |
if not os.path.exists(dirname): |
... | ... | |
213 | 231 |
return odict |
214 | 232 | |
215 | 233 |
def view(self): |
216 |
return htmltext('<p class="wf-attachment"><a href="attachment?f=%s">%s</a>' % ( |
|
217 |
os.path.basename(self.filename), self.orig_filename)) |
|
234 |
show_link = True |
|
235 |
if self.has_redirect_url(): |
|
236 |
is_in_backoffice = bool(get_request() and get_request().is_in_backoffice()) |
|
237 |
show_link = bool(self.get_redirect_url(backoffice=is_in_backoffice)) |
|
238 |
if show_link: |
|
239 |
return htmltext('<p class="wf-attachment"><a href="attachment?f=%s">%s</a></p>' % ( |
|
240 |
os.path.basename(self.filename), self.orig_filename)) |
|
241 |
else: |
|
242 |
return htmltext('<p class="wf-attachment">%s</p>' % self.orig_filename) |
|
218 | 243 | |
219 | 244 |
@classmethod |
220 | 245 |
def get_substitution_variables(cls, formdata): |
... | ... | |
224 | 249 |
# mimic PicklableUpload methods: |
225 | 250 | |
226 | 251 |
def can_thumbnail(self): |
227 |
return True
|
|
252 |
return get_storage_object(getattr(self, 'storage', None)).can_thumbnail(self)
|
|
228 | 253 | |
229 | 254 |
def has_redirect_url(self): |
230 |
return False
|
|
255 |
return get_storage_object(getattr(self, 'storage', None)).has_redirect_url(self)
|
|
231 | 256 | |
232 |
def get_redirect_url(self, upload, backoffice=False):
|
|
233 |
# should never be called, has_redirect_url is False
|
|
234 |
raise AssertionError('no get_redirect_url on AttachmentEvolutionPart object')
|
|
257 |
def get_redirect_url(self, backoffice=False): |
|
258 |
return get_storage_object(getattr(self, 'storage', None)).get_redirect_url(self,
|
|
259 |
backoffice=backoffice)
|
|
235 | 260 | |
236 | 261 | |
237 | 262 |
class DuplicateGlobalActionNameError(Exception): |
... | ... | |
2179 | 2204 |
if not picklableupload: |
2180 | 2205 |
continue |
2181 | 2206 | |
2182 |
try: |
|
2183 |
# convert any value to a PicklableUpload; it will ususally |
|
2184 |
# be a dict like one provided by qommon/evalutils:attachment() |
|
2185 |
picklableupload = FileField.convert_value_from_anything(picklableupload) |
|
2186 |
except ValueError: |
|
2187 |
get_publisher().notify_of_exception(sys.exc_info(), |
|
2188 |
context='[workflow/attachments]') |
|
2189 |
continue |
|
2207 |
if not isinstance(picklableupload, PicklableUpload): |
|
2208 |
try: |
|
2209 |
# convert any value to a PicklableUpload; it will usually |
|
2210 |
# be a dict like one provided by qommon/evalutils:attachment() |
|
2211 |
picklableupload = FileField.convert_value_from_anything(picklableupload) |
|
2212 |
except ValueError: |
|
2213 |
get_publisher().notify_of_exception(sys.exc_info(), |
|
2214 |
context='[workflow/attachments]') |
|
2215 |
continue |
|
2190 | 2216 | |
2191 | 2217 |
uploads.append(picklableupload) |
2192 | 2218 | |
2193 |
- |