0001-misc-split-form_pages-tests.patch
tests/form_pages/test_all.py | ||
---|---|---|
3 | 3 |
import json |
4 | 4 |
import pytest |
5 | 5 |
import hashlib |
6 |
import locale |
|
7 | 6 |
import os |
8 | 7 |
import re |
9 | 8 |
import time |
10 | 9 |
import zipfile |
11 |
import base64 |
|
12 | 10 |
from webtest import Upload, Hidden |
13 | 11 |
import mock |
14 | 12 |
import xml.etree.ElementTree as ET |
... | ... | |
21 | 19 |
from django.utils.six import StringIO, BytesIO |
22 | 20 |
from django.utils.six.moves.urllib import parse as urlparse |
23 | 21 | |
24 |
from quixote.http_request import Upload as QuixoteUpload |
|
25 | 22 |
from django.utils.encoding import force_bytes, force_text |
26 | 23 | |
27 | 24 |
from wcs.qommon import force_str |
28 | 25 |
from wcs.qommon.emails import docutils |
29 |
from wcs.qommon.form import UploadedFile |
|
30 | 26 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
31 | 27 |
from wcs.carddef import CardDef |
32 | 28 |
from wcs.formdef import FormDef |
... | ... | |
36 | 32 |
ChoiceWorkflowStatusItem, JumpOnSubmitWorkflowStatusItem, |
37 | 33 |
CommentableWorkflowStatusItem, WorkflowVariablesFieldsFormDef) |
38 | 34 |
from wcs.wf.backoffice_fields import SetBackofficeFieldsWorkflowStatusItem |
39 |
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf |
|
40 | 35 |
from wcs.wf.jump import JumpWorkflowStatusItem |
41 |
from wcs.wf.attachment import AddAttachmentWorkflowStatusItem |
|
42 | 36 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef |
43 | 37 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
44 | 38 |
from wcs.wf.resubmit import ResubmitWorkflowStatusItem |
... | ... | |
48 | 42 |
from wcs.roles import Role, logged_users_role |
49 | 43 |
from wcs.tracking_code import TrackingCode |
50 | 44 |
from wcs.data_sources import NamedDataSource |
51 |
from wcs.wscalls import NamedWsCall |
|
52 | 45 |
from wcs import fields |
53 | 46 |
from wcs.logged_errors import LoggedError |
54 | 47 |
from wcs.forms.root import PublicFormStatusPage |
... | ... | |
3547 | 3540 |
assert 'There were errors processing the form' in resp |
3548 | 3541 | |
3549 | 3542 | |
3550 |
def test_formdata_attachment_download(pub): |
|
3551 |
create_user(pub) |
|
3552 |
wf = Workflow(name='status') |
|
3553 |
st1 = wf.add_status('Status1', 'st1') |
|
3554 |
attach = AddAttachmentWorkflowStatusItem() |
|
3555 |
attach.id = '_attach' |
|
3556 |
attach.by = ['_submitter'] |
|
3557 |
st1.items.append(attach) |
|
3558 |
attach.parent = st1 |
|
3559 |
wf.store() |
|
3560 | ||
3561 |
formdef = create_formdef() |
|
3562 |
formdef.workflow_id = wf.id |
|
3563 |
formdef.fields = [] |
|
3564 |
formdef.store() |
|
3565 |
formdef.data_class().wipe() |
|
3566 | ||
3567 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3568 |
resp = resp.forms[0].submit('submit') |
|
3569 |
assert 'Check values then click submit.' in resp.text |
|
3570 |
resp = resp.forms[0].submit('submit') |
|
3571 |
assert resp.status_int == 302 |
|
3572 |
resp = resp.follow() |
|
3573 |
assert 'The form has been recorded' in resp.text |
|
3574 | ||
3575 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
3576 |
resp = resp.forms[0].submit('button_attach') |
|
3577 | ||
3578 |
assert formdef.data_class().count() == 1 |
|
3579 |
formdata = formdef.data_class().select()[0] |
|
3580 |
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart' |
|
3581 |
attachment = formdata.evolution[-1].parts[0] |
|
3582 |
assert attachment.content_type == 'text/plain' |
|
3583 |
assert attachment.orig_filename == 'test.txt' |
|
3584 | ||
3585 |
resp = resp.follow() # back to form page |
|
3586 |
resp = resp.click('test.txt') |
|
3587 |
assert resp.location.endswith('/test.txt') |
|
3588 |
resp = resp.follow() |
|
3589 |
assert resp.content_type == 'text/plain' |
|
3590 |
assert resp.text == 'foobar' |
|
3591 | ||
3592 | ||
3593 |
def test_formdata_attachment_download_with_substitution_variable(pub): |
|
3594 |
create_user_and_admin(pub) |
|
3595 |
wf = Workflow(name='status') |
|
3596 |
st1 = wf.add_status('Status1', 'st1') |
|
3597 |
attach = AddAttachmentWorkflowStatusItem() |
|
3598 |
attach.varname = 'attached_doc' |
|
3599 |
attach.id = '_attach' |
|
3600 |
attach.by = ['_submitter'] |
|
3601 |
st1.items.append(attach) |
|
3602 |
attach.parent = st1 |
|
3603 |
wf.store() |
|
3604 | ||
3605 |
formdef = create_formdef() |
|
3606 |
formdef.workflow_id = wf.id |
|
3607 |
formdef.fields = [] |
|
3608 |
formdef.store() |
|
3609 |
formdef.data_class().wipe() |
|
3610 | ||
3611 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3612 |
resp = resp.forms[0].submit('submit') |
|
3613 |
assert 'Check values then click submit.' in resp.text |
|
3614 |
resp = resp.forms[0].submit('submit') |
|
3615 |
assert resp.status_int == 302 |
|
3616 |
resp = resp.follow() |
|
3617 |
assert 'The form has been recorded' in resp.text |
|
3618 | ||
3619 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
3620 |
resp = resp.forms[0].submit('button_attach') |
|
3621 | ||
3622 |
assert formdef.data_class().count() == 1 |
|
3623 |
formdata = formdef.data_class().select()[0] |
|
3624 |
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart' |
|
3625 |
attachment = formdata.evolution[-1].parts[0] |
|
3626 |
assert attachment.content_type == 'text/plain' |
|
3627 |
assert attachment.orig_filename == 'test.txt' |
|
3628 | ||
3629 |
resp = resp.follow() # back to form page |
|
3630 |
resp = resp.click('test.txt') |
|
3631 |
assert resp.location.endswith('/test.txt') |
|
3632 |
resp = resp.follow() |
|
3633 |
assert resp.content_type == 'text/plain' |
|
3634 |
assert resp.text == 'foobar' |
|
3635 | ||
3636 |
variables = formdef.data_class().select()[0].get_substitution_variables() |
|
3637 |
assert 'attachments' in variables |
|
3638 |
attachments = variables['attachments'] |
|
3639 |
assert attachments is not None |
|
3640 |
attachment_variable = attachments.attached_doc |
|
3641 | ||
3642 |
resp = login(get_app(pub), username='admin', password='admin').get( |
|
3643 |
attachment_variable.url).follow() |
|
3644 |
assert attachment_variable.content == resp.body |
|
3645 |
assert attachment_variable.b64_content == base64.b64encode(resp.body) |
|
3646 |
assert attachment_variable.content_type == resp._headers['content-type'].split(';')[0] |
|
3647 |
content_disposition = resp._headers['content-disposition'] |
|
3648 |
assert len(content_disposition.split(';')) == 2 |
|
3649 |
assert content_disposition.split(';')[0] == 'attachment' |
|
3650 |
assert resp.request.environ['PATH_INFO'].endswith(attachment_variable.filename) |
|
3651 | ||
3652 | ||
3653 |
def test_formdata_attachment_download_to_backoffice_file_field(pub): |
|
3654 |
create_user(pub) |
|
3655 |
wf = Workflow(name='status') |
|
3656 |
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) |
|
3657 |
wf.backoffice_fields_formdef.fields = [ |
|
3658 |
fields.FileField(id='bo1', label='bo field 1', type='file'), |
|
3659 |
] |
|
3660 |
st1 = wf.add_status('Status1', 'st1') |
|
3661 |
attach = AddAttachmentWorkflowStatusItem() |
|
3662 |
attach.id = '_attach' |
|
3663 |
attach.by = ['_submitter'] |
|
3664 |
attach.backoffice_filefield_id = 'bo1' |
|
3665 |
st1.items.append(attach) |
|
3666 |
attach.parent = st1 |
|
3667 |
wf.store() |
|
3668 | ||
3669 |
assert attach.get_backoffice_filefield_options() == [('bo1', 'bo field 1', 'bo1')] |
|
3670 | ||
3671 |
formdef = create_formdef() |
|
3672 |
formdef.workflow_id = wf.id |
|
3673 |
formdef.fields = [] |
|
3674 |
formdef.store() |
|
3675 |
formdef.data_class().wipe() |
|
3676 | ||
3677 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3678 |
resp = resp.forms[0].submit('submit') |
|
3679 |
assert 'Check values then click submit.' in resp.text |
|
3680 |
resp = resp.forms[0].submit('submit') |
|
3681 |
assert resp.status_int == 302 |
|
3682 |
resp = resp.follow() |
|
3683 |
assert 'The form has been recorded' in resp.text |
|
3684 | ||
3685 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
3686 |
resp = resp.forms[0].submit('button_attach') |
|
3687 | ||
3688 |
# backoffice file field is set |
|
3689 |
assert formdef.data_class().count() == 1 |
|
3690 |
formdata = formdef.data_class().select()[0] |
|
3691 |
assert 'bo1' in formdata.data |
|
3692 |
bo1 = formdata.data['bo1'] |
|
3693 |
assert bo1.base_filename == 'test.txt' |
|
3694 |
assert bo1.content_type == 'text/plain' |
|
3695 |
assert bo1.get_content() == b'foobar' |
|
3696 | ||
3697 |
# and file is in history, too |
|
3698 |
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart' |
|
3699 |
attachment = formdata.evolution[-1].parts[0] |
|
3700 |
assert attachment.content_type == 'text/plain' |
|
3701 |
assert attachment.orig_filename == 'test.txt' |
|
3702 | ||
3703 | ||
3704 |
def test_formdata_attachment_download_to_backoffice_file_field_only(pub): |
|
3705 |
create_user(pub) |
|
3706 |
wf = Workflow(name='status') |
|
3707 |
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) |
|
3708 |
wf.backoffice_fields_formdef.fields = [ |
|
3709 |
fields.FileField(id='bo1', label='bo field 1', type='file'), |
|
3710 |
] |
|
3711 |
st1 = wf.add_status('Status1', 'st1') |
|
3712 |
attach = AddAttachmentWorkflowStatusItem() |
|
3713 |
attach.id = '_attach' |
|
3714 |
attach.by = ['_submitter'] |
|
3715 |
attach.backoffice_filefield_id = 'bo1' |
|
3716 |
attach.attach_to_history = False # store only in backoffice field |
|
3717 |
st1.items.append(attach) |
|
3718 |
attach.parent = st1 |
|
3719 |
wf.store() |
|
3720 | ||
3721 |
assert attach.get_backoffice_filefield_options() == [('bo1', 'bo field 1', 'bo1')] |
|
3722 | ||
3723 |
formdef = create_formdef() |
|
3724 |
formdef.workflow_id = wf.id |
|
3725 |
formdef.fields = [] |
|
3726 |
formdef.store() |
|
3727 |
formdef.data_class().wipe() |
|
3728 | ||
3729 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3730 |
resp = resp.forms[0].submit('submit') |
|
3731 |
assert 'Check values then click submit.' in resp.text |
|
3732 |
resp = resp.forms[0].submit('submit') |
|
3733 |
assert resp.status_int == 302 |
|
3734 |
resp = resp.follow() |
|
3735 |
assert 'The form has been recorded' in resp.text |
|
3736 | ||
3737 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
3738 |
resp = resp.forms[0].submit('button_attach') |
|
3739 | ||
3740 |
# backoffice file field is set |
|
3741 |
assert formdef.data_class().count() == 1 |
|
3742 |
formdata = formdef.data_class().select()[0] |
|
3743 |
assert 'bo1' in formdata.data |
|
3744 |
bo1 = formdata.data['bo1'] |
|
3745 |
assert bo1.base_filename == 'test.txt' |
|
3746 |
assert bo1.content_type == 'text/plain' |
|
3747 |
assert bo1.get_content() == b'foobar' |
|
3748 | ||
3749 |
# but nothing in history |
|
3750 |
for evo in formdata.evolution: |
|
3751 |
assert not evo.parts |
|
3752 | ||
3753 | ||
3754 |
def test_formdata_attachment_file_options(pub): |
|
3755 |
create_user(pub) |
|
3756 |
wf = Workflow(name='status') |
|
3757 |
st1 = wf.add_status('Status1', 'st1') |
|
3758 |
attach = AddAttachmentWorkflowStatusItem() |
|
3759 |
attach.id = '_attach' |
|
3760 |
attach.by = ['_submitter'] |
|
3761 |
attach.document_type = {'label': 'Fichiers vidéo', 'mimetypes': ['video/*'], 'id': '_video'} |
|
3762 |
attach.max_file_size = '3Mo' |
|
3763 | ||
3764 |
st1.items.append(attach) |
|
3765 |
attach.parent = st1 |
|
3766 |
wf.store() |
|
3767 | ||
3768 |
formdef = create_formdef() |
|
3769 |
formdef.workflow_id = wf.id |
|
3770 |
formdef.fields = [] |
|
3771 |
formdef.store() |
|
3772 |
formdef.data_class().wipe() |
|
3773 | ||
3774 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3775 |
resp = resp.forms[0].submit('submit') |
|
3776 |
assert 'Check values then click submit.' in resp.text |
|
3777 |
resp = resp.forms[0].submit('submit') |
|
3778 |
assert resp.status_int == 302 |
|
3779 |
resp = resp.follow() |
|
3780 |
assert 'The form has been recorded' in resp.text |
|
3781 | ||
3782 |
file_input = resp.forms[0]['attachment_attach$file'] |
|
3783 |
assert file_input.attrs['accept'] == 'video/*' |
|
3784 |
assert file_input.attrs['data-max-file-size'] == '3000000' |
|
3785 | ||
3786 | ||
3787 |
def test_formdata_generated_document_download(pub): |
|
3788 |
create_user(pub) |
|
3789 |
wf = Workflow(name='status') |
|
3790 |
st1 = wf.add_status('Status1', 'st1') |
|
3791 |
export_to = ExportToModel() |
|
3792 |
export_to.convert_to_pdf = False |
|
3793 |
export_to.label = 'create doc' |
|
3794 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
3795 |
upload.fp = BytesIO() |
|
3796 |
upload.fp.write(b'HELLO WORLD') |
|
3797 |
upload.fp.seek(0) |
|
3798 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
3799 |
export_to.id = '_export_to' |
|
3800 |
export_to.by = ['_submitter'] |
|
3801 |
st1.items.append(export_to) |
|
3802 |
export_to.parent = st1 |
|
3803 |
wf.store() |
|
3804 | ||
3805 |
formdef = create_formdef() |
|
3806 |
formdef.workflow_id = wf.id |
|
3807 |
formdef.fields = [] |
|
3808 |
formdef.store() |
|
3809 |
formdef.data_class().wipe() |
|
3810 | ||
3811 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3812 |
resp = resp.forms[0].submit('submit') |
|
3813 |
assert 'Check values then click submit.' in resp.text |
|
3814 |
resp = resp.forms[0].submit('submit') |
|
3815 |
assert resp.status_int == 302 |
|
3816 |
form_location = resp.location |
|
3817 |
resp = resp.follow() |
|
3818 |
assert 'The form has been recorded' in resp.text |
|
3819 | ||
3820 |
resp = resp.form.submit('button_export_to') |
|
3821 | ||
3822 |
resp = resp.follow() # $form/$id/create_doc |
|
3823 |
resp = resp.follow() # $form/$id/create_doc/ |
|
3824 |
assert resp.text == 'HELLO WORLD' |
|
3825 | ||
3826 |
export_to.attach_to_history = True |
|
3827 |
wf.store() |
|
3828 | ||
3829 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
3830 |
resp = resp.form.submit('button_export_to') |
|
3831 |
assert resp.location == form_location + '#action-zone' |
|
3832 |
resp = resp.follow() # back to form page |
|
3833 | ||
3834 |
resp = resp.click('test.rtf') |
|
3835 |
assert resp.location.endswith('/test.rtf') |
|
3836 |
resp = resp.follow() |
|
3837 |
assert resp.content_type == 'application/rtf' |
|
3838 |
assert resp.text == 'HELLO WORLD' |
|
3839 | ||
3840 |
# change export model to now be a RTF file, do the action again on the same form and |
|
3841 |
# check that both the old .odt file and the new .rtf file are there and valid. |
|
3842 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
3843 |
upload.fp = BytesIO() |
|
3844 |
upload.fp.write(b'HELLO NEW WORLD') |
|
3845 |
upload.fp.seek(0) |
|
3846 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
3847 |
wf.store() |
|
3848 | ||
3849 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
3850 |
resp = resp.form.submit('button_export_to') |
|
3851 |
assert resp.location == form_location + '#action-zone' |
|
3852 |
resp = resp.follow() # back to form page |
|
3853 | ||
3854 |
assert resp.click('test.rtf', index=0).follow().text == 'HELLO WORLD' |
|
3855 |
assert resp.click('test.rtf', index=1).follow().text == 'HELLO NEW WORLD' |
|
3856 | ||
3857 |
# use substitution variables on rtf: only ezt format is accepted |
|
3858 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
3859 |
upload.fp = BytesIO() |
|
3860 |
upload.fp.write(b'HELLO {{DJANGO}} WORLD [form_name]') |
|
3861 |
upload.fp.seek(0) |
|
3862 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
3863 |
wf.store() |
|
3864 | ||
3865 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
3866 |
resp = resp.form.submit('button_export_to') |
|
3867 |
assert resp.location == form_location + '#action-zone' |
|
3868 |
resp = resp.follow() |
|
3869 | ||
3870 |
assert resp.click('test.rtf', index=2).follow().text == 'HELLO {{DJANGO}} WORLD {\\uc1{test}}' |
|
3871 | ||
3872 | ||
3873 |
@pytest.fixture(params=['template.odt', 'template-django.odt']) |
|
3874 |
def odt_template(request): |
|
3875 |
return request.param |
|
3876 | ||
3877 | ||
3878 |
def test_formdata_generated_document_odt_download(pub, odt_template): |
|
3879 |
create_user(pub) |
|
3880 |
wf = Workflow(name='status') |
|
3881 |
st1 = wf.add_status('Status1', 'st1') |
|
3882 |
export_to = ExportToModel() |
|
3883 |
export_to.convert_to_pdf = False |
|
3884 |
export_to.label = 'create doc' |
|
3885 |
template_filename = os.path.join(os.path.dirname(__file__), '..', odt_template) |
|
3886 |
template = open(template_filename, 'rb').read() |
|
3887 |
upload = QuixoteUpload('/foo/' + odt_template, content_type='application/octet-stream') |
|
3888 |
upload.fp = BytesIO() |
|
3889 |
upload.fp.write(template) |
|
3890 |
upload.fp.seek(0) |
|
3891 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
3892 |
export_to.id = '_export_to' |
|
3893 |
export_to.by = ['_submitter'] |
|
3894 |
st1.items.append(export_to) |
|
3895 |
export_to.parent = st1 |
|
3896 |
wf.store() |
|
3897 | ||
3898 |
formdef = create_formdef() |
|
3899 |
formdef.workflow_id = wf.id |
|
3900 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
3901 |
formdef.store() |
|
3902 |
formdef.data_class().wipe() |
|
3903 | ||
3904 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3905 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
3906 |
resp = resp.forms[0].submit('submit') |
|
3907 |
assert 'Check values then click submit.' in resp.text |
|
3908 |
resp = resp.forms[0].submit('submit') |
|
3909 |
assert resp.status_int == 302 |
|
3910 |
form_location = resp.location |
|
3911 |
resp = resp.follow() |
|
3912 |
assert 'The form has been recorded' in resp.text |
|
3913 | ||
3914 |
resp = resp.form.submit('button_export_to') |
|
3915 | ||
3916 |
resp = resp.follow() # $form/$id/create_doc |
|
3917 |
resp = resp.follow() # $form/$id/create_doc/ |
|
3918 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
3919 |
assert_equal_zip(BytesIO(resp.body), f) |
|
3920 | ||
3921 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
3922 |
resp = resp.form.submit('button_export_to') |
|
3923 |
resp = resp.follow() # $form/$id/create_doc |
|
3924 |
with mock.patch('wcs.wf.export_to_model.get_formdata_template_context') as get_context_1: |
|
3925 |
with mock.patch('wcs.workflows.get_formdata_template_context') as get_context_never: |
|
3926 |
get_context_1.return_value = {} |
|
3927 |
get_context_never.return_value = {} |
|
3928 |
resp = resp.follow() # $form/$id/create_doc/ |
|
3929 |
# substitution variables are computed only one : |
|
3930 |
assert get_context_1.call_count == 1 |
|
3931 |
assert get_context_never.call_count == 0 |
|
3932 | ||
3933 |
export_to.attach_to_history = True |
|
3934 |
wf.store() |
|
3935 | ||
3936 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
3937 |
resp = resp.form.submit('button_export_to') |
|
3938 |
assert resp.location == form_location + '#action-zone' |
|
3939 |
resp = resp.follow() # back to form page |
|
3940 | ||
3941 |
resp = resp.click(odt_template) |
|
3942 |
assert resp.location.endswith('/' + odt_template) |
|
3943 |
resp = resp.follow() |
|
3944 |
assert resp.content_type == 'application/octet-stream' |
|
3945 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
3946 |
assert_equal_zip(BytesIO(resp.body), f) |
|
3947 | ||
3948 |
# change file content, same name |
|
3949 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
3950 |
upload.fp = BytesIO() |
|
3951 |
upload.fp.write(b'HELLO NEW WORLD') |
|
3952 |
upload.fp.seek(0) |
|
3953 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
3954 |
wf.store() |
|
3955 | ||
3956 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
3957 |
resp = resp.form.submit('button_export_to') |
|
3958 |
assert resp.location == form_location + '#action-zone' |
|
3959 |
resp = resp.follow() # back to form page |
|
3960 | ||
3961 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
3962 |
body = resp.click(odt_template, index=0).follow().body |
|
3963 |
assert_equal_zip(BytesIO(body), f) |
|
3964 |
assert resp.click('test.rtf', index=0).follow().body == b'HELLO NEW WORLD' |
|
3965 | ||
3966 | ||
3967 |
def test_formdata_generated_document_odt_download_with_substitution_variable(pub): |
|
3968 |
create_user_and_admin(pub) |
|
3969 |
wf = Workflow(name='status') |
|
3970 |
st1 = wf.add_status('Status1', 'st1') |
|
3971 |
export_to = ExportToModel() |
|
3972 |
export_to.convert_to_pdf = False |
|
3973 |
export_to.label = 'create doc' |
|
3974 |
export_to.varname = 'created_doc' |
|
3975 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
3976 |
template = open(template_filename, 'rb').read() |
|
3977 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
3978 |
upload.fp = BytesIO() |
|
3979 |
upload.fp.write(template) |
|
3980 |
upload.fp.seek(0) |
|
3981 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
3982 |
export_to.id = '_export_to' |
|
3983 |
export_to.by = ['_submitter'] |
|
3984 |
st1.items.append(export_to) |
|
3985 |
export_to.parent = st1 |
|
3986 |
wf.store() |
|
3987 | ||
3988 |
formdef = create_formdef() |
|
3989 |
formdef.workflow_id = wf.id |
|
3990 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
3991 |
formdef.store() |
|
3992 |
formdef.data_class().wipe() |
|
3993 | ||
3994 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
3995 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
3996 |
resp = resp.forms[0].submit('submit') |
|
3997 |
assert 'Check values then click submit.' in resp.text |
|
3998 |
resp = resp.forms[0].submit('submit') |
|
3999 |
assert resp.status_int == 302 |
|
4000 |
form_location = resp.location |
|
4001 |
resp = resp.follow() |
|
4002 |
assert 'The form has been recorded' in resp.text |
|
4003 | ||
4004 |
resp = resp.form.submit('button_export_to') |
|
4005 | ||
4006 |
resp = resp.follow() # $form/$id/create_doc |
|
4007 |
resp = resp.follow() # $form/$id/create_doc/ |
|
4008 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
4009 |
assert_equal_zip(BytesIO(resp.body), f) |
|
4010 | ||
4011 |
export_to.attach_to_history = True |
|
4012 |
wf.store() |
|
4013 | ||
4014 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
4015 |
resp = resp.form.submit('button_export_to') |
|
4016 |
assert resp.location == form_location + '#action-zone' |
|
4017 |
resp = resp.follow() # back to form page |
|
4018 | ||
4019 |
resp = resp.click('template.odt') |
|
4020 |
assert resp.location.endswith('/template.odt') |
|
4021 |
response1 = resp = resp.follow() |
|
4022 |
assert resp.content_type == 'application/octet-stream' |
|
4023 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
4024 |
assert_equal_zip(BytesIO(resp.body), f) |
|
4025 | ||
4026 |
# change file content, same name |
|
4027 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
4028 |
upload.fp = BytesIO() |
|
4029 |
upload.fp.write(b'HELLO NEW WORLD') |
|
4030 |
upload.fp.seek(0) |
|
4031 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
4032 |
wf.store() |
|
4033 | ||
4034 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
4035 |
resp = resp.form.submit('button_export_to') |
|
4036 |
assert resp.location == form_location + '#action-zone' |
|
4037 |
resp = resp.follow() # back to form page |
|
4038 | ||
4039 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
4040 |
body = resp.click('template.odt', index=0).follow().body |
|
4041 |
assert_equal_zip(BytesIO(body), f) |
|
4042 |
response2 = resp.click('test.rtf', index=0).follow() |
|
4043 |
assert response2.body == b'HELLO NEW WORLD' |
|
4044 |
# Test attachment substitution variables |
|
4045 |
variables = formdef.data_class().select()[0].get_substitution_variables() |
|
4046 |
assert 'attachments' in variables |
|
4047 |
attachments = variables['attachments'] |
|
4048 |
assert attachments is not None |
|
4049 |
file1 = attachments.created_doc |
|
4050 |
assert file1.content == response2.body |
|
4051 |
assert file1.b64_content == base64.b64encode(response2.body) |
|
4052 |
assert file1.content_type == response2._headers['content-type'] |
|
4053 |
content_disposition = response2._headers['content-disposition'] |
|
4054 |
assert len(content_disposition.split(';')) == 2 |
|
4055 |
assert content_disposition.split(';')[0] == 'attachment' |
|
4056 |
assert response2.request.environ['PATH_INFO'].endswith(file1.filename) |
|
4057 | ||
4058 |
resp = login(get_app(pub), username='admin', password='admin').get(file1.url).follow() |
|
4059 |
assert file1.content == resp.body |
|
4060 |
assert file1.b64_content == base64.b64encode(resp.body) |
|
4061 |
assert file1.content_type == resp._headers['content-type'] |
|
4062 |
content_disposition = resp._headers['content-disposition'] |
|
4063 |
assert len(content_disposition.split(';')) == 2 |
|
4064 |
assert content_disposition.split(';')[0] == 'attachment' |
|
4065 |
assert resp.request.environ['PATH_INFO'].endswith(file1.filename) |
|
4066 | ||
4067 |
file2 = attachments.created_doc[1] |
|
4068 |
assert file2.content == response1.body |
|
4069 |
assert file2.b64_content == base64.b64encode(response1.body) |
|
4070 |
assert file2.content_type == response1._headers['content-type'] |
|
4071 |
content_disposition = response1._headers['content-disposition'] |
|
4072 |
assert len(content_disposition.split(';')) == 2 |
|
4073 |
assert content_disposition.split(';')[0] == 'attachment' |
|
4074 |
assert response1.request.environ['PATH_INFO'].endswith(file2.filename) |
|
4075 | ||
4076 |
resp = login(get_app(pub), username='admin', password='admin').get(file2.url).follow() |
|
4077 |
assert file2.content == resp.body |
|
4078 |
assert file2.b64_content == base64.b64encode(resp.body) |
|
4079 |
assert file2.content_type == resp._headers['content-type'] |
|
4080 |
content_disposition = resp._headers['content-disposition'] |
|
4081 |
assert len(content_disposition.split(';')) == 2 |
|
4082 |
assert content_disposition.split(';')[0] == 'attachment' |
|
4083 |
assert resp.request.environ['PATH_INFO'].endswith(file2.filename) |
|
4084 | ||
4085 | ||
4086 |
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found') |
|
4087 |
def test_formdata_generated_document_odt_to_pdf_download(pub): |
|
4088 |
create_user(pub) |
|
4089 |
wf = Workflow(name='status') |
|
4090 |
st1 = wf.add_status('Status1', 'st1') |
|
4091 |
export_to = ExportToModel() |
|
4092 |
export_to.label = 'create doc' |
|
4093 |
export_to.varname = 'created_doc' |
|
4094 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
4095 |
template = open(template_filename, 'rb').read() |
|
4096 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
4097 |
upload.fp = BytesIO() |
|
4098 |
upload.fp.write(template) |
|
4099 |
upload.fp.seek(0) |
|
4100 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
4101 |
export_to.id = '_export_to' |
|
4102 |
export_to.by = ['_submitter'] |
|
4103 |
export_to.convert_to_pdf = True |
|
4104 |
st1.items.append(export_to) |
|
4105 |
export_to.parent = st1 |
|
4106 |
wf.store() |
|
4107 | ||
4108 |
formdef = create_formdef() |
|
4109 |
formdef.workflow_id = wf.id |
|
4110 |
formdef.fields = [] |
|
4111 |
formdef.store() |
|
4112 |
formdef.data_class().wipe() |
|
4113 | ||
4114 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4115 |
resp = resp.forms[0].submit('submit') |
|
4116 |
assert 'Check values then click submit.' in resp.text |
|
4117 |
resp = resp.forms[0].submit('submit') |
|
4118 |
assert resp.status_int == 302 |
|
4119 |
form_location = resp.location |
|
4120 |
resp = resp.follow() |
|
4121 |
assert 'The form has been recorded' in resp.text |
|
4122 | ||
4123 |
resp = resp.form.submit('button_export_to') |
|
4124 | ||
4125 |
resp = resp.follow() # $form/$id/create_doc |
|
4126 |
resp = resp.follow() # $form/$id/create_doc/ |
|
4127 |
assert resp.content_type == 'application/pdf' |
|
4128 |
assert b'PDF' in resp.body |
|
4129 | ||
4130 |
export_to.attach_to_history = True |
|
4131 |
wf.store() |
|
4132 | ||
4133 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
4134 |
resp = resp.form.submit('button_export_to') |
|
4135 |
assert resp.location == form_location + '#action-zone' |
|
4136 |
resp = resp.follow() # back to form page |
|
4137 | ||
4138 |
resp = resp.click('template.pdf') |
|
4139 |
assert resp.location.endswith('/template.pdf') |
|
4140 |
resp = resp.follow() |
|
4141 |
assert resp.content_type == 'application/pdf' |
|
4142 |
content_disposition = resp._headers['content-disposition'] |
|
4143 |
assert len(content_disposition.split(';')) == 2 |
|
4144 |
assert content_disposition.split(';')[0] == 'inline' |
|
4145 |
assert resp.body.startswith(b'%PDF-') |
|
4146 | ||
4147 | ||
4148 |
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found') |
|
4149 |
def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(pub, fargo_url, |
|
4150 |
fargo_secret, caplog): |
|
4151 |
user = create_user(pub) |
|
4152 |
user.name = 'Foo Baré' |
|
4153 |
user.store() |
|
4154 | ||
4155 |
pub.cfg['debug'] = {'logger': True} |
|
4156 |
pub.write_cfg() |
|
4157 |
wf = Workflow(name='status') |
|
4158 |
st1 = wf.add_status('Status1', 'st1') |
|
4159 |
export_to = ExportToModel() |
|
4160 |
export_to.label = 'create doc' |
|
4161 |
export_to.varname = 'created_doc' |
|
4162 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
4163 |
template = open(template_filename, 'rb').read() |
|
4164 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
4165 |
upload.fp = BytesIO() |
|
4166 |
upload.fp.write(template) |
|
4167 |
upload.fp.seek(0) |
|
4168 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
4169 |
export_to.id = '_export_to' |
|
4170 |
export_to.by = ['_submitter'] |
|
4171 |
export_to.convert_to_pdf = True |
|
4172 |
export_to.push_to_portfolio = True |
|
4173 |
st1.items.append(export_to) |
|
4174 |
export_to.parent = st1 |
|
4175 |
wf.store() |
|
4176 | ||
4177 |
formdef = create_formdef() |
|
4178 |
formdef.workflow_id = wf.id |
|
4179 |
formdef.fields = [] |
|
4180 |
formdef.store() |
|
4181 |
formdef.data_class().wipe() |
|
4182 | ||
4183 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4184 |
resp = resp.forms[0].submit('submit') |
|
4185 |
assert 'Check values then click submit.' in resp.text |
|
4186 |
resp = resp.forms[0].submit('submit') |
|
4187 |
assert resp.status_int == 302 |
|
4188 |
form_location = resp.location |
|
4189 |
resp = resp.follow() |
|
4190 |
assert 'The form has been recorded' in resp.text |
|
4191 | ||
4192 |
with mock.patch('wcs.portfolio.http_post_request') as http_post_request: |
|
4193 |
http_post_request.return_value = None, 200, 'null', None |
|
4194 |
resp = resp.form.submit('button_export_to') |
|
4195 |
assert http_post_request.call_count == 1 |
|
4196 |
if locale.getpreferredencoding() == 'UTF-8': |
|
4197 |
assert ("file 'template.pdf' pushed to portfolio of 'Foo Baré'" |
|
4198 |
== caplog.records[-1].message) |
|
4199 |
else: # Python < 3.7 |
|
4200 |
assert ("file 'template.pdf' pushed to portfolio of 'Foo Bar\xe9'" |
|
4201 |
== caplog.records[-1].message) |
|
4202 | ||
4203 |
resp = resp.follow() # $form/$id/create_doc |
|
4204 |
resp = resp.follow() # $form/$id/create_doc/ |
|
4205 |
assert resp.content_type == 'application/pdf' |
|
4206 |
assert b'PDF' in resp.body |
|
4207 | ||
4208 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
4209 |
with mock.patch('wcs.portfolio.http_post_request') as http_post_request: |
|
4210 |
http_post_request.return_value = None, 400, 'null', None # fail |
|
4211 |
resp = resp.form.submit('button_export_to') |
|
4212 |
assert http_post_request.call_count == 1 |
|
4213 |
assert caplog.records[-1].message.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo") |
|
4214 | ||
4215 |
# failed to push to portfolio, but document is here |
|
4216 |
resp = resp.follow() # $form/$id/create_doc |
|
4217 |
resp = resp.follow() # $form/$id/create_doc/ |
|
4218 |
assert resp.content_type == 'application/pdf' |
|
4219 |
assert b'PDF' in resp.body |
|
4220 | ||
4221 |
export_to.attach_to_history = True |
|
4222 |
wf.store() |
|
4223 | ||
4224 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
4225 |
with mock.patch('wcs.portfolio.http_post_request') as http_post_request: |
|
4226 |
http_post_request.return_value = None, 200, 'null', None |
|
4227 |
resp = resp.form.submit('button_export_to') |
|
4228 |
assert http_post_request.call_count == 1 |
|
4229 |
assert http_post_request.call_args[0][0].startswith('http://fargo.example.net/api/documents/push/') |
|
4230 |
payload = json.loads(http_post_request.call_args[0][1]) |
|
4231 |
assert payload['file_name'] == 'template.pdf' |
|
4232 |
assert payload['user_email'] == 'foo@localhost' |
|
4233 |
assert payload['origin'] == 'example.net' |
|
4234 |
assert base64.decodebytes(force_bytes(payload['file_b64_content'])).startswith(b'%PDF') |
|
4235 |
assert caplog.records[-1].message.startswith("file 'template.pdf' pushed to portfolio of 'Foo") |
|
4236 |
assert resp.location == form_location + '#action-zone' |
|
4237 |
resp = resp.follow() # back to form page |
|
4238 | ||
4239 |
resp = resp.click('template.pdf') |
|
4240 |
assert resp.location.endswith('/template.pdf') |
|
4241 |
resp = resp.follow() |
|
4242 |
assert resp.content_type == 'application/pdf' |
|
4243 |
assert resp.body.startswith(b'%PDF-') |
|
4244 | ||
4245 | ||
4246 |
def test_formdata_generated_document_non_interactive(pub): |
|
4247 |
create_user(pub) |
|
4248 |
wf = Workflow(name='status') |
|
4249 |
st1 = wf.add_status('Status1', 'st1') |
|
4250 |
export_to = ExportToModel() |
|
4251 |
export_to.convert_to_pdf = False |
|
4252 |
export_to.method = 'non-interactive' |
|
4253 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
4254 |
template = open(template_filename, 'rb').read() |
|
4255 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
4256 |
upload.fp = BytesIO() |
|
4257 |
upload.fp.write(template) |
|
4258 |
upload.fp.seek(0) |
|
4259 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
4260 |
export_to.id = '_export_to' |
|
4261 |
export_to.attach_to_history = True |
|
4262 |
st1.items.append(export_to) |
|
4263 |
export_to.parent = st1 |
|
4264 | ||
4265 |
jump = JumpWorkflowStatusItem() |
|
4266 |
jump.status = 'st2' |
|
4267 |
st1.items.append(jump) |
|
4268 |
jump.parent = st1 |
|
4269 | ||
4270 |
st2 = wf.add_status('Status2', 'st2') |
|
4271 | ||
4272 |
wf.store() |
|
4273 | ||
4274 |
formdef = create_formdef() |
|
4275 |
formdef.workflow_id = wf.id |
|
4276 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
4277 |
formdef.store() |
|
4278 |
formdef.data_class().wipe() |
|
4279 | ||
4280 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4281 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
4282 |
resp = resp.forms[0].submit('submit') |
|
4283 |
assert 'Check values then click submit.' in resp.text |
|
4284 |
resp = resp.forms[0].submit('submit') |
|
4285 |
assert resp.status_int == 302 |
|
4286 |
form_location = resp.location |
|
4287 |
resp = resp.follow() |
|
4288 |
assert 'The form has been recorded' in resp.text |
|
4289 | ||
4290 |
resp = resp.click('template.odt') |
|
4291 |
assert resp.location.endswith('/template.odt') |
|
4292 |
resp = resp.follow() |
|
4293 |
assert resp.content_type == 'application/octet-stream' |
|
4294 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
4295 |
assert_equal_zip(BytesIO(resp.body), f) |
|
4296 | ||
4297 |
assert formdef.data_class().count() == 1 |
|
4298 |
assert formdef.data_class().select()[0].status == 'wf-st2' |
|
4299 | ||
4300 | ||
4301 |
def test_formdata_generated_document_to_backoffice_field(pub): |
|
4302 |
create_user_and_admin(pub) |
|
4303 |
wf = Workflow(name='status') |
|
4304 |
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) |
|
4305 |
wf.backoffice_fields_formdef.fields = [ |
|
4306 |
fields.FileField(id='bo1', label='bo field 1', type='file'), |
|
4307 |
fields.StringField(id='bo2', label='bo field 2', type='string'), |
|
4308 |
] |
|
4309 | ||
4310 |
st1 = wf.add_status('Status1', 'st1') |
|
4311 |
export_to = ExportToModel() |
|
4312 |
export_to.convert_to_pdf = False |
|
4313 |
export_to.method = 'non-interactive' |
|
4314 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
4315 |
template = open(template_filename, 'rb').read() |
|
4316 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
4317 |
upload.fp = BytesIO() |
|
4318 |
upload.fp.write(template) |
|
4319 |
upload.fp.seek(0) |
|
4320 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
4321 |
export_to.id = '_export_to' |
|
4322 |
export_to.attach_to_history = True |
|
4323 |
export_to.backoffice_filefield_id = 'bo1' |
|
4324 |
st1.items.append(export_to) |
|
4325 |
export_to.parent = st1 |
|
4326 | ||
4327 |
assert export_to.get_backoffice_filefield_options() == [('bo1', 'bo field 1', 'bo1')] |
|
4328 | ||
4329 |
jump = JumpWorkflowStatusItem() |
|
4330 |
jump.status = 'st2' |
|
4331 |
st1.items.append(jump) |
|
4332 |
jump.parent = st1 |
|
4333 |
wf.add_status('Status2', 'st2') |
|
4334 |
wf.store() |
|
4335 | ||
4336 |
formdef = create_formdef() |
|
4337 |
formdef.workflow_id = wf.id |
|
4338 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
4339 |
formdef.store() |
|
4340 |
formdef.data_class().wipe() |
|
4341 | ||
4342 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4343 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
4344 |
resp = resp.forms[0].submit('submit') |
|
4345 |
assert 'Check values then click submit.' in resp.text |
|
4346 |
resp = resp.forms[0].submit('submit') |
|
4347 |
assert resp.status_int == 302 |
|
4348 |
resp = resp.follow() |
|
4349 |
assert 'The form has been recorded' in resp.text |
|
4350 | ||
4351 |
# get the two generated files from backoffice: in backoffice fields |
|
4352 |
# (export_to.backoffice_filefield_id), and in history (export_to.attach_to_history) |
|
4353 |
for index in (0, 1): |
|
4354 |
resp = login(get_app(pub), username='admin', password='admin').get( |
|
4355 |
'/backoffice/management/test/1/') |
|
4356 |
resp = resp.click('template.odt', index=index) |
|
4357 |
assert resp.location.endswith('/template.odt') |
|
4358 |
resp = resp.follow() |
|
4359 |
assert resp.content_type == 'application/octet-stream' |
|
4360 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
4361 |
assert_equal_zip(BytesIO(resp.body), f) |
|
4362 | ||
4363 |
assert formdef.data_class().count() == 1 |
|
4364 |
assert formdef.data_class().select()[0].status == 'wf-st2' |
|
4365 | ||
4366 | ||
4367 |
def test_formdata_generated_document_in_private_history(pub): |
|
4368 |
user = create_user(pub) |
|
4369 | ||
4370 |
Role.wipe() |
|
4371 |
role = Role(name='xxx') |
|
4372 |
role.store() |
|
4373 | ||
4374 |
user.roles = [role.id] |
|
4375 |
user.store() |
|
4376 | ||
4377 |
wf = Workflow(name='status') |
|
4378 |
st0 = wf.add_status('Status0', 'st0') |
|
4379 |
st1 = wf.add_status('Status1', 'st1') |
|
4380 |
export_to = ExportToModel() |
|
4381 |
export_to.label = 'create doc' |
|
4382 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
4383 |
upload.fp = BytesIO() |
|
4384 |
upload.fp.write(b'HELLO WORLD') |
|
4385 |
upload.fp.seek(0) |
|
4386 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
4387 |
export_to.attach_to_history = True |
|
4388 |
export_to.id = '_export_to' |
|
4389 |
export_to.by = ['_submitter'] |
|
4390 |
st1.items.append(export_to) |
|
4391 |
export_to.parent = st1 |
|
4392 | ||
4393 |
st2 = wf.add_status('Status2', 'st2') |
|
4394 | ||
4395 |
jump1 = ChoiceWorkflowStatusItem() |
|
4396 |
jump1.id = '_jump1' |
|
4397 |
jump1.label = 'Jump 1' |
|
4398 |
jump1.by = ['_receiver'] |
|
4399 |
jump1.status = st1.id |
|
4400 |
jump1.parent = st0 |
|
4401 |
st0.items.append(jump1) |
|
4402 | ||
4403 |
jump2 = ChoiceWorkflowStatusItem() |
|
4404 |
jump2.id = '_jump2' |
|
4405 |
jump2.label = 'Jump 2' |
|
4406 |
jump2.by = ['_receiver'] |
|
4407 |
jump2.status = st2.id |
|
4408 |
jump2.parent = st1 |
|
4409 |
st1.items.append(jump2) |
|
4410 | ||
4411 |
wf.store() |
|
4412 | ||
4413 |
formdef = create_formdef() |
|
4414 |
formdef.workflow_id = wf.id |
|
4415 |
formdef.workflow_roles = {'_receiver': role.id} |
|
4416 |
formdef.fields = [] |
|
4417 |
formdef.store() |
|
4418 |
formdef.data_class().wipe() |
|
4419 | ||
4420 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4421 |
resp = resp.forms[0].submit('submit') |
|
4422 |
assert 'Check values then click submit.' in resp.text |
|
4423 |
resp = resp.forms[0].submit('submit') |
|
4424 |
assert resp.status_int == 302 |
|
4425 |
form_location = resp.location |
|
4426 |
resp = resp.follow() |
|
4427 |
assert 'The form has been recorded' in resp.text |
|
4428 | ||
4429 |
resp = resp.form.submit('button_jump1') |
|
4430 |
resp = resp.follow() |
|
4431 | ||
4432 |
resp = resp.form.submit('button_export_to') |
|
4433 |
resp = resp.follow() |
|
4434 |
assert 'Form exported in a model' in resp.text |
|
4435 | ||
4436 |
resp = resp.form.submit('button_jump2') |
|
4437 |
resp = resp.follow() |
|
4438 | ||
4439 |
# limit visibility of status with document |
|
4440 |
st1.visibility = ['_receiver'] |
|
4441 |
wf.store() |
|
4442 | ||
4443 |
formdata = formdef.data_class().select()[0] |
|
4444 |
resp = login(get_app(pub), username='foo', password='foo').get(formdata.get_url()) |
|
4445 |
assert not 'Form exported in a model' in resp.text |
|
4446 | ||
4447 |
# check status is visible in backoffice |
|
4448 |
resp = login(get_app(pub), username='foo', password='foo').get(formdata.get_url(backoffice=True)) |
|
4449 |
assert 'visibility-off' in resp.text |
|
4450 |
assert 'Form exported in a model' in resp.text |
|
4451 | ||
4452 | ||
4453 |
def test_formdata_form_file_download(pub): |
|
4454 |
create_user(pub) |
|
4455 |
wf = Workflow(name='status') |
|
4456 |
st1 = wf.add_status('Status1', 'st1') |
|
4457 | ||
4458 |
display_form = FormWorkflowStatusItem() |
|
4459 |
display_form.id = '_x' |
|
4460 |
display_form.by = ['_submitter'] |
|
4461 |
display_form.varname = 'xxx' |
|
4462 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
4463 |
display_form.formdef.fields.append(fields.FileField(id='1', label='File', |
|
4464 |
type='file', varname='yyy')) |
|
4465 |
st1.items.append(display_form) |
|
4466 |
display_form.parent = st1 |
|
4467 | ||
4468 |
wf.store() |
|
4469 | ||
4470 |
formdef = create_formdef() |
|
4471 |
formdef.workflow_id = wf.id |
|
4472 |
formdef.fields = [] |
|
4473 |
formdef.store() |
|
4474 |
formdef.data_class().wipe() |
|
4475 | ||
4476 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4477 |
resp = resp.forms[0].submit('submit') |
|
4478 |
assert 'Check values then click submit.' in resp.text |
|
4479 |
resp = resp.forms[0].submit('submit') |
|
4480 |
assert resp.status_int == 302 |
|
4481 |
resp = resp.follow() |
|
4482 |
assert 'The form has been recorded' in resp.text |
|
4483 | ||
4484 |
assert 'qommon.fileupload.js' in resp.text |
|
4485 |
resp.forms[0]['f1$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
4486 |
resp = resp.forms[0].submit('submit') |
|
4487 | ||
4488 |
assert formdef.data_class().count() == 1 |
|
4489 |
formdata = formdef.data_class().select()[0] |
|
4490 |
assert 'xxx_var_yyy_raw' in formdata.workflow_data |
|
4491 | ||
4492 |
download = resp.test_app.get(urlparse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt')) |
|
4493 |
assert download.content_type == 'text/plain' |
|
4494 |
assert download.body == b'foobar' |
|
4495 | ||
4496 |
# go back to the status page, this will exercise the substitution variables |
|
4497 |
# codepath. |
|
4498 |
resp = resp.follow() |
|
4499 | ||
4500 | ||
4501 |
def test_formdata_workflow_form_prefill(pub): |
|
4502 |
create_user(pub) |
|
4503 |
wf = Workflow(name='status') |
|
4504 |
st1 = wf.add_status('Status1', 'st1') |
|
4505 | ||
4506 |
display_form = FormWorkflowStatusItem() |
|
4507 |
display_form.id = '_x' |
|
4508 |
display_form.by = ['_submitter'] |
|
4509 |
display_form.varname = 'xxx' |
|
4510 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
4511 |
display_form.formdef.fields.append(fields.StringField(id='1', label='blah', |
|
4512 |
type='string', varname='yyy', prefill={'type': 'user', 'value': 'email'})) |
|
4513 |
st1.items.append(display_form) |
|
4514 |
display_form.parent = st1 |
|
4515 | ||
4516 |
wf.store() |
|
4517 | ||
4518 |
formdef = create_formdef() |
|
4519 |
formdef.workflow_id = wf.id |
|
4520 |
formdef.fields = [] |
|
4521 |
formdef.store() |
|
4522 |
formdef.data_class().wipe() |
|
4523 | ||
4524 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4525 |
resp = resp.forms[0].submit('submit') |
|
4526 |
assert 'Check values then click submit.' in resp.text |
|
4527 |
resp = resp.forms[0].submit('submit') |
|
4528 |
assert resp.status_int == 302 |
|
4529 |
resp = resp.follow() |
|
4530 |
assert 'The form has been recorded' in resp.text |
|
4531 |
assert resp.forms[0]['f1'].value == 'foo@localhost' |
|
4532 | ||
4533 | ||
4534 |
def test_formdata_workflow_form_prefill_conditional_field(pub): |
|
4535 |
create_user(pub) |
|
4536 |
wf = Workflow(name='status') |
|
4537 |
st1 = wf.add_status('Status1', 'st1') |
|
4538 | ||
4539 |
display_form = FormWorkflowStatusItem() |
|
4540 |
display_form.id = '_x' |
|
4541 |
display_form.by = ['_submitter'] |
|
4542 |
display_form.varname = 'xxx' |
|
4543 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
4544 |
display_form.formdef.fields.append(fields.StringField(id='1', label='blah1', |
|
4545 |
type='string', varname='yyy1', prefill={'type': 'user', 'value': 'email'}, |
|
4546 |
condition={'type': 'django', 'value': '0'})) |
|
4547 |
display_form.formdef.fields.append(fields.StringField(id='2', label='blah2', |
|
4548 |
type='string', varname='yyy2', prefill={'type': 'user', 'value': 'email'}, |
|
4549 |
condition={'type': 'django', 'value': '1'})) |
|
4550 |
st1.items.append(display_form) |
|
4551 |
display_form.parent = st1 |
|
4552 | ||
4553 |
wf.store() |
|
4554 | ||
4555 |
formdef = create_formdef() |
|
4556 |
formdef.workflow_id = wf.id |
|
4557 |
formdef.fields = [] |
|
4558 |
formdef.store() |
|
4559 |
formdef.data_class().wipe() |
|
4560 | ||
4561 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4562 |
resp = resp.forms[0].submit('submit') |
|
4563 |
assert 'Check values then click submit.' in resp.text |
|
4564 |
resp = resp.forms[0].submit('submit') |
|
4565 |
assert resp.status_int == 302 |
|
4566 |
resp = resp.follow() |
|
4567 |
assert 'The form has been recorded' in resp.text |
|
4568 |
assert resp.forms[0]['f2'].value == 'foo@localhost' |
|
4569 | ||
4570 | ||
4571 |
def test_formdata_workflow_form_prefill_checkbox(pub): |
|
4572 |
create_user(pub) |
|
4573 |
wf = Workflow(name='status') |
|
4574 |
st1 = wf.add_status('Status1', 'st1') |
|
4575 | ||
4576 |
display_form = FormWorkflowStatusItem() |
|
4577 |
display_form.id = '_x' |
|
4578 |
display_form.by = ['_submitter'] |
|
4579 |
display_form.varname = 'xxx' |
|
4580 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
4581 |
display_form.formdef.fields.append(fields.BoolField(id='1', label='blah', |
|
4582 |
type='bool', varname='yyy', prefill={'type': 'formula', 'value': 'True'})) |
|
4583 |
display_form.formdef.fields.append(fields.BoolField(id='2', label='blah2', |
|
4584 |
type='bool', varname='zzz', prefill={'type': 'formula', 'value': 'True'})) |
|
4585 |
st1.items.append(display_form) |
|
4586 |
display_form.parent = st1 |
|
4587 | ||
4588 |
wf.store() |
|
4589 | ||
4590 |
formdef = create_formdef() |
|
4591 |
formdef.workflow_id = wf.id |
|
4592 |
formdef.fields = [] |
|
4593 |
formdef.store() |
|
4594 |
formdef.data_class().wipe() |
|
4595 | ||
4596 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4597 |
resp = resp.form.submit('submit') |
|
4598 |
assert 'Check values then click submit.' in resp |
|
4599 |
resp = resp.form.submit('submit').follow() |
|
4600 |
assert 'The form has been recorded' in resp |
|
4601 |
assert resp.form['f1'].checked is True |
|
4602 |
assert resp.form['f2'].checked is True |
|
4603 |
resp.form['f1'].checked = False |
|
4604 |
resp = resp.form.submit('submit') |
|
4605 | ||
4606 |
formdata = formdef.data_class().select()[0] |
|
4607 |
assert formdata.workflow_data['xxx_var_yyy_raw'] is False |
|
4608 |
assert formdata.workflow_data['xxx_var_zzz_raw'] is True |
|
4609 | ||
4610 | ||
4611 |
def test_formdata_workflow_form_prefill_autocomplete(pub): |
|
4612 |
create_user(pub) |
|
4613 | ||
4614 |
NamedDataSource.wipe() |
|
4615 |
data_source = NamedDataSource(name='foobar') |
|
4616 |
data_source.data_source = {'type': 'json', 'value': 'http://local-mock/test'} |
|
4617 |
data_source.query_parameter = 'q' |
|
4618 |
data_source.id_parameter = 'id' |
|
4619 |
data_source.store() |
|
4620 | ||
4621 |
wf = Workflow(name='status') |
|
4622 |
st1 = wf.add_status('Status1', 'st1') |
|
4623 | ||
4624 |
display_form = FormWorkflowStatusItem() |
|
4625 |
display_form.id = '_x' |
|
4626 |
display_form.by = ['_submitter'] |
|
4627 |
display_form.varname = 'xxx' |
|
4628 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
4629 |
display_form.formdef.fields = [ |
|
4630 |
fields.ItemField(id='4', label='string', type='item', |
|
4631 |
data_source={'type': 'foobar'}, |
|
4632 |
required=False, |
|
4633 |
display_mode='autocomplete', |
|
4634 |
prefill={'type': 'string', 'value': '{{ form_var_foo_raw }}'}, |
|
4635 |
), |
|
4636 |
] |
|
4637 |
st1.items.append(display_form) |
|
4638 |
display_form.parent = st1 |
|
4639 | ||
4640 |
wf.store() |
|
4641 | ||
4642 |
formdef = create_formdef() |
|
4643 |
formdef.workflow_id = wf.id |
|
4644 |
formdef.fields = [ |
|
4645 |
fields.ItemField(id='0', label='string', type='item', |
|
4646 |
data_source={'type': 'foobar'}, |
|
4647 |
display_mode='autocomplete', |
|
4648 |
required=False, |
|
4649 |
varname='foo', |
|
4650 |
), |
|
4651 |
] |
|
4652 |
formdef.store() |
|
4653 |
formdef.data_class().wipe() |
|
4654 | ||
4655 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
4656 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
|
4657 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
|
4658 | ||
4659 |
def side_effect(url, *args): |
|
4660 |
return StringIO(json.dumps(data)) |
|
4661 | ||
4662 |
urlopen.side_effect = side_effect |
|
4663 | ||
4664 |
assert 'data-select2-url=' in resp |
|
4665 |
# simulate select2 |
|
4666 |
resp.form.fields['f0_display'] = Hidden(form=resp.form, tag='input', name='f0_display', pos=10) |
|
4667 |
resp.form['f0'].force_value('1') |
|
4668 |
resp.form.fields['f0_display'].force_value('foo') |
|
4669 |
resp = resp.form.submit('submit') |
|
4670 |
assert 'Check values then click submit.' in resp |
|
4671 |
resp = resp.form.submit('submit').follow() |
|
4672 |
assert 'The form has been recorded' in resp |
|
4673 | ||
4674 |
# check display value is in form action widget |
|
4675 |
assert resp.form['f4'].attrs['data-value'] == '1' |
|
4676 |
assert resp.form['f4'].attrs['data-initial-display-value'] == 'hello' |
|
4677 | ||
4678 |
# check it is also displayed in a fresh session |
|
4679 |
resp = login(get_app(pub), username='foo', password='foo').get(resp.request.url) |
|
4680 |
assert resp.form['f4'].attrs['data-value'] == '1' |
|
4681 |
assert resp.form['f4'].attrs['data-initial-display-value'] == 'hello' |
|
4682 | ||
4683 | ||
4684 | 3543 |
def test_form_map_field_back_and_submit(pub): |
4685 | 3544 |
formdef = create_formdef() |
4686 | 3545 |
formdef.fields = [ |
... | ... | |
6760 | 5619 |
assert len(LoggedError.get_ids_with_indexed_value('workflow_id', 'X')) == 0 |
6761 | 5620 | |
6762 | 5621 | |
6763 |
def test_formdata_named_wscall(http_requests, pub): |
|
6764 |
create_user(pub) |
|
6765 |
NamedWsCall.wipe() |
|
6766 | ||
6767 |
wscall = NamedWsCall() |
|
6768 |
wscall.name = 'Hello world' |
|
6769 |
wscall.request = {'url': 'http://remote.example.net/json'} |
|
6770 |
wscall.store() |
|
6771 |
assert wscall.slug == 'hello_world' |
|
6772 | ||
6773 |
wf = Workflow(name='status') |
|
6774 |
st1 = wf.add_status('Status1', 'st1') |
|
6775 |
comment = RegisterCommenterWorkflowStatusItem() |
|
6776 |
comment.id = '_comment' |
|
6777 |
comment.comment = 'Hello [webservice.hello_world.foo] World' |
|
6778 |
st1.items.append(comment) |
|
6779 |
comment.parent = st1 |
|
6780 | ||
6781 |
display = DisplayMessageWorkflowStatusItem() |
|
6782 |
display.message = 'The form has been recorded and: X[webservice.hello_world.foo]Y' |
|
6783 |
display.to = [] |
|
6784 |
st1.items.append(display) |
|
6785 |
display.parent = st1 |
|
6786 | ||
6787 |
wf.store() |
|
6788 | ||
6789 |
formdef = create_formdef() |
|
6790 |
formdef.workflow_id = wf.id |
|
6791 |
formdef.fields = [] |
|
6792 |
formdef.store() |
|
6793 |
formdef.data_class().wipe() |
|
6794 | ||
6795 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
6796 |
resp = resp.forms[0].submit('submit') |
|
6797 |
assert 'Check values then click submit.' in resp.text |
|
6798 |
resp = resp.forms[0].submit('submit') |
|
6799 |
assert resp.status_int == 302 |
|
6800 |
resp = resp.follow() |
|
6801 |
assert 'The form has been recorded and: XbarY' in resp.text |
|
6802 | ||
6803 |
formdata = formdef.data_class().select()[0] |
|
6804 |
assert formdata.evolution[0].parts[0].content == 'Hello bar World' |
|
6805 | ||
6806 |
# check with publisher variable in named webservice call |
|
6807 |
if not pub.site_options.has_section('variables'): |
|
6808 |
pub.site_options.add_section('variables') |
|
6809 |
pub.site_options.set('variables', 'example_url', 'http://remote.example.net/') |
|
6810 |
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: |
|
6811 |
pub.site_options.write(fd) |
|
6812 | ||
6813 |
wscall = NamedWsCall() |
|
6814 |
wscall.name = 'Hello world' |
|
6815 |
wscall.request = {'url': '[example_url]json'} |
|
6816 |
wscall.store() |
|
6817 | ||
6818 |
formdef.data_class().wipe() |
|
6819 | ||
6820 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
6821 |
resp = resp.forms[0].submit('submit') |
|
6822 |
assert 'Check values then click submit.' in resp.text |
|
6823 |
resp = resp.forms[0].submit('submit') |
|
6824 |
assert resp.status_int == 302 |
|
6825 |
resp = resp.follow() |
|
6826 |
assert 'The form has been recorded and: XbarY' in resp.text |
|
6827 | ||
6828 |
formdata = formdef.data_class().select()[0] |
|
6829 |
assert formdata.evolution[0].parts[0].content == 'Hello bar World' |
|
6830 | ||
6831 | ||
6832 |
def test_formdata_named_wscall_in_conditions(http_requests, pub): |
|
6833 |
create_user(pub) |
|
6834 |
NamedWsCall.wipe() |
|
6835 | ||
6836 |
wscall = NamedWsCall() |
|
6837 |
wscall.name = 'Hello world' |
|
6838 |
wscall.request = {'url': 'http://remote.example.net/json', 'method': 'GET'} |
|
6839 |
wscall.store() |
|
6840 |
assert wscall.slug == 'hello_world' |
|
6841 | ||
6842 |
formdef = create_formdef() |
|
6843 |
formdef.fields = [ |
|
6844 |
fields.PageField(id='0', label='1st page', type='page'), |
|
6845 |
fields.PageField(id='1', label='2nd page', type='page', |
|
6846 |
condition={'type': 'python', 'value': 'webservice.hello_world["foo"] == "bar"'}), |
|
6847 |
fields.PageField(id='1', label='3rd page', type='page', |
|
6848 |
condition={'type': 'python', 'value': 'webservice.hello_world["foo"] != "bar"'}), |
|
6849 |
fields.PageField(id='1', label='4th page', type='page', |
|
6850 |
condition={'type': 'python', 'value': 'webservice.hello_world["foo"] == "bar"'}), |
|
6851 |
] |
|
6852 |
formdef.store() |
|
6853 |
formdef.data_class().wipe() |
|
6854 | ||
6855 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
6856 |
assert '>1st page<' in resp.text |
|
6857 |
assert '>2nd page<' in resp.text |
|
6858 |
assert '>3rd page<' not in resp.text |
|
6859 |
assert '>4th page<' in resp.text |
|
6860 |
assert len(http_requests.requests) == 1 |
|
6861 | ||
6862 | ||
6863 | 5622 |
def test_resubmit(pub): |
6864 | 5623 |
user = create_user(pub) |
6865 | 5624 |
tests/form_pages/test_block.py | ||
---|---|---|
3 | 3 |
import pytest |
4 | 4 |
from webtest import Upload |
5 | 5 | |
6 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
|
7 | 6 |
from wcs.blocks import BlockDef |
8 | 7 |
from wcs.formdef import FormDef |
9 | 8 |
from wcs.workflows import Workflow |
... | ... | |
13 | 12 |
from wcs import fields |
14 | 13 | |
15 | 14 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub |
15 |
from .test_all import create_user |
|
16 | 16 | |
17 | 17 | |
18 | 18 |
def pytest_generate_tests(metafunc): |
... | ... | |
42 | 42 |
clean_temporary_pub() |
43 | 43 | |
44 | 44 | |
45 |
def create_user(pub): |
|
46 |
pub.user_class.wipe() |
|
47 |
PasswordAccount.wipe() |
|
48 | ||
49 |
user = pub.user_class() |
|
50 |
user.name = 'User Name' |
|
51 |
user.email = 'foo@localhost' |
|
52 |
user.store() |
|
53 |
account = PasswordAccount(id='foo') |
|
54 |
account.set_password('foo') |
|
55 |
account.user_id = user.id |
|
56 |
account.store() |
|
57 |
return user |
|
58 | ||
59 | ||
60 | 45 |
def test_block_simple(pub, blocks_feature): |
61 | 46 |
FormDef.wipe() |
62 | 47 |
BlockDef.wipe() |
tests/form_pages/test_formdata.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
import base64 |
|
3 |
import json |
|
4 |
import locale |
|
5 |
import os |
|
6 |
import xml.etree.ElementTree as ET |
|
7 |
import zipfile |
|
8 | ||
9 |
import mock |
|
10 |
import pytest |
|
11 |
from django.utils.encoding import force_bytes |
|
12 |
from django.utils.six import BytesIO, StringIO |
|
13 |
from django.utils.six.moves.urllib import parse as urlparse |
|
14 |
from quixote.http_request import Upload as QuixoteUpload |
|
15 |
from webtest import Upload, Hidden |
|
16 | ||
17 |
from wcs import fields |
|
18 |
from wcs.data_sources import NamedDataSource |
|
19 |
from wcs.formdef import FormDef |
|
20 |
from wcs.qommon.form import UploadedFile |
|
21 |
from wcs.roles import Role |
|
22 |
from wcs.wf.attachment import AddAttachmentWorkflowStatusItem |
|
23 |
from wcs.wf.export_to_model import ExportToModel, transform_to_pdf |
|
24 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef |
|
25 |
from wcs.wf.jump import JumpWorkflowStatusItem |
|
26 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
|
27 |
from wcs.workflows import (ChoiceWorkflowStatusItem, |
|
28 |
DisplayMessageWorkflowStatusItem, Workflow, |
|
29 |
WorkflowBackofficeFieldsFormDef) |
|
30 |
from wcs.wscalls import NamedWsCall |
|
31 | ||
32 |
from .test_all import create_user, create_user_and_admin |
|
33 |
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login |
|
34 | ||
35 | ||
36 |
def pytest_generate_tests(metafunc): |
|
37 |
if 'pub' in metafunc.fixturenames: |
|
38 |
metafunc.parametrize('pub', ['pickle', 'sql', 'pickle-templates', 'pickle-lazy'], indirect=True) |
|
39 | ||
40 | ||
41 |
@pytest.fixture |
|
42 |
def pub(request, emails): |
|
43 |
pub = create_temporary_pub( |
|
44 |
sql_mode=bool('sql' in request.param), |
|
45 |
templates_mode=bool('templates' in request.param), |
|
46 |
lazy_mode=bool('lazy' in request.param), |
|
47 |
) |
|
48 |
pub.cfg['identification'] = {'methods': ['password']} |
|
49 |
pub.cfg['language'] = {'language': 'en'} |
|
50 |
pub.write_cfg() |
|
51 | ||
52 |
return pub |
|
53 | ||
54 | ||
55 |
def teardown_module(module): |
|
56 |
clean_temporary_pub() |
|
57 | ||
58 | ||
59 |
def assert_equal_zip(stream1, stream2): |
|
60 |
z1 = zipfile.ZipFile(stream1) |
|
61 |
z2 = zipfile.ZipFile(stream2) |
|
62 |
assert set(z1.namelist()) == set(z2.namelist()) |
|
63 |
for name in z1.namelist(): |
|
64 |
if name == 'styles.xml': |
|
65 |
continue |
|
66 |
if name in ['content.xml', 'meta.xml']: |
|
67 |
t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name))) |
|
68 |
try: |
|
69 |
# >= python 3.8: tostring preserves attribute order; use canonicalize to sort them |
|
70 |
t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2) |
|
71 |
except AttributeError: |
|
72 |
pass |
|
73 |
else: |
|
74 |
t1, t2 = z1.read(name), z2.read(name) |
|
75 |
assert t1 == t2, 'file "%s" differs' % name |
|
76 | ||
77 | ||
78 |
def test_formdata_attachment_download(pub): |
|
79 |
create_user(pub) |
|
80 |
wf = Workflow(name='status') |
|
81 |
st1 = wf.add_status('Status1', 'st1') |
|
82 |
attach = AddAttachmentWorkflowStatusItem() |
|
83 |
attach.id = '_attach' |
|
84 |
attach.by = ['_submitter'] |
|
85 |
st1.items.append(attach) |
|
86 |
attach.parent = st1 |
|
87 |
wf.store() |
|
88 | ||
89 |
FormDef.wipe() |
|
90 |
formdef = FormDef() |
|
91 |
formdef.name = 'test' |
|
92 |
formdef.workflow_id = wf.id |
|
93 |
formdef.fields = [] |
|
94 |
formdef.store() |
|
95 |
formdef.data_class().wipe() |
|
96 | ||
97 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
98 |
resp = resp.forms[0].submit('submit') |
|
99 |
assert 'Check values then click submit.' in resp.text |
|
100 |
resp = resp.forms[0].submit('submit') |
|
101 |
assert resp.status_int == 302 |
|
102 |
resp = resp.follow() |
|
103 |
assert 'The form has been recorded' in resp.text |
|
104 | ||
105 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
106 |
resp = resp.forms[0].submit('button_attach') |
|
107 | ||
108 |
assert formdef.data_class().count() == 1 |
|
109 |
formdata = formdef.data_class().select()[0] |
|
110 |
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart' |
|
111 |
attachment = formdata.evolution[-1].parts[0] |
|
112 |
assert attachment.content_type == 'text/plain' |
|
113 |
assert attachment.orig_filename == 'test.txt' |
|
114 | ||
115 |
resp = resp.follow() # back to form page |
|
116 |
resp = resp.click('test.txt') |
|
117 |
assert resp.location.endswith('/test.txt') |
|
118 |
resp = resp.follow() |
|
119 |
assert resp.content_type == 'text/plain' |
|
120 |
assert resp.text == 'foobar' |
|
121 | ||
122 | ||
123 |
def test_formdata_attachment_download_with_substitution_variable(pub): |
|
124 |
create_user_and_admin(pub) |
|
125 |
wf = Workflow(name='status') |
|
126 |
st1 = wf.add_status('Status1', 'st1') |
|
127 |
attach = AddAttachmentWorkflowStatusItem() |
|
128 |
attach.varname = 'attached_doc' |
|
129 |
attach.id = '_attach' |
|
130 |
attach.by = ['_submitter'] |
|
131 |
st1.items.append(attach) |
|
132 |
attach.parent = st1 |
|
133 |
wf.store() |
|
134 | ||
135 |
FormDef.wipe() |
|
136 |
formdef = FormDef() |
|
137 |
formdef.name = 'test' |
|
138 |
formdef.workflow_id = wf.id |
|
139 |
formdef.fields = [] |
|
140 |
formdef.store() |
|
141 |
formdef.data_class().wipe() |
|
142 | ||
143 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
144 |
resp = resp.forms[0].submit('submit') |
|
145 |
assert 'Check values then click submit.' in resp.text |
|
146 |
resp = resp.forms[0].submit('submit') |
|
147 |
assert resp.status_int == 302 |
|
148 |
resp = resp.follow() |
|
149 |
assert 'The form has been recorded' in resp.text |
|
150 | ||
151 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
152 |
resp = resp.forms[0].submit('button_attach') |
|
153 | ||
154 |
assert formdef.data_class().count() == 1 |
|
155 |
formdata = formdef.data_class().select()[0] |
|
156 |
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart' |
|
157 |
attachment = formdata.evolution[-1].parts[0] |
|
158 |
assert attachment.content_type == 'text/plain' |
|
159 |
assert attachment.orig_filename == 'test.txt' |
|
160 | ||
161 |
resp = resp.follow() # back to form page |
|
162 |
resp = resp.click('test.txt') |
|
163 |
assert resp.location.endswith('/test.txt') |
|
164 |
resp = resp.follow() |
|
165 |
assert resp.content_type == 'text/plain' |
|
166 |
assert resp.text == 'foobar' |
|
167 | ||
168 |
variables = formdef.data_class().select()[0].get_substitution_variables() |
|
169 |
assert 'attachments' in variables |
|
170 |
attachments = variables['attachments'] |
|
171 |
assert attachments is not None |
|
172 |
attachment_variable = attachments.attached_doc |
|
173 | ||
174 |
resp = login(get_app(pub), username='admin', password='admin').get( |
|
175 |
attachment_variable.url).follow() |
|
176 |
assert attachment_variable.content == resp.body |
|
177 |
assert attachment_variable.b64_content == base64.b64encode(resp.body) |
|
178 |
assert attachment_variable.content_type == resp._headers['content-type'].split(';')[0] |
|
179 |
content_disposition = resp._headers['content-disposition'] |
|
180 |
assert len(content_disposition.split(';')) == 2 |
|
181 |
assert content_disposition.split(';')[0] == 'attachment' |
|
182 |
assert resp.request.environ['PATH_INFO'].endswith(attachment_variable.filename) |
|
183 | ||
184 | ||
185 |
def test_formdata_attachment_download_to_backoffice_file_field(pub): |
|
186 |
create_user(pub) |
|
187 |
wf = Workflow(name='status') |
|
188 |
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) |
|
189 |
wf.backoffice_fields_formdef.fields = [ |
|
190 |
fields.FileField(id='bo1', label='bo field 1', type='file'), |
|
191 |
] |
|
192 |
st1 = wf.add_status('Status1', 'st1') |
|
193 |
attach = AddAttachmentWorkflowStatusItem() |
|
194 |
attach.id = '_attach' |
|
195 |
attach.by = ['_submitter'] |
|
196 |
attach.backoffice_filefield_id = 'bo1' |
|
197 |
st1.items.append(attach) |
|
198 |
attach.parent = st1 |
|
199 |
wf.store() |
|
200 | ||
201 |
assert attach.get_backoffice_filefield_options() == [('bo1', 'bo field 1', 'bo1')] |
|
202 | ||
203 |
FormDef.wipe() |
|
204 |
formdef = FormDef() |
|
205 |
formdef.name = 'test' |
|
206 |
formdef.workflow_id = wf.id |
|
207 |
formdef.fields = [] |
|
208 |
formdef.store() |
|
209 |
formdef.data_class().wipe() |
|
210 | ||
211 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
212 |
resp = resp.forms[0].submit('submit') |
|
213 |
assert 'Check values then click submit.' in resp.text |
|
214 |
resp = resp.forms[0].submit('submit') |
|
215 |
assert resp.status_int == 302 |
|
216 |
resp = resp.follow() |
|
217 |
assert 'The form has been recorded' in resp.text |
|
218 | ||
219 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
220 |
resp = resp.forms[0].submit('button_attach') |
|
221 | ||
222 |
# backoffice file field is set |
|
223 |
assert formdef.data_class().count() == 1 |
|
224 |
formdata = formdef.data_class().select()[0] |
|
225 |
assert 'bo1' in formdata.data |
|
226 |
bo1 = formdata.data['bo1'] |
|
227 |
assert bo1.base_filename == 'test.txt' |
|
228 |
assert bo1.content_type == 'text/plain' |
|
229 |
assert bo1.get_content() == b'foobar' |
|
230 | ||
231 |
# and file is in history, too |
|
232 |
assert formdata.evolution[-1].parts[0].__class__.__name__ == 'AttachmentEvolutionPart' |
|
233 |
attachment = formdata.evolution[-1].parts[0] |
|
234 |
assert attachment.content_type == 'text/plain' |
|
235 |
assert attachment.orig_filename == 'test.txt' |
|
236 | ||
237 | ||
238 |
def test_formdata_attachment_download_to_backoffice_file_field_only(pub): |
|
239 |
create_user(pub) |
|
240 |
wf = Workflow(name='status') |
|
241 |
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) |
|
242 |
wf.backoffice_fields_formdef.fields = [ |
|
243 |
fields.FileField(id='bo1', label='bo field 1', type='file'), |
|
244 |
] |
|
245 |
st1 = wf.add_status('Status1', 'st1') |
|
246 |
attach = AddAttachmentWorkflowStatusItem() |
|
247 |
attach.id = '_attach' |
|
248 |
attach.by = ['_submitter'] |
|
249 |
attach.backoffice_filefield_id = 'bo1' |
|
250 |
attach.attach_to_history = False # store only in backoffice field |
|
251 |
st1.items.append(attach) |
|
252 |
attach.parent = st1 |
|
253 |
wf.store() |
|
254 | ||
255 |
assert attach.get_backoffice_filefield_options() == [('bo1', 'bo field 1', 'bo1')] |
|
256 | ||
257 |
FormDef.wipe() |
|
258 |
formdef = FormDef() |
|
259 |
formdef.name = 'test' |
|
260 |
formdef.workflow_id = wf.id |
|
261 |
formdef.fields = [] |
|
262 |
formdef.store() |
|
263 |
formdef.data_class().wipe() |
|
264 | ||
265 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
266 |
resp = resp.forms[0].submit('submit') |
|
267 |
assert 'Check values then click submit.' in resp.text |
|
268 |
resp = resp.forms[0].submit('submit') |
|
269 |
assert resp.status_int == 302 |
|
270 |
resp = resp.follow() |
|
271 |
assert 'The form has been recorded' in resp.text |
|
272 | ||
273 |
resp.forms[0]['attachment_attach$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
274 |
resp = resp.forms[0].submit('button_attach') |
|
275 | ||
276 |
# backoffice file field is set |
|
277 |
assert formdef.data_class().count() == 1 |
|
278 |
formdata = formdef.data_class().select()[0] |
|
279 |
assert 'bo1' in formdata.data |
|
280 |
bo1 = formdata.data['bo1'] |
|
281 |
assert bo1.base_filename == 'test.txt' |
|
282 |
assert bo1.content_type == 'text/plain' |
|
283 |
assert bo1.get_content() == b'foobar' |
|
284 | ||
285 |
# but nothing in history |
|
286 |
for evo in formdata.evolution: |
|
287 |
assert not evo.parts |
|
288 | ||
289 | ||
290 |
def test_formdata_attachment_file_options(pub): |
|
291 |
create_user(pub) |
|
292 |
wf = Workflow(name='status') |
|
293 |
st1 = wf.add_status('Status1', 'st1') |
|
294 |
attach = AddAttachmentWorkflowStatusItem() |
|
295 |
attach.id = '_attach' |
|
296 |
attach.by = ['_submitter'] |
|
297 |
attach.document_type = {'label': 'Fichiers vidéo', 'mimetypes': ['video/*'], 'id': '_video'} |
|
298 |
attach.max_file_size = '3Mo' |
|
299 | ||
300 |
st1.items.append(attach) |
|
301 |
attach.parent = st1 |
|
302 |
wf.store() |
|
303 | ||
304 |
FormDef.wipe() |
|
305 |
formdef = FormDef() |
|
306 |
formdef.name = 'test' |
|
307 |
formdef.workflow_id = wf.id |
|
308 |
formdef.fields = [] |
|
309 |
formdef.store() |
|
310 |
formdef.data_class().wipe() |
|
311 | ||
312 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
313 |
resp = resp.forms[0].submit('submit') |
|
314 |
assert 'Check values then click submit.' in resp.text |
|
315 |
resp = resp.forms[0].submit('submit') |
|
316 |
assert resp.status_int == 302 |
|
317 |
resp = resp.follow() |
|
318 |
assert 'The form has been recorded' in resp.text |
|
319 | ||
320 |
file_input = resp.forms[0]['attachment_attach$file'] |
|
321 |
assert file_input.attrs['accept'] == 'video/*' |
|
322 |
assert file_input.attrs['data-max-file-size'] == '3000000' |
|
323 | ||
324 | ||
325 |
def test_formdata_generated_document_download(pub): |
|
326 |
create_user(pub) |
|
327 |
wf = Workflow(name='status') |
|
328 |
st1 = wf.add_status('Status1', 'st1') |
|
329 |
export_to = ExportToModel() |
|
330 |
export_to.convert_to_pdf = False |
|
331 |
export_to.label = 'create doc' |
|
332 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
333 |
upload.fp = BytesIO() |
|
334 |
upload.fp.write(b'HELLO WORLD') |
|
335 |
upload.fp.seek(0) |
|
336 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
337 |
export_to.id = '_export_to' |
|
338 |
export_to.by = ['_submitter'] |
|
339 |
st1.items.append(export_to) |
|
340 |
export_to.parent = st1 |
|
341 |
wf.store() |
|
342 | ||
343 |
FormDef.wipe() |
|
344 |
formdef = FormDef() |
|
345 |
formdef.name = 'test' |
|
346 |
formdef.workflow_id = wf.id |
|
347 |
formdef.fields = [] |
|
348 |
formdef.store() |
|
349 |
formdef.data_class().wipe() |
|
350 | ||
351 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
352 |
resp = resp.forms[0].submit('submit') |
|
353 |
assert 'Check values then click submit.' in resp.text |
|
354 |
resp = resp.forms[0].submit('submit') |
|
355 |
assert resp.status_int == 302 |
|
356 |
form_location = resp.location |
|
357 |
resp = resp.follow() |
|
358 |
assert 'The form has been recorded' in resp.text |
|
359 | ||
360 |
resp = resp.form.submit('button_export_to') |
|
361 | ||
362 |
resp = resp.follow() # $form/$id/create_doc |
|
363 |
resp = resp.follow() # $form/$id/create_doc/ |
|
364 |
assert resp.text == 'HELLO WORLD' |
|
365 | ||
366 |
export_to.attach_to_history = True |
|
367 |
wf.store() |
|
368 | ||
369 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
370 |
resp = resp.form.submit('button_export_to') |
|
371 |
assert resp.location == form_location + '#action-zone' |
|
372 |
resp = resp.follow() # back to form page |
|
373 | ||
374 |
resp = resp.click('test.rtf') |
|
375 |
assert resp.location.endswith('/test.rtf') |
|
376 |
resp = resp.follow() |
|
377 |
assert resp.content_type == 'application/rtf' |
|
378 |
assert resp.text == 'HELLO WORLD' |
|
379 | ||
380 |
# change export model to now be a RTF file, do the action again on the same form and |
|
381 |
# check that both the old .odt file and the new .rtf file are there and valid. |
|
382 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
383 |
upload.fp = BytesIO() |
|
384 |
upload.fp.write(b'HELLO NEW WORLD') |
|
385 |
upload.fp.seek(0) |
|
386 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
387 |
wf.store() |
|
388 | ||
389 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
390 |
resp = resp.form.submit('button_export_to') |
|
391 |
assert resp.location == form_location + '#action-zone' |
|
392 |
resp = resp.follow() # back to form page |
|
393 | ||
394 |
assert resp.click('test.rtf', index=0).follow().text == 'HELLO WORLD' |
|
395 |
assert resp.click('test.rtf', index=1).follow().text == 'HELLO NEW WORLD' |
|
396 | ||
397 |
# use substitution variables on rtf: only ezt format is accepted |
|
398 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
399 |
upload.fp = BytesIO() |
|
400 |
upload.fp.write(b'HELLO {{DJANGO}} WORLD [form_name]') |
|
401 |
upload.fp.seek(0) |
|
402 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
403 |
wf.store() |
|
404 | ||
405 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
406 |
resp = resp.form.submit('button_export_to') |
|
407 |
assert resp.location == form_location + '#action-zone' |
|
408 |
resp = resp.follow() |
|
409 | ||
410 |
assert resp.click('test.rtf', index=2).follow().text == 'HELLO {{DJANGO}} WORLD {\\uc1{test}}' |
|
411 | ||
412 | ||
413 |
@pytest.fixture(params=['template.odt', 'template-django.odt']) |
|
414 |
def odt_template(request): |
|
415 |
return request.param |
|
416 | ||
417 | ||
418 |
def test_formdata_generated_document_odt_download(pub, odt_template): |
|
419 |
create_user(pub) |
|
420 |
wf = Workflow(name='status') |
|
421 |
st1 = wf.add_status('Status1', 'st1') |
|
422 |
export_to = ExportToModel() |
|
423 |
export_to.convert_to_pdf = False |
|
424 |
export_to.label = 'create doc' |
|
425 |
template_filename = os.path.join(os.path.dirname(__file__), '..', odt_template) |
|
426 |
template = open(template_filename, 'rb').read() |
|
427 |
upload = QuixoteUpload('/foo/' + odt_template, content_type='application/octet-stream') |
|
428 |
upload.fp = BytesIO() |
|
429 |
upload.fp.write(template) |
|
430 |
upload.fp.seek(0) |
|
431 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
432 |
export_to.id = '_export_to' |
|
433 |
export_to.by = ['_submitter'] |
|
434 |
st1.items.append(export_to) |
|
435 |
export_to.parent = st1 |
|
436 |
wf.store() |
|
437 | ||
438 |
FormDef.wipe() |
|
439 |
formdef = FormDef() |
|
440 |
formdef.name = 'test' |
|
441 |
formdef.workflow_id = wf.id |
|
442 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
443 |
formdef.store() |
|
444 |
formdef.data_class().wipe() |
|
445 | ||
446 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
447 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
448 |
resp = resp.forms[0].submit('submit') |
|
449 |
assert 'Check values then click submit.' in resp.text |
|
450 |
resp = resp.forms[0].submit('submit') |
|
451 |
assert resp.status_int == 302 |
|
452 |
form_location = resp.location |
|
453 |
resp = resp.follow() |
|
454 |
assert 'The form has been recorded' in resp.text |
|
455 | ||
456 |
resp = resp.form.submit('button_export_to') |
|
457 | ||
458 |
resp = resp.follow() # $form/$id/create_doc |
|
459 |
resp = resp.follow() # $form/$id/create_doc/ |
|
460 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
461 |
assert_equal_zip(BytesIO(resp.body), f) |
|
462 | ||
463 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
464 |
resp = resp.form.submit('button_export_to') |
|
465 |
resp = resp.follow() # $form/$id/create_doc |
|
466 |
with mock.patch('wcs.wf.export_to_model.get_formdata_template_context') as get_context_1: |
|
467 |
with mock.patch('wcs.workflows.get_formdata_template_context') as get_context_never: |
|
468 |
get_context_1.return_value = {} |
|
469 |
get_context_never.return_value = {} |
|
470 |
resp = resp.follow() # $form/$id/create_doc/ |
|
471 |
# substitution variables are computed only one : |
|
472 |
assert get_context_1.call_count == 1 |
|
473 |
assert get_context_never.call_count == 0 |
|
474 | ||
475 |
export_to.attach_to_history = True |
|
476 |
wf.store() |
|
477 | ||
478 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
479 |
resp = resp.form.submit('button_export_to') |
|
480 |
assert resp.location == form_location + '#action-zone' |
|
481 |
resp = resp.follow() # back to form page |
|
482 | ||
483 |
resp = resp.click(odt_template) |
|
484 |
assert resp.location.endswith('/' + odt_template) |
|
485 |
resp = resp.follow() |
|
486 |
assert resp.content_type == 'application/octet-stream' |
|
487 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
488 |
assert_equal_zip(BytesIO(resp.body), f) |
|
489 | ||
490 |
# change file content, same name |
|
491 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
492 |
upload.fp = BytesIO() |
|
493 |
upload.fp.write(b'HELLO NEW WORLD') |
|
494 |
upload.fp.seek(0) |
|
495 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
496 |
wf.store() |
|
497 | ||
498 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
499 |
resp = resp.form.submit('button_export_to') |
|
500 |
assert resp.location == form_location + '#action-zone' |
|
501 |
resp = resp.follow() # back to form page |
|
502 | ||
503 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
504 |
body = resp.click(odt_template, index=0).follow().body |
|
505 |
assert_equal_zip(BytesIO(body), f) |
|
506 |
assert resp.click('test.rtf', index=0).follow().body == b'HELLO NEW WORLD' |
|
507 | ||
508 | ||
509 |
def test_formdata_generated_document_odt_download_with_substitution_variable(pub): |
|
510 |
create_user_and_admin(pub) |
|
511 |
wf = Workflow(name='status') |
|
512 |
st1 = wf.add_status('Status1', 'st1') |
|
513 |
export_to = ExportToModel() |
|
514 |
export_to.convert_to_pdf = False |
|
515 |
export_to.label = 'create doc' |
|
516 |
export_to.varname = 'created_doc' |
|
517 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
518 |
template = open(template_filename, 'rb').read() |
|
519 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
520 |
upload.fp = BytesIO() |
|
521 |
upload.fp.write(template) |
|
522 |
upload.fp.seek(0) |
|
523 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
524 |
export_to.id = '_export_to' |
|
525 |
export_to.by = ['_submitter'] |
|
526 |
st1.items.append(export_to) |
|
527 |
export_to.parent = st1 |
|
528 |
wf.store() |
|
529 | ||
530 |
FormDef.wipe() |
|
531 |
formdef = FormDef() |
|
532 |
formdef.name = 'test' |
|
533 |
formdef.workflow_id = wf.id |
|
534 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
535 |
formdef.store() |
|
536 |
formdef.data_class().wipe() |
|
537 | ||
538 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
539 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
540 |
resp = resp.forms[0].submit('submit') |
|
541 |
assert 'Check values then click submit.' in resp.text |
|
542 |
resp = resp.forms[0].submit('submit') |
|
543 |
assert resp.status_int == 302 |
|
544 |
form_location = resp.location |
|
545 |
resp = resp.follow() |
|
546 |
assert 'The form has been recorded' in resp.text |
|
547 | ||
548 |
resp = resp.form.submit('button_export_to') |
|
549 | ||
550 |
resp = resp.follow() # $form/$id/create_doc |
|
551 |
resp = resp.follow() # $form/$id/create_doc/ |
|
552 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
553 |
assert_equal_zip(BytesIO(resp.body), f) |
|
554 | ||
555 |
export_to.attach_to_history = True |
|
556 |
wf.store() |
|
557 | ||
558 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
559 |
resp = resp.form.submit('button_export_to') |
|
560 |
assert resp.location == form_location + '#action-zone' |
|
561 |
resp = resp.follow() # back to form page |
|
562 | ||
563 |
resp = resp.click('template.odt') |
|
564 |
assert resp.location.endswith('/template.odt') |
|
565 |
response1 = resp = resp.follow() |
|
566 |
assert resp.content_type == 'application/octet-stream' |
|
567 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
568 |
assert_equal_zip(BytesIO(resp.body), f) |
|
569 | ||
570 |
# change file content, same name |
|
571 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
572 |
upload.fp = BytesIO() |
|
573 |
upload.fp.write(b'HELLO NEW WORLD') |
|
574 |
upload.fp.seek(0) |
|
575 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
576 |
wf.store() |
|
577 | ||
578 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
579 |
resp = resp.form.submit('button_export_to') |
|
580 |
assert resp.location == form_location + '#action-zone' |
|
581 |
resp = resp.follow() # back to form page |
|
582 | ||
583 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
584 |
body = resp.click('template.odt', index=0).follow().body |
|
585 |
assert_equal_zip(BytesIO(body), f) |
|
586 |
response2 = resp.click('test.rtf', index=0).follow() |
|
587 |
assert response2.body == b'HELLO NEW WORLD' |
|
588 |
# Test attachment substitution variables |
|
589 |
variables = formdef.data_class().select()[0].get_substitution_variables() |
|
590 |
assert 'attachments' in variables |
|
591 |
attachments = variables['attachments'] |
|
592 |
assert attachments is not None |
|
593 |
file1 = attachments.created_doc |
|
594 |
assert file1.content == response2.body |
|
595 |
assert file1.b64_content == base64.b64encode(response2.body) |
|
596 |
assert file1.content_type == response2._headers['content-type'] |
|
597 |
content_disposition = response2._headers['content-disposition'] |
|
598 |
assert len(content_disposition.split(';')) == 2 |
|
599 |
assert content_disposition.split(';')[0] == 'attachment' |
|
600 |
assert response2.request.environ['PATH_INFO'].endswith(file1.filename) |
|
601 | ||
602 |
resp = login(get_app(pub), username='admin', password='admin').get(file1.url).follow() |
|
603 |
assert file1.content == resp.body |
|
604 |
assert file1.b64_content == base64.b64encode(resp.body) |
|
605 |
assert file1.content_type == resp._headers['content-type'] |
|
606 |
content_disposition = resp._headers['content-disposition'] |
|
607 |
assert len(content_disposition.split(';')) == 2 |
|
608 |
assert content_disposition.split(';')[0] == 'attachment' |
|
609 |
assert resp.request.environ['PATH_INFO'].endswith(file1.filename) |
|
610 | ||
611 |
file2 = attachments.created_doc[1] |
|
612 |
assert file2.content == response1.body |
|
613 |
assert file2.b64_content == base64.b64encode(response1.body) |
|
614 |
assert file2.content_type == response1._headers['content-type'] |
|
615 |
content_disposition = response1._headers['content-disposition'] |
|
616 |
assert len(content_disposition.split(';')) == 2 |
|
617 |
assert content_disposition.split(';')[0] == 'attachment' |
|
618 |
assert response1.request.environ['PATH_INFO'].endswith(file2.filename) |
|
619 | ||
620 |
resp = login(get_app(pub), username='admin', password='admin').get(file2.url).follow() |
|
621 |
assert file2.content == resp.body |
|
622 |
assert file2.b64_content == base64.b64encode(resp.body) |
|
623 |
assert file2.content_type == resp._headers['content-type'] |
|
624 |
content_disposition = resp._headers['content-disposition'] |
|
625 |
assert len(content_disposition.split(';')) == 2 |
|
626 |
assert content_disposition.split(';')[0] == 'attachment' |
|
627 |
assert resp.request.environ['PATH_INFO'].endswith(file2.filename) |
|
628 | ||
629 | ||
630 |
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found') |
|
631 |
def test_formdata_generated_document_odt_to_pdf_download(pub): |
|
632 |
create_user(pub) |
|
633 |
wf = Workflow(name='status') |
|
634 |
st1 = wf.add_status('Status1', 'st1') |
|
635 |
export_to = ExportToModel() |
|
636 |
export_to.label = 'create doc' |
|
637 |
export_to.varname = 'created_doc' |
|
638 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
639 |
template = open(template_filename, 'rb').read() |
|
640 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
641 |
upload.fp = BytesIO() |
|
642 |
upload.fp.write(template) |
|
643 |
upload.fp.seek(0) |
|
644 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
645 |
export_to.id = '_export_to' |
|
646 |
export_to.by = ['_submitter'] |
|
647 |
export_to.convert_to_pdf = True |
|
648 |
st1.items.append(export_to) |
|
649 |
export_to.parent = st1 |
|
650 |
wf.store() |
|
651 | ||
652 |
FormDef.wipe() |
|
653 |
formdef = FormDef() |
|
654 |
formdef.name = 'test' |
|
655 |
formdef.workflow_id = wf.id |
|
656 |
formdef.fields = [] |
|
657 |
formdef.store() |
|
658 |
formdef.data_class().wipe() |
|
659 | ||
660 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
661 |
resp = resp.forms[0].submit('submit') |
|
662 |
assert 'Check values then click submit.' in resp.text |
|
663 |
resp = resp.forms[0].submit('submit') |
|
664 |
assert resp.status_int == 302 |
|
665 |
form_location = resp.location |
|
666 |
resp = resp.follow() |
|
667 |
assert 'The form has been recorded' in resp.text |
|
668 | ||
669 |
resp = resp.form.submit('button_export_to') |
|
670 | ||
671 |
resp = resp.follow() # $form/$id/create_doc |
|
672 |
resp = resp.follow() # $form/$id/create_doc/ |
|
673 |
assert resp.content_type == 'application/pdf' |
|
674 |
assert b'PDF' in resp.body |
|
675 | ||
676 |
export_to.attach_to_history = True |
|
677 |
wf.store() |
|
678 | ||
679 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
680 |
resp = resp.form.submit('button_export_to') |
|
681 |
assert resp.location == form_location + '#action-zone' |
|
682 |
resp = resp.follow() # back to form page |
|
683 | ||
684 |
resp = resp.click('template.pdf') |
|
685 |
assert resp.location.endswith('/template.pdf') |
|
686 |
resp = resp.follow() |
|
687 |
assert resp.content_type == 'application/pdf' |
|
688 |
content_disposition = resp._headers['content-disposition'] |
|
689 |
assert len(content_disposition.split(';')) == 2 |
|
690 |
assert content_disposition.split(';')[0] == 'inline' |
|
691 |
assert resp.body.startswith(b'%PDF-') |
|
692 | ||
693 | ||
694 |
@pytest.mark.skipif(transform_to_pdf is None, reason='libreoffice not found') |
|
695 |
def test_formdata_generated_document_odt_to_pdf_download_push_to_portfolio(pub, fargo_url, |
|
696 |
fargo_secret, caplog): |
|
697 |
user = create_user(pub) |
|
698 |
user.name = 'Foo Baré' |
|
699 |
user.store() |
|
700 | ||
701 |
pub.cfg['debug'] = {'logger': True} |
|
702 |
pub.write_cfg() |
|
703 |
wf = Workflow(name='status') |
|
704 |
st1 = wf.add_status('Status1', 'st1') |
|
705 |
export_to = ExportToModel() |
|
706 |
export_to.label = 'create doc' |
|
707 |
export_to.varname = 'created_doc' |
|
708 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
709 |
template = open(template_filename, 'rb').read() |
|
710 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
711 |
upload.fp = BytesIO() |
|
712 |
upload.fp.write(template) |
|
713 |
upload.fp.seek(0) |
|
714 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
715 |
export_to.id = '_export_to' |
|
716 |
export_to.by = ['_submitter'] |
|
717 |
export_to.convert_to_pdf = True |
|
718 |
export_to.push_to_portfolio = True |
|
719 |
st1.items.append(export_to) |
|
720 |
export_to.parent = st1 |
|
721 |
wf.store() |
|
722 | ||
723 |
FormDef.wipe() |
|
724 |
formdef = FormDef() |
|
725 |
formdef.name = 'test' |
|
726 |
formdef.workflow_id = wf.id |
|
727 |
formdef.fields = [] |
|
728 |
formdef.store() |
|
729 |
formdef.data_class().wipe() |
|
730 | ||
731 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
732 |
resp = resp.forms[0].submit('submit') |
|
733 |
assert 'Check values then click submit.' in resp.text |
|
734 |
resp = resp.forms[0].submit('submit') |
|
735 |
assert resp.status_int == 302 |
|
736 |
form_location = resp.location |
|
737 |
resp = resp.follow() |
|
738 |
assert 'The form has been recorded' in resp.text |
|
739 | ||
740 |
with mock.patch('wcs.portfolio.http_post_request') as http_post_request: |
|
741 |
http_post_request.return_value = None, 200, 'null', None |
|
742 |
resp = resp.form.submit('button_export_to') |
|
743 |
assert http_post_request.call_count == 1 |
|
744 |
if locale.getpreferredencoding() == 'UTF-8': |
|
745 |
assert ("file 'template.pdf' pushed to portfolio of 'Foo Baré'" |
|
746 |
== caplog.records[-1].message) |
|
747 |
else: # Python < 3.7 |
|
748 |
assert ("file 'template.pdf' pushed to portfolio of 'Foo Bar\xe9'" |
|
749 |
== caplog.records[-1].message) |
|
750 | ||
751 |
resp = resp.follow() # $form/$id/create_doc |
|
752 |
resp = resp.follow() # $form/$id/create_doc/ |
|
753 |
assert resp.content_type == 'application/pdf' |
|
754 |
assert b'PDF' in resp.body |
|
755 | ||
756 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
757 |
with mock.patch('wcs.portfolio.http_post_request') as http_post_request: |
|
758 |
http_post_request.return_value = None, 400, 'null', None # fail |
|
759 |
resp = resp.form.submit('button_export_to') |
|
760 |
assert http_post_request.call_count == 1 |
|
761 |
assert caplog.records[-1].message.startswith("file 'template.pdf' failed to be pushed to portfolio of 'Foo") |
|
762 | ||
763 |
# failed to push to portfolio, but document is here |
|
764 |
resp = resp.follow() # $form/$id/create_doc |
|
765 |
resp = resp.follow() # $form/$id/create_doc/ |
|
766 |
assert resp.content_type == 'application/pdf' |
|
767 |
assert b'PDF' in resp.body |
|
768 | ||
769 |
export_to.attach_to_history = True |
|
770 |
wf.store() |
|
771 | ||
772 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
773 |
with mock.patch('wcs.portfolio.http_post_request') as http_post_request: |
|
774 |
http_post_request.return_value = None, 200, 'null', None |
|
775 |
resp = resp.form.submit('button_export_to') |
|
776 |
assert http_post_request.call_count == 1 |
|
777 |
assert http_post_request.call_args[0][0].startswith('http://fargo.example.net/api/documents/push/') |
|
778 |
payload = json.loads(http_post_request.call_args[0][1]) |
|
779 |
assert payload['file_name'] == 'template.pdf' |
|
780 |
assert payload['user_email'] == 'foo@localhost' |
|
781 |
assert payload['origin'] == 'example.net' |
|
782 |
assert base64.decodebytes(force_bytes(payload['file_b64_content'])).startswith(b'%PDF') |
|
783 |
assert caplog.records[-1].message.startswith("file 'template.pdf' pushed to portfolio of 'Foo") |
|
784 |
assert resp.location == form_location + '#action-zone' |
|
785 |
resp = resp.follow() # back to form page |
|
786 | ||
787 |
resp = resp.click('template.pdf') |
|
788 |
assert resp.location.endswith('/template.pdf') |
|
789 |
resp = resp.follow() |
|
790 |
assert resp.content_type == 'application/pdf' |
|
791 |
assert resp.body.startswith(b'%PDF-') |
|
792 | ||
793 | ||
794 |
def test_formdata_generated_document_non_interactive(pub): |
|
795 |
create_user(pub) |
|
796 |
wf = Workflow(name='status') |
|
797 |
st1 = wf.add_status('Status1', 'st1') |
|
798 |
export_to = ExportToModel() |
|
799 |
export_to.convert_to_pdf = False |
|
800 |
export_to.method = 'non-interactive' |
|
801 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
802 |
template = open(template_filename, 'rb').read() |
|
803 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
804 |
upload.fp = BytesIO() |
|
805 |
upload.fp.write(template) |
|
806 |
upload.fp.seek(0) |
|
807 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
808 |
export_to.id = '_export_to' |
|
809 |
export_to.attach_to_history = True |
|
810 |
st1.items.append(export_to) |
|
811 |
export_to.parent = st1 |
|
812 | ||
813 |
jump = JumpWorkflowStatusItem() |
|
814 |
jump.status = 'st2' |
|
815 |
st1.items.append(jump) |
|
816 |
jump.parent = st1 |
|
817 | ||
818 |
wf.add_status('Status2', 'st2') |
|
819 | ||
820 |
wf.store() |
|
821 | ||
822 |
FormDef.wipe() |
|
823 |
formdef = FormDef() |
|
824 |
formdef.name = 'test' |
|
825 |
formdef.workflow_id = wf.id |
|
826 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
827 |
formdef.store() |
|
828 |
formdef.data_class().wipe() |
|
829 | ||
830 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
831 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
832 |
resp = resp.forms[0].submit('submit') |
|
833 |
assert 'Check values then click submit.' in resp.text |
|
834 |
resp = resp.forms[0].submit('submit') |
|
835 |
assert resp.status_int == 302 |
|
836 |
resp = resp.follow() |
|
837 |
assert 'The form has been recorded' in resp.text |
|
838 | ||
839 |
resp = resp.click('template.odt') |
|
840 |
assert resp.location.endswith('/template.odt') |
|
841 |
resp = resp.follow() |
|
842 |
assert resp.content_type == 'application/octet-stream' |
|
843 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
844 |
assert_equal_zip(BytesIO(resp.body), f) |
|
845 | ||
846 |
assert formdef.data_class().count() == 1 |
|
847 |
assert formdef.data_class().select()[0].status == 'wf-st2' |
|
848 | ||
849 | ||
850 |
def test_formdata_generated_document_to_backoffice_field(pub): |
|
851 |
create_user_and_admin(pub) |
|
852 |
wf = Workflow(name='status') |
|
853 |
wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) |
|
854 |
wf.backoffice_fields_formdef.fields = [ |
|
855 |
fields.FileField(id='bo1', label='bo field 1', type='file'), |
|
856 |
fields.StringField(id='bo2', label='bo field 2', type='string'), |
|
857 |
] |
|
858 | ||
859 |
st1 = wf.add_status('Status1', 'st1') |
|
860 |
export_to = ExportToModel() |
|
861 |
export_to.convert_to_pdf = False |
|
862 |
export_to.method = 'non-interactive' |
|
863 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
|
864 |
template = open(template_filename, 'rb').read() |
|
865 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
866 |
upload.fp = BytesIO() |
|
867 |
upload.fp.write(template) |
|
868 |
upload.fp.seek(0) |
|
869 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
870 |
export_to.id = '_export_to' |
|
871 |
export_to.attach_to_history = True |
|
872 |
export_to.backoffice_filefield_id = 'bo1' |
|
873 |
st1.items.append(export_to) |
|
874 |
export_to.parent = st1 |
|
875 | ||
876 |
assert export_to.get_backoffice_filefield_options() == [('bo1', 'bo field 1', 'bo1')] |
|
877 | ||
878 |
jump = JumpWorkflowStatusItem() |
|
879 |
jump.status = 'st2' |
|
880 |
st1.items.append(jump) |
|
881 |
jump.parent = st1 |
|
882 |
wf.add_status('Status2', 'st2') |
|
883 |
wf.store() |
|
884 | ||
885 |
FormDef.wipe() |
|
886 |
formdef = FormDef() |
|
887 |
formdef.name = 'test' |
|
888 |
formdef.workflow_id = wf.id |
|
889 |
formdef.fields = [fields.TextField(id='0', label='comment', type='text', varname='comment')] |
|
890 |
formdef.store() |
|
891 |
formdef.data_class().wipe() |
|
892 | ||
893 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
894 |
resp.form['f0'] = 'Hello\n\nWorld.' |
|
895 |
resp = resp.forms[0].submit('submit') |
|
896 |
assert 'Check values then click submit.' in resp.text |
|
897 |
resp = resp.forms[0].submit('submit') |
|
898 |
assert resp.status_int == 302 |
|
899 |
resp = resp.follow() |
|
900 |
assert 'The form has been recorded' in resp.text |
|
901 | ||
902 |
# get the two generated files from backoffice: in backoffice fields |
|
903 |
# (export_to.backoffice_filefield_id), and in history (export_to.attach_to_history) |
|
904 |
for index in (0, 1): |
|
905 |
resp = login(get_app(pub), username='admin', password='admin').get( |
|
906 |
'/backoffice/management/test/1/') |
|
907 |
resp = resp.click('template.odt', index=index) |
|
908 |
assert resp.location.endswith('/template.odt') |
|
909 |
resp = resp.follow() |
|
910 |
assert resp.content_type == 'application/octet-stream' |
|
911 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
|
912 |
assert_equal_zip(BytesIO(resp.body), f) |
|
913 | ||
914 |
assert formdef.data_class().count() == 1 |
|
915 |
assert formdef.data_class().select()[0].status == 'wf-st2' |
|
916 | ||
917 | ||
918 |
def test_formdata_generated_document_in_private_history(pub): |
|
919 |
user = create_user(pub) |
|
920 | ||
921 |
Role.wipe() |
|
922 |
role = Role(name='xxx') |
|
923 |
role.store() |
|
924 | ||
925 |
user.roles = [role.id] |
|
926 |
user.store() |
|
927 | ||
928 |
wf = Workflow(name='status') |
|
929 |
st0 = wf.add_status('Status0', 'st0') |
|
930 |
st1 = wf.add_status('Status1', 'st1') |
|
931 |
export_to = ExportToModel() |
|
932 |
export_to.label = 'create doc' |
|
933 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
934 |
upload.fp = BytesIO() |
|
935 |
upload.fp.write(b'HELLO WORLD') |
|
936 |
upload.fp.seek(0) |
|
937 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
938 |
export_to.attach_to_history = True |
|
939 |
export_to.id = '_export_to' |
|
940 |
export_to.by = ['_submitter'] |
|
941 |
st1.items.append(export_to) |
|
942 |
export_to.parent = st1 |
|
943 | ||
944 |
st2 = wf.add_status('Status2', 'st2') |
|
945 | ||
946 |
jump1 = ChoiceWorkflowStatusItem() |
|
947 |
jump1.id = '_jump1' |
|
948 |
jump1.label = 'Jump 1' |
|
949 |
jump1.by = ['_receiver'] |
|
950 |
jump1.status = st1.id |
|
951 |
jump1.parent = st0 |
|
952 |
st0.items.append(jump1) |
|
953 | ||
954 |
jump2 = ChoiceWorkflowStatusItem() |
|
955 |
jump2.id = '_jump2' |
|
956 |
jump2.label = 'Jump 2' |
|
957 |
jump2.by = ['_receiver'] |
|
958 |
jump2.status = st2.id |
|
959 |
jump2.parent = st1 |
|
960 |
st1.items.append(jump2) |
|
961 | ||
962 |
wf.store() |
|
963 | ||
964 |
FormDef.wipe() |
|
965 |
formdef = FormDef() |
|
966 |
formdef.name = 'test' |
|
967 |
formdef.workflow_id = wf.id |
|
968 |
formdef.workflow_roles = {'_receiver': role.id} |
|
969 |
formdef.fields = [] |
|
970 |
formdef.store() |
|
971 |
formdef.data_class().wipe() |
|
972 | ||
973 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
974 |
resp = resp.forms[0].submit('submit') |
|
975 |
assert 'Check values then click submit.' in resp.text |
|
976 |
resp = resp.forms[0].submit('submit') |
|
977 |
assert resp.status_int == 302 |
|
978 |
resp = resp.follow() |
|
979 |
assert 'The form has been recorded' in resp.text |
|
980 | ||
981 |
resp = resp.form.submit('button_jump1') |
|
982 |
resp = resp.follow() |
|
983 | ||
984 |
resp = resp.form.submit('button_export_to') |
|
985 |
resp = resp.follow() |
|
986 |
assert 'Form exported in a model' in resp.text |
|
987 | ||
988 |
resp = resp.form.submit('button_jump2') |
|
989 |
resp = resp.follow() |
|
990 | ||
991 |
# limit visibility of status with document |
|
992 |
st1.visibility = ['_receiver'] |
|
993 |
wf.store() |
|
994 | ||
995 |
formdata = formdef.data_class().select()[0] |
|
996 |
resp = login(get_app(pub), username='foo', password='foo').get(formdata.get_url()) |
|
997 |
assert 'Form exported in a model' not in resp.text |
|
998 | ||
999 |
# check status is visible in backoffice |
|
1000 |
resp = login(get_app(pub), username='foo', password='foo').get(formdata.get_url(backoffice=True)) |
|
1001 |
assert 'visibility-off' in resp.text |
|
1002 |
assert 'Form exported in a model' in resp.text |
|
1003 | ||
1004 | ||
1005 |
def test_formdata_form_file_download(pub): |
|
1006 |
create_user(pub) |
|
1007 |
wf = Workflow(name='status') |
|
1008 |
st1 = wf.add_status('Status1', 'st1') |
|
1009 | ||
1010 |
display_form = FormWorkflowStatusItem() |
|
1011 |
display_form.id = '_x' |
|
1012 |
display_form.by = ['_submitter'] |
|
1013 |
display_form.varname = 'xxx' |
|
1014 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
1015 |
display_form.formdef.fields.append(fields.FileField( |
|
1016 |
id='1', label='File', |
|
1017 |
type='file', varname='yyy')) |
|
1018 |
st1.items.append(display_form) |
|
1019 |
display_form.parent = st1 |
|
1020 | ||
1021 |
wf.store() |
|
1022 | ||
1023 |
FormDef.wipe() |
|
1024 |
formdef = FormDef() |
|
1025 |
formdef.name = 'test' |
|
1026 |
formdef.workflow_id = wf.id |
|
1027 |
formdef.fields = [] |
|
1028 |
formdef.store() |
|
1029 |
formdef.data_class().wipe() |
|
1030 | ||
1031 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1032 |
resp = resp.forms[0].submit('submit') |
|
1033 |
assert 'Check values then click submit.' in resp.text |
|
1034 |
resp = resp.forms[0].submit('submit') |
|
1035 |
assert resp.status_int == 302 |
|
1036 |
resp = resp.follow() |
|
1037 |
assert 'The form has been recorded' in resp.text |
|
1038 | ||
1039 |
assert 'qommon.fileupload.js' in resp.text |
|
1040 |
resp.forms[0]['f1$file'] = Upload('test.txt', b'foobar', 'text/plain') |
|
1041 |
resp = resp.forms[0].submit('submit') |
|
1042 | ||
1043 |
assert formdef.data_class().count() == 1 |
|
1044 |
formdata = formdef.data_class().select()[0] |
|
1045 |
assert 'xxx_var_yyy_raw' in formdata.workflow_data |
|
1046 | ||
1047 |
download = resp.test_app.get(urlparse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt')) |
|
1048 |
assert download.content_type == 'text/plain' |
|
1049 |
assert download.body == b'foobar' |
|
1050 | ||
1051 |
# go back to the status page, this will exercise the substitution variables |
|
1052 |
# codepath. |
|
1053 |
resp = resp.follow() |
|
1054 | ||
1055 | ||
1056 |
def test_formdata_workflow_form_prefill(pub): |
|
1057 |
create_user(pub) |
|
1058 |
wf = Workflow(name='status') |
|
1059 |
st1 = wf.add_status('Status1', 'st1') |
|
1060 | ||
1061 |
display_form = FormWorkflowStatusItem() |
|
1062 |
display_form.id = '_x' |
|
1063 |
display_form.by = ['_submitter'] |
|
1064 |
display_form.varname = 'xxx' |
|
1065 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
1066 |
display_form.formdef.fields.append(fields.StringField( |
|
1067 |
id='1', label='blah', |
|
1068 |
type='string', varname='yyy', prefill={'type': 'user', 'value': 'email'})) |
|
1069 |
st1.items.append(display_form) |
|
1070 |
display_form.parent = st1 |
|
1071 | ||
1072 |
wf.store() |
|
1073 | ||
1074 |
FormDef.wipe() |
|
1075 |
formdef = FormDef() |
|
1076 |
formdef.name = 'test' |
|
1077 |
formdef.workflow_id = wf.id |
|
1078 |
formdef.fields = [] |
|
1079 |
formdef.store() |
|
1080 |
formdef.data_class().wipe() |
|
1081 | ||
1082 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1083 |
resp = resp.forms[0].submit('submit') |
|
1084 |
assert 'Check values then click submit.' in resp.text |
|
1085 |
resp = resp.forms[0].submit('submit') |
|
1086 |
assert resp.status_int == 302 |
|
1087 |
resp = resp.follow() |
|
1088 |
assert 'The form has been recorded' in resp.text |
|
1089 |
assert resp.forms[0]['f1'].value == 'foo@localhost' |
|
1090 | ||
1091 | ||
1092 |
def test_formdata_workflow_form_prefill_conditional_field(pub): |
|
1093 |
create_user(pub) |
|
1094 |
wf = Workflow(name='status') |
|
1095 |
st1 = wf.add_status('Status1', 'st1') |
|
1096 | ||
1097 |
display_form = FormWorkflowStatusItem() |
|
1098 |
display_form.id = '_x' |
|
1099 |
display_form.by = ['_submitter'] |
|
1100 |
display_form.varname = 'xxx' |
|
1101 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
1102 |
display_form.formdef.fields.append(fields.StringField( |
|
1103 |
id='1', label='blah1', |
|
1104 |
type='string', varname='yyy1', prefill={'type': 'user', 'value': 'email'}, |
|
1105 |
condition={'type': 'django', 'value': '0'})) |
|
1106 |
display_form.formdef.fields.append(fields.StringField( |
|
1107 |
id='2', label='blah2', |
|
1108 |
type='string', varname='yyy2', prefill={'type': 'user', 'value': 'email'}, |
|
1109 |
condition={'type': 'django', 'value': '1'})) |
|
1110 |
st1.items.append(display_form) |
|
1111 |
display_form.parent = st1 |
|
1112 | ||
1113 |
wf.store() |
|
1114 | ||
1115 |
FormDef.wipe() |
|
1116 |
formdef = FormDef() |
|
1117 |
formdef.name = 'test' |
|
1118 |
formdef.workflow_id = wf.id |
|
1119 |
formdef.fields = [] |
|
1120 |
formdef.store() |
|
1121 |
formdef.data_class().wipe() |
|
1122 | ||
1123 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1124 |
resp = resp.forms[0].submit('submit') |
|
1125 |
assert 'Check values then click submit.' in resp.text |
|
1126 |
resp = resp.forms[0].submit('submit') |
|
1127 |
assert resp.status_int == 302 |
|
1128 |
resp = resp.follow() |
|
1129 |
assert 'The form has been recorded' in resp.text |
|
1130 |
assert resp.forms[0]['f2'].value == 'foo@localhost' |
|
1131 | ||
1132 | ||
1133 |
def test_formdata_workflow_form_prefill_checkbox(pub): |
|
1134 |
create_user(pub) |
|
1135 |
wf = Workflow(name='status') |
|
1136 |
st1 = wf.add_status('Status1', 'st1') |
|
1137 | ||
1138 |
display_form = FormWorkflowStatusItem() |
|
1139 |
display_form.id = '_x' |
|
1140 |
display_form.by = ['_submitter'] |
|
1141 |
display_form.varname = 'xxx' |
|
1142 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
1143 |
display_form.formdef.fields.append(fields.BoolField( |
|
1144 |
id='1', label='blah', |
|
1145 |
type='bool', varname='yyy', prefill={'type': 'formula', 'value': 'True'})) |
|
1146 |
display_form.formdef.fields.append(fields.BoolField( |
|
1147 |
id='2', label='blah2', |
|
1148 |
type='bool', varname='zzz', prefill={'type': 'formula', 'value': 'True'})) |
|
1149 |
st1.items.append(display_form) |
|
1150 |
display_form.parent = st1 |
|
1151 | ||
1152 |
wf.store() |
|
1153 | ||
1154 |
FormDef.wipe() |
|
1155 |
formdef = FormDef() |
|
1156 |
formdef.name = 'test' |
|
1157 |
formdef.workflow_id = wf.id |
|
1158 |
formdef.fields = [] |
|
1159 |
formdef.store() |
|
1160 |
formdef.data_class().wipe() |
|
1161 | ||
1162 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1163 |
resp = resp.form.submit('submit') |
|
1164 |
assert 'Check values then click submit.' in resp |
|
1165 |
resp = resp.form.submit('submit').follow() |
|
1166 |
assert 'The form has been recorded' in resp |
|
1167 |
assert resp.form['f1'].checked is True |
|
1168 |
assert resp.form['f2'].checked is True |
|
1169 |
resp.form['f1'].checked = False |
|
1170 |
resp = resp.form.submit('submit') |
|
1171 | ||
1172 |
formdata = formdef.data_class().select()[0] |
|
1173 |
assert formdata.workflow_data['xxx_var_yyy_raw'] is False |
|
1174 |
assert formdata.workflow_data['xxx_var_zzz_raw'] is True |
|
1175 | ||
1176 | ||
1177 |
def test_formdata_workflow_form_prefill_autocomplete(pub): |
|
1178 |
create_user(pub) |
|
1179 | ||
1180 |
NamedDataSource.wipe() |
|
1181 |
data_source = NamedDataSource(name='foobar') |
|
1182 |
data_source.data_source = {'type': 'json', 'value': 'http://local-mock/test'} |
|
1183 |
data_source.query_parameter = 'q' |
|
1184 |
data_source.id_parameter = 'id' |
|
1185 |
data_source.store() |
|
1186 | ||
1187 |
wf = Workflow(name='status') |
|
1188 |
st1 = wf.add_status('Status1', 'st1') |
|
1189 | ||
1190 |
display_form = FormWorkflowStatusItem() |
|
1191 |
display_form.id = '_x' |
|
1192 |
display_form.by = ['_submitter'] |
|
1193 |
display_form.varname = 'xxx' |
|
1194 |
display_form.formdef = WorkflowFormFieldsFormDef(item=display_form) |
|
1195 |
display_form.formdef.fields = [ |
|
1196 |
fields.ItemField( |
|
1197 |
id='4', label='string', type='item', |
|
1198 |
data_source={'type': 'foobar'}, |
|
1199 |
required=False, |
|
1200 |
display_mode='autocomplete', |
|
1201 |
prefill={'type': 'string', 'value': '{{ form_var_foo_raw }}'}, |
|
1202 |
), |
|
1203 |
] |
|
1204 |
st1.items.append(display_form) |
|
1205 |
display_form.parent = st1 |
|
1206 | ||
1207 |
wf.store() |
|
1208 | ||
1209 |
FormDef.wipe() |
|
1210 |
formdef = FormDef() |
|
1211 |
formdef.name = 'test' |
|
1212 |
formdef.workflow_id = wf.id |
|
1213 |
formdef.fields = [ |
|
1214 |
fields.ItemField( |
|
1215 |
id='0', label='string', type='item', |
|
1216 |
data_source={'type': 'foobar'}, |
|
1217 |
display_mode='autocomplete', |
|
1218 |
required=False, |
|
1219 |
varname='foo', |
|
1220 |
), |
|
1221 |
] |
|
1222 |
formdef.store() |
|
1223 |
formdef.data_class().wipe() |
|
1224 | ||
1225 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1226 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
|
1227 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
|
1228 | ||
1229 |
def side_effect(url, *args): |
|
1230 |
return StringIO(json.dumps(data)) |
|
1231 | ||
1232 |
urlopen.side_effect = side_effect |
|
1233 | ||
1234 |
assert 'data-select2-url=' in resp |
|
1235 |
# simulate select2 |
|
1236 |
resp.form.fields['f0_display'] = Hidden(form=resp.form, tag='input', name='f0_display', pos=10) |
|
1237 |
resp.form['f0'].force_value('1') |
|
1238 |
resp.form.fields['f0_display'].force_value('foo') |
|
1239 |
resp = resp.form.submit('submit') |
|
1240 |
assert 'Check values then click submit.' in resp |
|
1241 |
resp = resp.form.submit('submit').follow() |
|
1242 |
assert 'The form has been recorded' in resp |
|
1243 | ||
1244 |
# check display value is in form action widget |
|
1245 |
assert resp.form['f4'].attrs['data-value'] == '1' |
|
1246 |
assert resp.form['f4'].attrs['data-initial-display-value'] == 'hello' |
|
1247 | ||
1248 |
# check it is also displayed in a fresh session |
|
1249 |
resp = login(get_app(pub), username='foo', password='foo').get(resp.request.url) |
|
1250 |
assert resp.form['f4'].attrs['data-value'] == '1' |
|
1251 |
assert resp.form['f4'].attrs['data-initial-display-value'] == 'hello' |
|
1252 | ||
1253 | ||
1254 |
def test_formdata_named_wscall(http_requests, pub): |
|
1255 |
create_user(pub) |
|
1256 |
NamedWsCall.wipe() |
|
1257 | ||
1258 |
wscall = NamedWsCall() |
|
1259 |
wscall.name = 'Hello world' |
|
1260 |
wscall.request = {'url': 'http://remote.example.net/json'} |
|
1261 |
wscall.store() |
|
1262 |
assert wscall.slug == 'hello_world' |
|
1263 | ||
1264 |
wf = Workflow(name='status') |
|
1265 |
st1 = wf.add_status('Status1', 'st1') |
|
1266 |
comment = RegisterCommenterWorkflowStatusItem() |
|
1267 |
comment.id = '_comment' |
|
1268 |
comment.comment = 'Hello [webservice.hello_world.foo] World' |
|
1269 |
st1.items.append(comment) |
|
1270 |
comment.parent = st1 |
|
1271 | ||
1272 |
display = DisplayMessageWorkflowStatusItem() |
|
1273 |
display.message = 'The form has been recorded and: X[webservice.hello_world.foo]Y' |
|
1274 |
display.to = [] |
|
1275 |
st1.items.append(display) |
|
1276 |
display.parent = st1 |
|
1277 | ||
1278 |
wf.store() |
|
1279 | ||
1280 |
FormDef.wipe() |
|
1281 |
formdef = FormDef() |
|
1282 |
formdef.name = 'test' |
|
1283 |
formdef.workflow_id = wf.id |
|
1284 |
formdef.fields = [] |
|
1285 |
formdef.store() |
|
1286 |
formdef.data_class().wipe() |
|
1287 | ||
1288 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1289 |
resp = resp.forms[0].submit('submit') |
|
1290 |
assert 'Check values then click submit.' in resp.text |
|
1291 |
resp = resp.forms[0].submit('submit') |
|
1292 |
assert resp.status_int == 302 |
|
1293 |
resp = resp.follow() |
|
1294 |
assert 'The form has been recorded and: XbarY' in resp.text |
|
1295 | ||
1296 |
formdata = formdef.data_class().select()[0] |
|
1297 |
assert formdata.evolution[0].parts[0].content == 'Hello bar World' |
|
1298 | ||
1299 |
# check with publisher variable in named webservice call |
|
1300 |
if not pub.site_options.has_section('variables'): |
|
1301 |
pub.site_options.add_section('variables') |
|
1302 |
pub.site_options.set('variables', 'example_url', 'http://remote.example.net/') |
|
1303 |
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: |
|
1304 |
pub.site_options.write(fd) |
|
1305 | ||
1306 |
wscall = NamedWsCall() |
|
1307 |
wscall.name = 'Hello world' |
|
1308 |
wscall.request = {'url': '[example_url]json'} |
|
1309 |
wscall.store() |
|
1310 | ||
1311 |
formdef.data_class().wipe() |
|
1312 | ||
1313 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1314 |
resp = resp.forms[0].submit('submit') |
|
1315 |
assert 'Check values then click submit.' in resp.text |
|
1316 |
resp = resp.forms[0].submit('submit') |
|
1317 |
assert resp.status_int == 302 |
|
1318 |
resp = resp.follow() |
|
1319 |
assert 'The form has been recorded and: XbarY' in resp.text |
|
1320 | ||
1321 |
formdata = formdef.data_class().select()[0] |
|
1322 |
assert formdata.evolution[0].parts[0].content == 'Hello bar World' |
|
1323 | ||
1324 | ||
1325 |
def test_formdata_named_wscall_in_conditions(http_requests, pub): |
|
1326 |
create_user(pub) |
|
1327 |
NamedWsCall.wipe() |
|
1328 | ||
1329 |
wscall = NamedWsCall() |
|
1330 |
wscall.name = 'Hello world' |
|
1331 |
wscall.request = {'url': 'http://remote.example.net/json', 'method': 'GET'} |
|
1332 |
wscall.store() |
|
1333 |
assert wscall.slug == 'hello_world' |
|
1334 | ||
1335 |
FormDef.wipe() |
|
1336 |
formdef = FormDef() |
|
1337 |
formdef.name = 'test' |
|
1338 |
formdef.fields = [ |
|
1339 |
fields.PageField(id='0', label='1st page', type='page'), |
|
1340 |
fields.PageField( |
|
1341 |
id='1', label='2nd page', type='page', |
|
1342 |
condition={'type': 'python', 'value': 'webservice.hello_world["foo"] == "bar"'}), |
|
1343 |
fields.PageField( |
|
1344 |
id='1', label='3rd page', type='page', |
|
1345 |
condition={'type': 'python', 'value': 'webservice.hello_world["foo"] != "bar"'}), |
|
1346 |
fields.PageField( |
|
1347 |
id='1', label='4th page', type='page', |
|
1348 |
condition={'type': 'python', 'value': 'webservice.hello_world["foo"] == "bar"'}), |
|
1349 |
] |
|
1350 |
formdef.store() |
|
1351 |
formdef.data_class().wipe() |
|
1352 | ||
1353 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1354 |
assert '>1st page<' in resp.text |
|
1355 |
assert '>2nd page<' in resp.text |
|
1356 |
assert '>3rd page<' not in resp.text |
|
1357 |
assert '>4th page<' in resp.text |
|
1358 |
assert len(http_requests.requests) == 1 |
|
0 |
- |