0001-add-option-to-push-attached-document-to-portfolio-10.patch
tests/conftest.py | ||
---|---|---|
3 | 3 | |
4 | 4 |
import pytest |
5 | 5 | |
6 | ||
6 | 7 |
def pytest_addoption(parser): |
7 | 8 |
parser.addoption('--without-postgresql-tests', action='store_true', |
8 |
help='disable tests requiring postgresql') |
|
9 |
help='disable tests requiring postgresql') |
|
10 | ||
9 | 11 | |
10 | 12 |
def pytest_runtest_setup(item): |
11 | 13 |
if 'postgresql' in item.keywords and item.config.option.without_postgresql_tests is True: |
12 | 14 |
pytest.skip('skipped (PostgreSQL are disabled on command line)') |
13 | 15 | |
14 |
def variable_url(request, pub, variable, url): |
|
16 | ||
17 |
def site_options(request, pub, section, variable, value): |
|
15 | 18 |
config = ConfigParser.ConfigParser() |
16 | 19 |
path = os.path.join(pub.app_dir, 'site-options.cfg') |
17 | 20 |
if os.path.exists(path): |
18 | 21 |
config.read([path]) |
19 |
if not config.has_section('options'):
|
|
20 |
config.add_section('options')
|
|
21 |
config.set('options', variable, url)
|
|
22 |
if not config.has_section(section):
|
|
23 |
config.add_section(section)
|
|
24 |
config.set(section, variable, value)
|
|
22 | 25 |
with file(path, 'w') as site_option: |
23 | 26 |
config.write(site_option) |
24 | 27 | |
... | ... | |
26 | 29 |
config = ConfigParser.ConfigParser() |
27 | 30 |
if os.path.exists(path): |
28 | 31 |
config.read([path]) |
29 |
config.remove_option('options', variable)
|
|
32 |
config.remove_option(section, variable)
|
|
30 | 33 |
with file(path, 'w') as site_option: |
31 | 34 |
config.write(site_option) |
32 | 35 |
request.addfinalizer(fin) |
33 |
return url |
|
36 |
return value |
|
37 | ||
34 | 38 | |
35 | 39 |
@pytest.fixture |
36 | 40 |
def fargo_url(request, pub): |
37 |
return variable_url(request, pub, 'fargo_url', 'http://fargo.example.net') |
|
41 |
return site_options(request, pub, 'options', 'fargo_url', 'http://fargo.example.net') |
|
42 | ||
43 | ||
44 |
@pytest.fixture |
|
45 |
def fargo_secret(request, pub): |
|
46 |
return site_options(request, pub, 'wscall-secrets', 'fargo.example.net', 'xxx') |
|
47 | ||
38 | 48 | |
39 | 49 |
@pytest.fixture |
40 | 50 |
def welco_url(request, pub): |
41 |
return variable_url(request, pub, 'welco_url', 'http://welco.example.net') |
|
51 |
return site_options(request, pub, 'options', 'welco_url', 'http://welco.example.net') |
tests/test_form_pages.py | ||
---|---|---|
1904 | 1904 |
assert resp.content_type == 'application/pdf' |
1905 | 1905 |
assert resp.body.startswith('%PDF-') |
1906 | 1906 | |
1907 | ||
1908 |
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found') |
|
1909 |
def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(pub, fargo_url, |
|
1910 |
fargo_secret, caplog): |
|
1911 |
create_user(pub) |
|
1912 |
pub.cfg['debug'] = {'logger': True} |
|
1913 |
pub.write_cfg() |
|
1914 |
wf = Workflow(name='status') |
|
1915 |
st1 = wf.add_status('Status1', 'st1') |
|
1916 |
export_to = ExportToModel() |
|
1917 |
export_to.label = 'create doc' |
|
1918 |
export_to.varname = 'created_doc' |
|
1919 |
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt') |
|
1920 |
template = open(template_filename).read() |
|
1921 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
1922 |
upload.fp = StringIO.StringIO() |
|
1923 |
upload.fp.write(template) |
|
1924 |
upload.fp.seek(0) |
|
1925 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
1926 |
export_to.id = '_export_to' |
|
1927 |
export_to.by = ['_submitter'] |
|
1928 |
export_to.convert_to_pdf = True |
|
1929 |
export_to.push_to_portfolio = True |
|
1930 |
st1.items.append(export_to) |
|
1931 |
export_to.parent = st1 |
|
1932 |
wf.store() |
|
1933 | ||
1934 |
formdef = create_formdef() |
|
1935 |
formdef.workflow_id = wf.id |
|
1936 |
formdef.fields = [] |
|
1937 |
formdef.store() |
|
1938 |
formdef.data_class().wipe() |
|
1939 | ||
1940 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1941 |
resp = resp.forms[0].submit('submit') |
|
1942 |
assert 'Check values then click submit.' in resp.body |
|
1943 |
resp = resp.forms[0].submit('submit') |
|
1944 |
assert resp.status_int == 302 |
|
1945 |
form_location = resp.location |
|
1946 |
resp = resp.follow() |
|
1947 |
assert 'The form has been recorded' in resp.body |
|
1948 | ||
1949 |
with mock.patch('wcs.file_validation.http_post_request') as http_post_request: |
|
1950 |
http_post_request.return_value = None, 200, 'null', None |
|
1951 |
resp = resp.form.submit('button_export_to') |
|
1952 |
assert http_post_request.call_count == 1 |
|
1953 |
assert ('file template.pdf pushed to portfolio of foo@localhost' |
|
1954 |
== caplog.records()[-1].message) |
|
1955 | ||
1956 |
resp = resp.follow() # $form/$id/create_doc |
|
1957 |
resp = resp.follow() # $form/$id/create_doc/ |
|
1958 |
assert resp.content_type == 'application/pdf' |
|
1959 |
assert 'PDF' in resp.body |
|
1960 | ||
1961 |
export_to.attach_to_history = True |
|
1962 |
wf.store() |
|
1963 | ||
1964 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
1965 |
with mock.patch('wcs.file_validation.http_post_request') as http_post_request: |
|
1966 |
http_post_request.return_value = None, 200, 'null', None |
|
1967 |
resp = resp.form.submit('button_export_to') |
|
1968 |
assert http_post_request.call_count == 1 |
|
1969 |
assert ('file template.pdf pushed to portfolio of foo@localhost' |
|
1970 |
== caplog.records()[-1].message) |
|
1971 |
assert resp.location == form_location |
|
1972 |
resp = resp.follow() # back to form page |
|
1973 | ||
1974 |
resp = resp.click('template.pdf') |
|
1975 |
assert resp.location.endswith('/template.pdf') |
|
1976 |
resp = resp.follow() |
|
1977 |
assert resp.content_type == 'application/pdf' |
|
1978 |
assert resp.body.startswith('%PDF-') |
|
1979 | ||
1980 | ||
1907 | 1981 |
def test_formdata_form_file_download(pub): |
1908 | 1982 |
create_user(pub) |
1909 | 1983 |
wf = Workflow(name='status') |
wcs/file_validation.py | ||
---|---|---|
18 | 18 |
import urlparse |
19 | 19 |
import hashlib |
20 | 20 |
import urllib |
21 |
import base64 |
|
21 | 22 | |
23 |
from qommon import get_logger |
|
22 | 24 |
from qommon.misc import http_get_page, json_loads, http_post_request |
23 |
from quixote import get_publisher, get_request |
|
25 |
from quixote import get_publisher, get_request, get_response
|
|
24 | 26 | |
25 | 27 |
from wcs.api_utils import get_secret_and_orig, sign_url |
26 | 28 | |
... | ... | |
56 | 58 |
return status, json_loads(response_payload) |
57 | 59 | |
58 | 60 | |
61 |
# Allow doing a signed POST in an afterjob, as fargo_url() does not work if no request is in |
|
62 |
# context, so the fargo URL is computed in the constructor and only the POST is deferred. |
|
63 |
class fargo_post_json_async(object): |
|
64 |
def __init__(self, url, payload): |
|
65 |
self.url = fargo_url(url) |
|
66 |
self.payload = payload |
|
67 | ||
68 |
def __call__(self): |
|
69 |
headers = {'Content-Type': 'application/json'} |
|
70 |
response, status, response_payload, auth_header = http_post_request( |
|
71 |
self.url, json.dumps(self.payload), headers=headers) |
|
72 |
return status, json_loads(response_payload) |
|
73 | ||
74 | ||
59 | 75 |
def sha256_of_upload(upload): |
60 | 76 |
return hashlib.sha256(upload.get_content()).hexdigest() |
61 | 77 | |
... | ... | |
129 | 145 |
upload.metadata = response['data'] |
130 | 146 |
filled.data['%s_structured' % field.id] = upload.metadata |
131 | 147 |
filled.store() |
148 | ||
149 | ||
150 |
def push_document(user, filename, stream): |
|
151 |
if not user: |
|
152 |
return |
|
153 |
charset = get_publisher().site_charset |
|
154 |
payload = {} |
|
155 |
if user.name_identifiers: |
|
156 |
payload['user_nameid'] = unicode(user.name_identifiers[0], 'ascii') |
|
157 |
elif user.email: |
|
158 |
payload['user_email'] = unicode(user.email, 'ascii') |
|
159 |
payload['origin'] = get_request().get_server().split(':')[0] |
|
160 |
payload['file_name'] = unicode(filename, charset) |
|
161 |
stream.seek(0) |
|
162 |
payload['file_b64_content'] = base64.b64encode(stream.read()) |
|
163 |
async_post = fargo_post_json_async('/api/documents/push/', payload) |
|
164 | ||
165 |
def afterjob(job): |
|
166 |
status = 0 |
|
167 |
status, resp = async_post() |
|
168 |
if status == 200: |
|
169 |
get_logger().info('file %s pushed to portfolio of %s' |
|
170 |
% (filename, user.display_name)) |
|
171 |
else: |
|
172 |
get_logger().error('failed %s failed to be pushed to portfolio of %s' |
|
173 |
% (filename, user.display_name)) |
|
174 | ||
175 |
get_response().add_after_job( |
|
176 |
N_('Sending file %s in portfolio of %s') % (filename, user.display_name), |
|
177 |
afterjob) |
wcs/wf/attachment.py | ||
---|---|---|
21 | 21 |
from qommon.errors import * |
22 | 22 | |
23 | 23 |
from wcs.forms.common import FormStatusPage, FileDirectory |
24 |
from wcs.file_validation import has_file_validation, push_document |
|
24 | 25 | |
25 | 26 |
def lookup_wf_attachment(self, filename): |
26 | 27 |
# supports URLs such as /$formdata/$id/files/attachment/test.txt
... | ... | |
79 | 80 |
by = [] |
80 | 81 |
backoffice_info_text = None |
81 | 82 |
varname = None |
83 |
push_to_portfolio = False |
|
82 | 84 | |
83 | 85 |
@classmethod |
84 | 86 |
def init(cls): |
... | ... | |
112 | 114 |
if self.required: |
113 | 115 |
form.set_error('attachment%s' % self.id, _('Missing file')) |
114 | 116 |
return |
117 |
if self.push_to_portfolio: |
|
118 |
push_document(formdata.get_user(), f.base_filename, f.fp) |
|
115 | 119 |
evo.add_part(AttachmentEvolutionPart.from_upload(f, varname=self.varname)) |
116 | 120 | |
117 | 121 |
def get_parameters(self): |
118 |
return ('by', 'required', 'title', 'display_title', 'button_label',
|
|
122 |
parameters = ('by', 'required', 'title', 'display_title', 'button_label',
|
|
119 | 123 |
'display_button', 'hint', 'backoffice_info_text', 'varname') |
124 |
if has_file_validation(): |
|
125 |
parameters += ('push_to_portfolio',) |
|
126 |
return parameters |
|
120 | 127 | |
121 | 128 |
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None): |
122 | 129 |
if 'by' in parameters: |
... | ... | |
146 | 153 |
if 'varname' in parameters: |
147 | 154 |
form.add(VarnameWidget, '%svarname' % prefix, |
148 | 155 |
title=_('Variable Name'), value=self.varname) |
156 |
if 'push_to_portfolio' in parameters: |
|
157 |
form.add(CheckboxWidget, '%spush_to_portfolio' % prefix, |
|
158 |
title=_('Push generated file to portfiolo'), |
|
159 |
value=self.push_to_portfolio) |
|
149 | 160 | |
150 | 161 | |
151 | 162 |
register_item_class(AddAttachmentWorkflowStatusItem) |
wcs/wf/export_to_model.py | ||
---|---|---|
37 | 37 |
from wcs.fields import SubtitleField, TitleField, CommentField, PageField |
38 | 38 |
from wcs.workflows import (WorkflowStatusItem, AttachmentEvolutionPart, |
39 | 39 |
template_on_formdata, register_item_class) |
40 |
from wcs.file_validation import has_file_validation, push_document |
|
40 | 41 | |
41 | 42 | |
42 | 43 |
try: |
... | ... | |
179 | 180 |
backoffice_info_text = None |
180 | 181 |
varname = None |
181 | 182 |
convert_to_pdf = False |
183 |
push_to_portfolio = False |
|
182 | 184 | |
183 | 185 |
def render_as_line(self): |
184 | 186 |
if self.label: |
... | ... | |
207 | 209 |
if not evo.comment: |
208 | 210 |
evo.comment = _('Form exported in a model') |
209 | 211 |
in_backoffice = get_request() and get_request().is_in_backoffice() |
212 |
outstream = self.apply_template_to_formdata(formdata) |
|
213 |
filename = self.model_file.base_filename |
|
214 |
content_type = self.model_file.content_type |
|
215 |
if self.convert_to_pdf: |
|
216 |
filename = filename.rsplit('.', 1)[0] + '.pdf' |
|
217 |
content_type = 'application/pdf' |
|
218 |
if self.push_to_portfolio: |
|
219 |
push_document(formdata.get_user(), filename, outstream) |
|
210 | 220 |
if self.attach_to_history: |
211 |
filename = self.model_file.base_filename |
|
212 |
content_type = self.model_file.content_type |
|
213 |
if self.convert_to_pdf: |
|
214 |
filename = filename.rsplit('.', 1)[0] + '.pdf' |
|
215 |
content_type = 'application/pdf' |
|
216 | 221 |
evo.add_part(AttachmentEvolutionPart( |
217 | 222 |
filename, |
218 |
self.apply_template_to_formdata(formdata),
|
|
223 |
outstream,
|
|
219 | 224 |
content_type=content_type, |
220 | 225 |
varname=self.varname)) |
221 | 226 |
return formdata.get_url(backoffice=in_backoffice) |
... | ... | |
309 | 314 |
form.add(CheckboxWidget, '%sconvert_to_pdf' % prefix, |
310 | 315 |
title=_('Convert generated file to PDF'), |
311 | 316 |
value=self.convert_to_pdf) |
317 |
if 'push_to_portfolio' in parameters: |
|
318 |
form.add(CheckboxWidget, '%spush_to_portfolio' % prefix, |
|
319 |
title=_('Push generated file to portfiolo'), |
|
320 |
value=self.push_to_portfolio) |
|
312 | 321 | |
313 | 322 |
def get_directory_name(self): |
314 | 323 |
return qommon.misc.simplify(self.label or 'export_to_model', space='_') |
... | ... | |
369 | 378 |
outstream.seek(0) |
370 | 379 |
return outstream |
371 | 380 | |
372 | ||
373 | 381 |
def get_parameters(self): |
374 | 382 |
parameters = ('by', 'label', 'model_file', 'attach_to_history', |
375 | 383 |
'backoffice_info_text', 'varname') |
376 | 384 |
if transform_to_pdf is not None: |
377 | 385 |
parameters += ('convert_to_pdf',) |
386 |
if has_file_validation(): |
|
387 |
parameters += ('push_to_portfolio',) |
|
378 | 388 |
return parameters |
379 | 389 | |
380 | 390 |
def model_file_export_to_xml(self, xml_item, charset, include_id=False): |
381 |
- |