Projet

Général

Profil

0001-misc-accept-non-local-storage-as-attachments-in-evol.patch

Thomas Noël, 16 octobre 2020 17:12

Télécharger (10,7 ko)

Voir les différences:

Subject: [PATCH] misc: accept non-local storage as attachments in evolution
 (#47704)

 tests/test_upload_storage.py | 65 ++++++++++++++++++++++++++++++++++
 wcs/qommon/upload_storage.py |  2 +-
 wcs/workflows.py             | 68 +++++++++++++++++++++++++-----------
 3 files changed, 113 insertions(+), 22 deletions(-)
tests/test_upload_storage.py
11 11
from wcs.formdef import FormDef
12 12
from wcs.categories import Category
13 13
from wcs import fields
14
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
14 15

  
15 16
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub
16 17

  
......
221 222
        'redirect_url': 'https://crypto.example.net/',
222 223
        'file_size': 1834
223 224
    }
225

  
226

  
227
@mock.patch('wcs.wscalls.call_webservice')
228
def test_remoteopaque_in_attachmentevolutionpart(wscall, pub):
229
    create_user_and_admin(pub)
230
    formdef = create_formdef()
231
    formdef.fields[1].storage = 'remote-bo'
232
    formdef.store()
233
    formdef.data_class().wipe()
234

  
235
    wscall.return_value = None, 200, json.dumps(
236
            {"err": 0, "data": {"redirect_url": "https://crypto.example.net/"}})
237

  
238
    image_content = open(os.path.join(os.path.dirname(__file__),
239
        'image-with-gps-data.jpeg'), 'rb').read()
240
    upload_0 = Upload('local-file.jpg', image_content, 'image/jpeg')
241
    upload_1 = Upload('remote-file.jpg', image_content, 'image/jpeg')
242

  
243
    user_app = login(get_app(pub), username='foo', password='foo')
244
    admin_app = login(get_app(pub), username='admin', password='admin')
245

  
246
    resp = user_app.get('/test/')
247
    resp.forms[0]['f0$file'] = upload_0
248
    resp.forms[0]['f1$file'] = upload_1
249
    resp = resp.forms[0].submit('submit')
250
    assert 'Check values then click submit.' in resp.text
251
    resp = resp.forms[0].submit('submit')
252
    assert resp.status_int == 302
253
    resp = resp.follow()
254
    assert 'The form has been recorded' in resp.text
255

  
256
    # register a comment = create a AttachmentEvolutionPart
257
    formdata = formdef.data_class().select()[0]
258
    item = RegisterCommenterWorkflowStatusItem()
259
    item.attachments = ['form_var_file_raw', 'form_var_remote_file_raw']
260
    item.comment = 'text in form history'
261
    item.perform(formdata)
262

  
263
    # links on frontoffice: no link to remote file
264
    resp = user_app.get('/test/%s/' % formdata.id)
265
    assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=') == 1
266
    assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=uuid-') == 0
267
    # links on backoffice: links to local and remote file
268
    resp = admin_app.get('/backoffice/management/test/%s/' % formdata.id)
269
    assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=') == 2
270
    assert resp.text.count('<p class="wf-attachment"><a href="attachment?f=uuid-') == 1
271

  
272
    local_file = formdata.evolution[-1].parts[0]
273
    local_file_id = os.path.basename(local_file.filename)
274
    remote_file = formdata.evolution[-1].parts[1]
275
    remote_file_id = remote_file.filename
276
    assert not local_file_id.startswith('uuid-')
277
    assert remote_file_id.startswith('uuid-')
278

  
279
    # clic on remote file in frontoffice: redirect... but forbidden
280
    resp = user_app.get('/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
281
    assert resp.status_int == 302
282
    resp = resp.follow(status=404)
283
    # clic in backoffice, redirect to decryption system
284
    resp = admin_app.get('/backoffice/management/test/%s/attachment?f=%s' % (formdata.id, remote_file_id))
285
    assert resp.status_int == 302
286
    resp = resp.follow()
287
    assert resp.location.startswith('https://crypto.example.net/')
288
    assert '&signature=' in resp.location
wcs/qommon/upload_storage.py
41 41
    def get_file_pointer(self):
42 42
        if 'fp' in self.__dict__ and self.__dict__.get('fp') is not None:
43 43
            return self.__dict__.get('fp')
44
        elif hasattr(self, 'qfilename'):
44
        elif getattr(self, 'qfilename', None):
45 45
            basedir = os.path.join(get_publisher().app_dir, 'uploads')
46 46
            self.fp = open(os.path.join(basedir, self.qfilename), 'rb')
47 47
            return self.fp
wcs/workflows.py
39 39
from quixote.html import htmltext
40 40
from .qommon import errors
41 41
from .qommon.template import Template, TemplateError
42
from .qommon.upload_storage import PicklableUpload, get_storage_object
42 43

  
43 44
from .conditions import Condition
44 45
from .roles import Role, logged_users_role, get_user_roles
......
117 118

  
118 119
    @property
119 120
    def content(self):
120
        return self.attachment_evolution_part.get_file_pointer().read()
121
        fp = self.attachment_evolution_part.get_file_pointer()
122
        if fp:
123
            return fp.read()
124
        return b''
121 125

  
122 126
    @property
123 127
    def b64_content(self):
......
168 172
    content_type = None
169 173
    charset = None
170 174
    varname = None
175
    storage = None
176
    storage_attrs = None
171 177

  
172 178
    def __init__(self, base_filename, fp, orig_filename=None, content_type=None,
173
            charset=None, varname=None):
179
            charset=None, varname=None, storage=None, storage_attrs=None):
174 180
        self.base_filename = base_filename
175 181
        self.orig_filename = orig_filename or base_filename
176 182
        self.content_type = content_type
177 183
        self.charset = charset
178 184
        self.fp = fp
179 185
        self.varname = varname
186
        self.storage = storage
187
        self.storage_attrs = storage_attrs
180 188

  
181 189
    @classmethod
182 190
    def from_upload(cls, upload, varname=None):
183 191
        return AttachmentEvolutionPart(
184 192
                upload.base_filename,
185
                upload.fp,
193
                getattr(upload, 'fp', None),
186 194
                upload.orig_filename,
187 195
                upload.content_type,
188 196
                upload.charset,
189
                varname=varname)
197
                varname=varname,
198
                storage=getattr(upload, 'storage', None),
199
                storage_attrs=getattr(upload, 'storage_attrs', None))
190 200

  
191 201
    def get_file_pointer(self):
202
        if self.filename.startswith('uuid-'):
203
            return None
192 204
        return open(self.filename, 'rb')
193 205

  
194 206
    def __getstate__(self):
195 207
        odict = self.__dict__.copy()
196
        if not 'fp' in odict:
208
        if not odict.get('fp'):
209
            if 'filename' not in odict:
210
                # we need a filename as an identifier: create one from nothing
211
                # instead of file_digest(self.fp) (see below)
212
                odict['filename'] = 'uuid-%s' % uuid.uuid4()
213
                self.filename = odict['filename']
197 214
            return odict
198 215

  
199 216
        del odict['fp']
......
201 218
        if not os.path.exists(dirname):
202 219
            os.mkdir(dirname)
203 220

  
204
        if not 'filename' in odict:
221
        # there is not filename, or it was a temporary one: create it
222
        if not 'filename' in odict or odict['filename'].startswith('uuid-'):
205 223
            filename = file_digest(self.fp)
206 224
            dirname = os.path.join(dirname, filename[:4])
207 225
            if not os.path.exists(dirname):
......
213 231
        return odict
214 232

  
215 233
    def view(self):
216
        return htmltext('<p class="wf-attachment"><a href="attachment?f=%s">%s</a>' % (
217
                    os.path.basename(self.filename), self.orig_filename))
234
        show_link = True
235
        if self.has_redirect_url():
236
            is_in_backoffice = bool(get_request() and get_request().is_in_backoffice())
237
            show_link = bool(self.get_redirect_url(backoffice=is_in_backoffice))
238
        if show_link:
239
            return htmltext('<p class="wf-attachment"><a href="attachment?f=%s">%s</a></p>' % (
240
                        os.path.basename(self.filename), self.orig_filename))
241
        else:
242
            return htmltext('<p class="wf-attachment">%s</p>' % self.orig_filename)
218 243

  
219 244
    @classmethod
220 245
    def get_substitution_variables(cls, formdata):
......
224 249
    # mimic PicklableUpload methods:
225 250

  
226 251
    def can_thumbnail(self):
227
        return True
252
        return get_storage_object(getattr(self, 'storage', None)).can_thumbnail(self)
228 253

  
229 254
    def has_redirect_url(self):
230
        return False
255
        return get_storage_object(getattr(self, 'storage', None)).has_redirect_url(self)
231 256

  
232
    def get_redirect_url(self, upload, backoffice=False):
233
        # should never be called, has_redirect_url is False
234
        raise AssertionError('no get_redirect_url on AttachmentEvolutionPart object')
257
    def get_redirect_url(self, backoffice=False):
258
        return get_storage_object(getattr(self, 'storage', None)).get_redirect_url(self,
259
                backoffice=backoffice)
235 260

  
236 261

  
237 262
class DuplicateGlobalActionNameError(Exception):
......
2179 2204
                if not picklableupload:
2180 2205
                    continue
2181 2206

  
2182
                try:
2183
                    # convert any value to a PicklableUpload; it will ususally
2184
                    # be a dict like one provided by qommon/evalutils:attachment()
2185
                    picklableupload = FileField.convert_value_from_anything(picklableupload)
2186
                except ValueError:
2187
                    get_publisher().notify_of_exception(sys.exc_info(),
2188
                                                        context='[workflow/attachments]')
2189
                    continue
2207
                if not isinstance(picklableupload, PicklableUpload):
2208
                    try:
2209
                        # convert any value to a PicklableUpload; it will ususally
2210
                        # be a dict like one provided by qommon/evalutils:attachment()
2211
                        picklableupload = FileField.convert_value_from_anything(picklableupload)
2212
                    except ValueError:
2213
                        get_publisher().notify_of_exception(sys.exc_info(),
2214
                                                            context='[workflow/attachments]')
2215
                        continue
2190 2216

  
2191 2217
                uploads.append(picklableupload)
2192 2218

  
2193
-