0001-add-option-to-push-attached-document-to-portfolio-10.patch
tests/test_form_pages.py | ||
---|---|---|
1913 | 1913 |
assert resp.content_type == 'application/pdf' |
1914 | 1914 |
assert resp.body.startswith('%PDF-') |
1915 | 1915 | |
1916 | ||
1917 |
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found') |
|
1918 |
def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(pub, fargo_url, |
|
1919 |
fargo_secret, caplog): |
|
1920 |
create_user(pub) |
|
1921 |
pub.cfg['debug'] = {'logger': True} |
|
1922 |
pub.write_cfg() |
|
1923 |
wf = Workflow(name='status') |
|
1924 |
st1 = wf.add_status('Status1', 'st1') |
|
1925 |
export_to = ExportToModel() |
|
1926 |
export_to.label = 'create doc' |
|
1927 |
export_to.varname = 'created_doc' |
|
1928 |
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt') |
|
1929 |
template = open(template_filename).read() |
|
1930 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
1931 |
upload.fp = StringIO.StringIO() |
|
1932 |
upload.fp.write(template) |
|
1933 |
upload.fp.seek(0) |
|
1934 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
1935 |
export_to.id = '_export_to' |
|
1936 |
export_to.by = ['_submitter'] |
|
1937 |
export_to.convert_to_pdf = True |
|
1938 |
export_to.push_to_portfolio = True |
|
1939 |
st1.items.append(export_to) |
|
1940 |
export_to.parent = st1 |
|
1941 |
wf.store() |
|
1942 | ||
1943 |
formdef = create_formdef() |
|
1944 |
formdef.workflow_id = wf.id |
|
1945 |
formdef.fields = [] |
|
1946 |
formdef.store() |
|
1947 |
formdef.data_class().wipe() |
|
1948 | ||
1949 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1950 |
resp = resp.forms[0].submit('submit') |
|
1951 |
assert 'Check values then click submit.' in resp.body |
|
1952 |
resp = resp.forms[0].submit('submit') |
|
1953 |
assert resp.status_int == 302 |
|
1954 |
form_location = resp.location |
|
1955 |
resp = resp.follow() |
|
1956 |
assert 'The form has been recorded' in resp.body |
|
1957 | ||
1958 |
with mock.patch('wcs.file_validation.http_post_request') as http_post_request: |
|
1959 |
http_post_request.return_value = None, 200, 'null', None |
|
1960 |
resp = resp.form.submit('button_export_to') |
|
1961 |
assert http_post_request.call_count == 1 |
|
1962 |
assert ('file template.pdf pushed to portfolio of foo@localhost' |
|
1963 |
== caplog.records()[-1].message) |
|
1964 | ||
1965 |
resp = resp.follow() # $form/$id/create_doc |
|
1966 |
resp = resp.follow() # $form/$id/create_doc/ |
|
1967 |
assert resp.content_type == 'application/pdf' |
|
1968 |
assert 'PDF' in resp.body |
|
1969 | ||
1970 |
export_to.attach_to_history = True |
|
1971 |
wf.store() |
|
1972 | ||
1973 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
1974 |
with mock.patch('wcs.file_validation.http_post_request') as http_post_request: |
|
1975 |
http_post_request.return_value = None, 200, 'null', None |
|
1976 |
resp = resp.form.submit('button_export_to') |
|
1977 |
assert http_post_request.call_count == 1 |
|
1978 |
assert ('file template.pdf pushed to portfolio of foo@localhost' |
|
1979 |
== caplog.records()[-1].message) |
|
1980 |
assert resp.location == form_location |
|
1981 |
resp = resp.follow() # back to form page |
|
1982 | ||
1983 |
resp = resp.click('template.pdf') |
|
1984 |
assert resp.location.endswith('/template.pdf') |
|
1985 |
resp = resp.follow() |
|
1986 |
assert resp.content_type == 'application/pdf' |
|
1987 |
assert resp.body.startswith('%PDF-') |
|
1988 | ||
1989 | ||
1916 | 1990 |
def test_formdata_form_file_download(pub): |
1917 | 1991 |
create_user(pub) |
1918 | 1992 |
wf = Workflow(name='status') |
wcs/file_validation.py | ||
---|---|---|
18 | 18 |
import urlparse |
19 | 19 |
import hashlib |
20 | 20 |
import urllib |
21 |
import base64 |
|
21 | 22 | |
22 | 23 |
from qommon import get_logger |
23 | 24 |
from qommon.misc import http_get_page, json_loads, http_post_request, ConnectionError |
24 |
from quixote import get_publisher, get_request |
|
25 |
from quixote import get_publisher, get_request, get_response
|
|
25 | 26 | |
26 | 27 |
from wcs.api_utils import get_secret_and_orig, sign_url |
27 | 28 | |
... | ... | |
57 | 58 |
return status, json_loads(response_payload) |
58 | 59 | |
59 | 60 | |
61 |
# Allow doing a signed POST in an afterjob, as fargo_url() does not work if no request is in |
|
62 |
# context; so we do it in the constructor. |
|
63 |
class fargo_post_json_async(object): |
|
64 |
def __init__(self, url, payload): |
|
65 |
self.url = fargo_url(url) |
|
66 |
self.payload = payload |
|
67 | ||
68 |
def __call__(self): |
|
69 |
headers = {'Content-Type': 'application/json'} |
|
70 |
response, status, response_payload, auth_header = http_post_request( |
|
71 |
self.url, json.dumps(self.payload), headers=headers) |
|
72 |
return status, json_loads(response_payload) |
|
73 | ||
74 | ||
60 | 75 |
def sha256_of_upload(upload): |
61 | 76 |
return hashlib.sha256(upload.get_content()).hexdigest() |
62 | 77 | |
... | ... | |
147 | 162 |
upload.metadata = response['data'] |
148 | 163 |
filled.data['%s_structured' % field.id] = upload.metadata |
149 | 164 |
filled.store() |
165 | ||
166 | ||
167 |
def push_document(user, filename, stream): |
|
168 |
if not user: |
|
169 |
return |
|
170 |
charset = get_publisher().site_charset |
|
171 |
payload = {} |
|
172 |
if user.name_identifiers: |
|
173 |
payload['user_nameid'] = unicode(user.name_identifiers[0], 'ascii') |
|
174 |
elif user.email: |
|
175 |
payload['user_email'] = unicode(user.email, 'ascii') |
|
176 |
payload['origin'] = get_request().get_server().split(':')[0] |
|
177 |
payload['file_name'] = unicode(filename, charset) |
|
178 |
stream.seek(0) |
|
179 |
payload['file_b64_content'] = base64.b64encode(stream.read()) |
|
180 |
async_post = fargo_post_json_async('/api/documents/push/', payload) |
|
181 | ||
182 |
def afterjob(job): |
|
183 |
status = 0 |
|
184 |
status, resp = async_post() |
|
185 |
if status == 200: |
|
186 |
get_logger().info('file %s pushed to portfolio of %s' |
|
187 |
% (filename, user.display_name)) |
|
188 |
else: |
|
189 |
get_logger().error('failed %s failed to be pushed to portfolio of %s' |
|
190 |
% (filename, user.display_name)) |
|
191 | ||
192 |
get_response().add_after_job( |
|
193 |
N_('Sending file %s in portfolio of %s') % (filename, user.display_name), |
|
194 |
afterjob) |
wcs/wf/attachment.py | ||
---|---|---|
21 | 21 |
from qommon.errors import * |
22 | 22 | |
23 | 23 |
from wcs.forms.common import FormStatusPage, FileDirectory |
24 |
from wcs.file_validation import has_file_validation, push_document |
|
24 | 25 | |
25 | 26 |
def lookup_wf_attachment(self, filename): |
26 | 27 |
# supports for URLs such as /$formdata/$id/files/attachment/test.txt |
... | ... | |
79 | 80 |
by = [] |
80 | 81 |
backoffice_info_text = None |
81 | 82 |
varname = None |
83 |
push_to_portfolio = False |
|
82 | 84 | |
83 | 85 |
@classmethod |
84 | 86 |
def init(cls): |
... | ... | |
112 | 114 |
if self.required: |
113 | 115 |
form.set_error('attachment%s' % self.id, _('Missing file')) |
114 | 116 |
return |
117 |
if self.push_to_portfolio: |
|
118 |
push_document(formdata.get_user(), f.base_filename, f.fp) |
|
115 | 119 |
evo.add_part(AttachmentEvolutionPart.from_upload(f, varname=self.varname)) |
116 | 120 | |
117 | 121 |
def get_parameters(self): |
118 |
return ('by', 'required', 'title', 'display_title', 'button_label',
|
|
122 |
parameters = ('by', 'required', 'title', 'display_title', 'button_label',
|
|
119 | 123 |
'display_button', 'hint', 'backoffice_info_text', 'varname') |
124 |
if has_file_validation(): |
|
125 |
parameters += ('push_to_portfolio',) |
|
126 |
return parameters |
|
120 | 127 | |
121 | 128 |
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): |
122 | 129 |
if 'by' in parameters: |
... | ... | |
146 | 153 |
if 'varname' in parameters: |
147 | 154 |
form.add(VarnameWidget, '%svarname' % prefix, |
148 | 155 |
title=_('Variable Name'), value=self.varname) |
156 |
if 'push_to_portfolio' in parameters: |
|
157 |
form.add(CheckboxWidget, '%spush_to_portfolio' % prefix, |
|
158 |
title=_('Push generated file to portfolio'), |
|
159 |
value=self.push_to_portfolio) |
|
149 | 160 | |
150 | 161 | |
151 | 162 |
register_item_class(AddAttachmentWorkflowStatusItem) |
wcs/wf/export_to_model.py | ||
---|---|---|
37 | 37 |
from wcs.fields import SubtitleField, TitleField, CommentField, PageField |
38 | 38 |
from wcs.workflows import (WorkflowStatusItem, AttachmentEvolutionPart, |
39 | 39 |
template_on_formdata, register_item_class) |
40 |
from wcs.file_validation import has_file_validation, push_document |
|
40 | 41 | |
41 | 42 | |
42 | 43 |
try: |
... | ... | |
179 | 180 |
backoffice_info_text = None |
180 | 181 |
varname = None |
181 | 182 |
convert_to_pdf = False |
183 |
push_to_portfolio = False |
|
182 | 184 | |
183 | 185 |
def render_as_line(self): |
184 | 186 |
if self.label: |
... | ... | |
207 | 209 |
if not evo.comment: |
208 | 210 |
evo.comment = _('Form exported in a model') |
209 | 211 |
in_backoffice = get_request() and get_request().is_in_backoffice() |
212 |
outstream = self.apply_template_to_formdata(formdata) |
|
213 |
filename = self.model_file.base_filename |
|
214 |
content_type = self.model_file.content_type |
|
215 |
if self.convert_to_pdf: |
|
216 |
filename = filename.rsplit('.', 1)[0] + '.pdf' |
|
217 |
content_type = 'application/pdf' |
|
218 |
if self.push_to_portfolio: |
|
219 |
push_document(formdata.get_user(), filename, outstream) |
|
210 | 220 |
if self.attach_to_history: |
211 |
filename = self.model_file.base_filename |
|
212 |
content_type = self.model_file.content_type |
|
213 |
if self.convert_to_pdf: |
|
214 |
filename = filename.rsplit('.', 1)[0] + '.pdf' |
|
215 |
content_type = 'application/pdf' |
|
216 | 221 |
evo.add_part(AttachmentEvolutionPart( |
217 | 222 |
filename, |
218 |
self.apply_template_to_formdata(formdata),
|
|
223 |
outstream,
|
|
219 | 224 |
content_type=content_type, |
220 | 225 |
varname=self.varname)) |
221 | 226 |
return formdata.get_url(backoffice=in_backoffice) |
... | ... | |
309 | 314 |
form.add(CheckboxWidget, '%sconvert_to_pdf' % prefix, |
310 | 315 |
title=_('Convert generated file to PDF'), |
311 | 316 |
value=self.convert_to_pdf) |
317 |
if 'push_to_portfolio' in parameters: |
|
318 |
form.add(CheckboxWidget, '%spush_to_portfolio' % prefix, |
|
319 |
title=_('Push generated file to portfolio'), |
|
320 |
value=self.push_to_portfolio) |
|
312 | 321 | |
313 | 322 |
def get_directory_name(self): |
314 | 323 |
return qommon.misc.simplify(self.label or 'export_to_model', space='_') |
... | ... | |
369 | 378 |
outstream.seek(0) |
370 | 379 |
return outstream |
371 | 380 | |
372 | ||
373 | 381 |
def get_parameters(self): |
374 | 382 |
parameters = ('by', 'label', 'model_file', 'attach_to_history', |
375 | 383 |
'backoffice_info_text', 'varname') |
376 | 384 |
if transform_to_pdf is not None: |
377 | 385 |
parameters += ('convert_to_pdf',) |
386 |
if has_file_validation(): |
|
387 |
parameters += ('push_to_portfolio',) |
|
378 | 388 |
return parameters |
379 | 389 | |
380 | 390 |
def model_file_export_to_xml(self, xml_item, charset, include_id=False): |
381 |
- |