0001-misc-remove-usage-of-six-module-51517.patch
tests/admin_pages/test_datasource.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import json |
4 | 5 |
import mock |
5 | 6 |
import os |
... | ... | |
10 | 11 |
except ImportError: |
11 | 12 |
lasso = None |
12 | 13 | |
13 |
from django.utils.six import StringIO |
|
14 | ||
15 | 14 |
import pytest |
16 | 15 |
from webtest import Upload |
17 | 16 | |
... | ... | |
502 | 501 |
resp = resp.click(href='export') |
503 | 502 |
xml_export = resp.text |
504 | 503 | |
505 |
ds = StringIO(xml_export) |
|
504 |
ds = io.StringIO(xml_export)
|
|
506 | 505 |
data_source2 = NamedDataSource.import_from_xml(ds) |
507 | 506 |
assert data_source2.name == 'foobar' |
508 | 507 |
tests/admin_pages/test_form.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import datetime |
4 |
import io |
|
4 | 5 |
import mock |
5 | 6 |
import os |
6 | 7 |
import re |
... | ... | |
11 | 12 |
import pytest |
12 | 13 |
from webtest import Upload |
13 | 14 | |
14 |
from django.utils.six import StringIO, BytesIO |
|
15 | ||
16 | 15 |
from wcs.qommon.http_request import HTTPRequest |
17 | 16 |
from wcs.qommon.errors import ConnectionError |
18 | 17 |
from wcs.categories import Category |
... | ... | |
962 | 961 |
resp = resp.click(href='export') |
963 | 962 |
xml_export = resp.text |
964 | 963 | |
965 |
fd = StringIO(xml_export) |
|
964 |
fd = io.StringIO(xml_export)
|
|
966 | 965 |
formdef2 = FormDef.import_from_xml(fd) |
967 | 966 |
assert formdef2.name == 'form title' |
968 | 967 | |
... | ... | |
1064 | 1063 |
resp.form['url'] = 'http://remote.invalid/test.wcs' |
1065 | 1064 |
resp = resp.form.submit() |
1066 | 1065 |
assert 'Error loading form' in resp |
1067 |
urlopen.side_effect = lambda *args: StringIO(formdef_xml.decode()) |
|
1066 |
urlopen.side_effect = lambda *args: io.StringIO(formdef_xml.decode())
|
|
1068 | 1067 |
resp.form['url'] = 'http://remote.invalid/test.wcs' |
1069 | 1068 |
resp = resp.form.submit() |
1070 | 1069 |
assert FormDef.count() == 1 |
... | ... | |
2368 | 2367 |
resp = resp.click(href='archive') |
2369 | 2368 |
resp = resp.form.submit('submit') |
2370 | 2369 |
assert resp.content_type == 'application/x-wcs-archive' |
2371 |
tf = tarfile.open(fileobj=BytesIO(resp.body)) |
|
2370 |
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
|
|
2372 | 2371 |
assert 'formdef' in [x.name for x in tf.getmembers()] |
2373 | 2372 |
assert len(tf.getmembers()) == 8 # 7 formdata + 1 formdef |
2374 | 2373 | |
... | ... | |
2377 | 2376 |
resp = resp.click(href='archive') |
2378 | 2377 |
resp = resp.form.submit('submit') |
2379 | 2378 |
assert resp.content_type == 'application/x-wcs-archive' |
2380 |
tf = tarfile.open(fileobj=BytesIO(resp.body)) |
|
2379 |
tf = tarfile.open(fileobj=io.BytesIO(resp.body))
|
|
2381 | 2380 |
assert 'formdef' in [x.name for x in tf.getmembers()] |
2382 | 2381 |
assert len(tf.getmembers()) == 1 # 0 formdata + 1 formdef |
2383 | 2382 |
tests/admin_pages/test_settings.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import logging |
4 | 5 |
import os |
5 | 6 |
import zipfile |
... | ... | |
13 | 14 |
from webtest import Upload |
14 | 15 |
import mock |
15 | 16 | |
16 |
from django.utils.six import BytesIO |
|
17 |
from django.utils.six.moves.urllib import parse as urlparse |
|
17 |
import urllib.parse |
|
18 | 18 | |
19 | 19 |
from quixote.http_request import Upload as QuixoteUpload |
20 | 20 | |
... | ... | |
113 | 113 |
resp = app.get('/backoffice/settings/export') |
114 | 114 |
resp = resp.form.submit('submit') |
115 | 115 |
assert resp.location.startswith('http://example.net/backoffice/processing?job=') |
116 |
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
|
116 |
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
|
117 | 117 |
resp = resp.follow() |
118 | 118 |
assert 'completed' in resp.text |
119 | 119 |
resp = resp.click('Download Export') |
120 |
zip_content = BytesIO(resp.body) |
|
120 |
zip_content = io.BytesIO(resp.body)
|
|
121 | 121 |
zipf = zipfile.ZipFile(zip_content, 'a') |
122 | 122 |
filelist = zipf.namelist() |
123 | 123 |
assert len(filelist) == 0 |
... | ... | |
147 | 147 |
export_to.label = 'test' |
148 | 148 |
upload = QuixoteUpload('/foo/bar', content_type='application/vnd.oasis.opendocument.text') |
149 | 149 |
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00''' |
150 |
upload.fp = BytesIO() |
|
150 |
upload.fp = io.BytesIO()
|
|
151 | 151 |
upload.fp.write(file_content) |
152 | 152 |
upload.fp.seek(0) |
153 | 153 |
export_to.model_file = UploadedFile('models', 'export_to_model-1.upload', upload) |
... | ... | |
164 | 164 |
resp = app.get('/backoffice/settings/export') |
165 | 165 |
resp = resp.form.submit('submit') |
166 | 166 |
assert resp.location.startswith('http://example.net/backoffice/processing?job=') |
167 |
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
|
167 |
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
|
168 | 168 |
resp = resp.follow() |
169 | 169 |
resp = resp.click('Download Export') |
170 |
zip_content = BytesIO(resp.body) |
|
170 |
zip_content = io.BytesIO(resp.body)
|
|
171 | 171 |
zipf = zipfile.ZipFile(zip_content, 'a') |
172 | 172 |
filelist = zipf.namelist() |
173 | 173 |
assert 'formdefs/1' not in filelist |
... | ... | |
243 | 243 |
resp.form['wscalls'] = False |
244 | 244 |
resp = resp.form.submit('submit') |
245 | 245 |
assert resp.location.startswith('http://example.net/backoffice/processing?job=') |
246 |
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
|
246 |
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
|
247 | 247 |
resp = resp.follow() |
248 | 248 |
resp = resp.click('Download Export') |
249 |
zip_content = BytesIO(resp.body) |
|
249 |
zip_content = io.BytesIO(resp.body)
|
|
250 | 250 |
zipf = zipfile.ZipFile(zip_content, 'a') |
251 | 251 |
filelist = zipf.namelist() |
252 | 252 |
assert 'formdefs_xml/%s' % formdef.id in filelist |
... | ... | |
277 | 277 |
resp = app.get('/backoffice/settings/export') |
278 | 278 |
resp = resp.form.submit('submit') |
279 | 279 |
assert resp.location.startswith('http://example.net/backoffice/processing?job=') |
280 |
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
|
280 |
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
|
281 | 281 |
resp = resp.follow() |
282 | 282 |
resp = resp.click('Download Export') |
283 |
zip_content = BytesIO(resp.body) |
|
283 |
zip_content = io.BytesIO(resp.body)
|
|
284 | 284 |
zipf = zipfile.ZipFile(zip_content, 'a') |
285 | 285 |
filelist = zipf.namelist() |
286 | 286 |
assert len([x for x in filelist if 'roles/' in x]) == 0 |
... | ... | |
810 | 810 |
resp = app.get('/backoffice/settings/themes') |
811 | 811 |
resp = resp.click('download', index=0) |
812 | 812 |
assert resp.headers['content-type'] == 'application/zip' |
813 |
zip_content = BytesIO(resp.body) |
|
813 |
zip_content = io.BytesIO(resp.body)
|
|
814 | 814 |
zipf = zipfile.ZipFile(zip_content, 'a') |
815 | 815 |
filelist = zipf.namelist() |
816 | 816 |
assert 'alto/icon.png' in filelist |
tests/admin_pages/test_workflow.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
from io import StringIO |
|
3 |
import io |
|
4 | 4 |
import mock |
5 | 5 |
import os |
6 | 6 |
import re |
... | ... | |
358 | 358 |
resp.form['url'] = 'http://remote.invalid/test.wcs' |
359 | 359 |
resp = resp.form.submit() |
360 | 360 |
assert 'Error loading form' in resp |
361 |
urlopen.side_effect = lambda *args: StringIO(wf_export.decode()) |
|
361 |
urlopen.side_effect = lambda *args: io.StringIO(wf_export.decode())
|
|
362 | 362 |
resp.form['url'] = 'http://remote.invalid/test.wcs' |
363 | 363 |
resp = resp.form.submit() |
364 | 364 |
assert Workflow.count() == 2 |
tests/admin_pages/test_wscall.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import xml.etree.ElementTree as ET |
4 | 5 | |
5 |
from django.utils.six import StringIO |
|
6 | ||
7 | 6 |
import pytest |
8 | 7 |
from webtest import Upload |
9 | 8 | |
... | ... | |
159 | 158 |
resp = resp.click(href='export') |
160 | 159 |
xml_export = resp.text |
161 | 160 | |
162 |
ds = StringIO(xml_export) |
|
161 |
ds = io.StringIO(xml_export)
|
|
163 | 162 |
wscall2 = NamedWsCall.import_from_xml(ds) |
164 | 163 |
assert wscall2.name == 'xxx' |
165 | 164 |
tests/backoffice_pages/test_all.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | ||
2 | 3 |
import datetime |
4 |
import io |
|
3 | 5 |
import json |
4 | 6 |
import os |
5 | 7 |
import re |
... | ... | |
10 | 12 |
import mock |
11 | 13 |
import pytest |
12 | 14 | |
13 |
from django.utils.six import StringIO, BytesIO |
|
14 | ||
15 | 15 |
from quixote import get_publisher |
16 | 16 |
from quixote.http_request import Upload as QuixoteUpload |
17 | 17 |
from wcs.blocks import BlockDef |
... | ... | |
2082 | 2082 |
formdef.store() |
2083 | 2083 |
resp = app.get('/backoffice/management/form-title/%s/' % number31.id) |
2084 | 2084 |
resp = resp.click('Download all files as .zip') |
2085 |
zip_content = BytesIO(resp.body) |
|
2085 |
zip_content = io.BytesIO(resp.body)
|
|
2086 | 2086 |
zipf = zipfile.ZipFile(zip_content, 'a') |
2087 | 2087 |
filelist = zipf.namelist() |
2088 | 2088 |
assert set(filelist) == {'1_bar', '2_bar'} |
... | ... | |
2725 | 2725 | |
2726 | 2726 |
def side_effect(url, *args): |
2727 | 2727 |
assert '?name_id=admin' in url |
2728 |
return StringIO(json.dumps(data)) |
|
2728 |
return io.StringIO(json.dumps(data))
|
|
2729 | 2729 | |
2730 | 2730 |
urlopen.side_effect = side_effect |
2731 | 2731 | |
... | ... | |
2779 | 2779 | |
2780 | 2780 |
def side_effect(url, *args): |
2781 | 2781 |
assert '?test=foobar' in url |
2782 |
return StringIO(json.dumps(data)) |
|
2782 |
return io.StringIO(json.dumps(data))
|
|
2783 | 2783 | |
2784 | 2784 |
urlopen.side_effect = side_effect |
2785 | 2785 | |
... | ... | |
2835 | 2835 | |
2836 | 2836 |
def side_effect(url, *args): |
2837 | 2837 |
assert '?xxx=FOO BAR 30' in url |
2838 |
return StringIO(json.dumps(data)) |
|
2838 |
return io.StringIO(json.dumps(data))
|
|
2839 | 2839 | |
2840 | 2840 |
urlopen.side_effect = side_effect |
2841 | 2841 | |
... | ... | |
4212 | 4212 | |
4213 | 4213 |
def side_effect(url, *args): |
4214 | 4214 |
if 'toto' not in url: |
4215 |
return StringIO(json.dumps(data1)) |
|
4215 |
return io.StringIO(json.dumps(data1))
|
|
4216 | 4216 |
else: |
4217 |
return StringIO(json.dumps(data2)) |
|
4217 |
return io.StringIO(json.dumps(data2))
|
|
4218 | 4218 | |
4219 | 4219 |
urlopen.side_effect = side_effect |
4220 | 4220 | |
... | ... | |
5560 | 5560 |
export_to.convert_to_pdf = False |
5561 | 5561 |
export_to.label = 'create doc' |
5562 | 5562 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
5563 |
upload.fp = BytesIO() |
|
5563 |
upload.fp = io.BytesIO()
|
|
5564 | 5564 |
upload.fp.write(b'HELLO WORLD') |
5565 | 5565 |
upload.fp.seek(0) |
5566 | 5566 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
tests/backoffice_pages/test_export.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | ||
2 | 3 |
import datetime |
4 |
import io |
|
3 | 5 |
import os |
4 | 6 |
import time |
5 | 7 |
import xml.etree.ElementTree as ET |
... | ... | |
7 | 9 | |
8 | 10 |
import pytest |
9 | 11 | |
10 |
from django.utils.six import BytesIO |
|
11 |
from django.utils.six.moves.urllib import parse as urlparse |
|
12 |
import urllib.parse |
|
12 | 13 | |
13 | 14 |
from wcs.qommon import ods |
14 | 15 |
from wcs.blocks import BlockDef |
... | ... | |
200 | 201 |
resp = app.get('/backoffice/management/form-title/') |
201 | 202 |
resp = resp.click('Export a Spreadsheet') |
202 | 203 |
assert resp.location.startswith('http://example.net/backoffice/processing?job=') |
203 |
job_id = urlparse.parse_qs(urlparse.urlparse(resp.location).query)['job'][0]
|
|
204 |
job_id = urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['job'][0]
|
|
204 | 205 |
resp = resp.follow() |
205 | 206 |
assert 'completed' in resp.text |
206 | 207 |
resp = resp.click('Download Export') |
... | ... | |
451 | 452 |
assert 'filename=form-title.ods' in resp.headers['content-disposition'] |
452 | 453 |
assert resp.body[:2] == b'PK' # ods has a zip container |
453 | 454 | |
454 |
zipf = zipfile.ZipFile(BytesIO(resp.body)) |
|
455 |
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
|
455 | 456 |
ods_sheet = ET.parse(zipf.open('content.xml')) |
456 | 457 |
# check the ods contains a link to the document |
457 | 458 |
elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0] |
tests/backoffice_pages/test_submission.py | ||
---|---|---|
5 | 5 |
import time |
6 | 6 | |
7 | 7 |
import pytest |
8 |
from django.utils.six.moves.urllib import parse as urllib |
|
8 |
import urllib.parse |
|
9 | 9 |
from utilities import clean_temporary_pub, create_temporary_pub, get_app, login |
10 | 10 | |
11 | 11 |
from wcs import fields |
... | ... | |
290 | 290 |
def post_formdata(): |
291 | 291 |
signed_url = sign_url( |
292 | 292 |
'http://example.net/api/formdefs/form-title/submit' |
293 |
'?format=json&orig=coucou&email=%s' % urllib.quote(user.email), |
|
293 |
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(user.email),
|
|
294 | 294 |
'1234', |
295 | 295 |
) |
296 | 296 |
url = signed_url[len('http://example.net') :] |
tests/conftest.py | ||
---|---|---|
1 | 1 |
import os |
2 |
from django.utils.six.moves import configparser as ConfigParser
|
|
2 |
import configparser
|
|
3 | 3 | |
4 | 4 |
import pytest |
5 | 5 | |
... | ... | |
7 | 7 | |
8 | 8 | |
9 | 9 |
def site_options(request, pub, section, variable, value): |
10 |
config = ConfigParser.ConfigParser()
|
|
10 |
config = configparser.ConfigParser()
|
|
11 | 11 |
path = os.path.join(pub.app_dir, 'site-options.cfg') |
12 | 12 |
if os.path.exists(path): |
13 | 13 |
config.read([path]) |
... | ... | |
18 | 18 |
config.write(site_option) |
19 | 19 | |
20 | 20 |
def fin(): |
21 |
config = ConfigParser.ConfigParser()
|
|
21 |
config = configparser.ConfigParser()
|
|
22 | 22 |
if os.path.exists(path): |
23 | 23 |
config.read([path]) |
24 | 24 |
config.remove_option(section, variable) |
tests/form_pages/test_all.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import json |
4 | 5 |
import pytest |
5 | 6 |
import hashlib |
6 | 7 |
import os |
7 | 8 |
import re |
8 | 9 |
import time |
10 |
import urllib.parse |
|
9 | 11 |
import zipfile |
12 | ||
10 | 13 |
from webtest import Upload, Hidden, Radio |
11 | 14 |
import mock |
12 | 15 |
import xml.etree.ElementTree as ET |
... | ... | |
16 | 19 |
except ImportError: |
17 | 20 |
Image = None |
18 | 21 | |
19 |
from django.utils.six import StringIO, BytesIO |
|
20 |
from django.utils.six.moves.urllib import parse as urlparse |
|
21 | ||
22 | 22 |
from django.utils.encoding import force_bytes, force_text |
23 | 23 | |
24 | 24 |
from wcs.qommon import force_str |
... | ... | |
2627 | 2627 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
2628 | 2628 |
] |
2629 | 2629 |
} |
2630 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
2630 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
2631 | 2631 |
resp = app.get('/test/') |
2632 | 2632 |
assert 'data-select2-url=' in resp.text |
2633 | 2633 |
# simulate select2 mode, with qommon.forms.js adding an extra hidden widget |
... | ... | |
2771 | 2771 | |
2772 | 2772 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
2773 | 2773 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]} |
2774 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
2774 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
2775 | 2775 |
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'} |
2776 | 2776 | |
2777 | 2777 |
# numeric identifiers |
2778 | 2778 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
2779 | 2779 |
data = {'data': [{'id': 1, 'text': 'un'}, {'id': 2, 'text': 'deux'}]} |
2780 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
2780 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
2781 | 2781 |
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'} |
2782 | 2782 | |
2783 | 2783 | |
... | ... | |
2954 | 2954 | |
2955 | 2955 |
# check it's not set if it's not whitelisted |
2956 | 2956 |
resp = get_app(pub).get('/?session_var_foo=hello') |
2957 |
assert urlparse.urlparse(resp.location).path == '/' |
|
2957 |
assert urllib.parse.urlparse(resp.location).path == '/'
|
|
2958 | 2958 |
resp = resp.follow() |
2959 | 2959 |
resp = resp.click('test') |
2960 | 2960 |
assert resp.forms[0]['f0'].value == '' |
... | ... | |
2967 | 2967 |
) |
2968 | 2968 | |
2969 | 2969 |
resp = get_app(pub).get('/?session_var_foo=hello') |
2970 |
assert urlparse.urlparse(resp.location).path == '/' |
|
2970 |
assert urllib.parse.urlparse(resp.location).path == '/'
|
|
2971 | 2971 |
resp = resp.follow() |
2972 | 2972 |
resp = resp.click('test') |
2973 | 2973 |
assert resp.forms[0]['f0'].value == 'hello' |
2974 | 2974 | |
2975 | 2975 |
# check it survives a login |
2976 | 2976 |
resp = get_app(pub).get('/?session_var_foo=hello2') |
2977 |
assert urlparse.urlparse(resp.location).path == '/' |
|
2977 |
assert urllib.parse.urlparse(resp.location).path == '/'
|
|
2978 | 2978 |
resp = resp.follow() |
2979 | 2979 |
resp = resp.click('Login') |
2980 | 2980 |
resp = resp.follow() |
... | ... | |
2987 | 2987 | |
2988 | 2988 |
# check repeated options are ignored |
2989 | 2989 |
resp = get_app(pub).get('/?session_var_foo=hello&session_var_foo=hello2') |
2990 |
assert urlparse.urlparse(resp.location).path == '/' |
|
2990 |
assert urllib.parse.urlparse(resp.location).path == '/'
|
|
2991 | 2991 |
resp = resp.follow() |
2992 | 2992 |
resp = resp.click('test') |
2993 | 2993 |
assert resp.forms[0]['f0'].value == '' |
2994 | 2994 | |
2995 | 2995 |
# check extra query string parameters are not lost |
2996 | 2996 |
resp = get_app(pub).get('/?session_var_foo=hello&foo=bar') |
2997 |
assert urlparse.urlparse(resp.location).path == '/' |
|
2998 |
assert urlparse.urlparse(resp.location).query == 'foo=bar' |
|
2997 |
assert urllib.parse.urlparse(resp.location).path == '/'
|
|
2998 |
assert urllib.parse.urlparse(resp.location).query == 'foo=bar'
|
|
2999 | 2999 | |
3000 | 3000 |
os.unlink(os.path.join(pub.app_dir, 'site-options.cfg')) |
3001 | 3001 | |
... | ... | |
3156 | 3156 | |
3157 | 3157 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
3158 | 3158 |
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]} |
3159 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
3159 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
3160 | 3160 |
resp = get_app(pub).get('/test/') |
3161 | 3161 |
assert not resp.form['f0$element1'].checked |
3162 | 3162 |
assert resp.form['f0$element2'].checked |
... | ... | |
3358 | 3358 |
fargo_resp = app.get('/fargo/pick') # display file picker |
3359 | 3359 |
assert fargo_resp.location == 'http://fargo.example.net/pick/?pick=http%3A//example.net/fargo/pick' |
3360 | 3360 |
with mock.patch('wcs.portfolio.urlopen') as urlopen: |
3361 |
urlopen.side_effect = lambda *args: BytesIO(b'...') |
|
3361 |
urlopen.side_effect = lambda *args: io.BytesIO(b'...')
|
|
3362 | 3362 |
fargo_resp = app.get('/fargo/pick?url=http://www.example.org/...') |
3363 | 3363 |
assert 'window.top.document.fargo_set_token' in fargo_resp.text |
3364 | 3364 |
resp.form['f0$file'] = None |
... | ... | |
5511 | 5511 | |
5512 | 5512 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5513 | 5513 |
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]} |
5514 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5514 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5515 | 5515 |
resp = get_app(pub).get('/test/') |
5516 | 5516 |
resp.form['f0'] = '1' |
5517 | 5517 |
resp.form['f0'] = '2' |
... | ... | |
5524 | 5524 | |
5525 | 5525 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5526 | 5526 |
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]} |
5527 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5527 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5528 | 5528 |
resp = get_app(pub).get('/test/') |
5529 | 5529 |
pq = resp.pyquery.remove_namespaces() |
5530 | 5530 |
assert pq('option[disabled=disabled][value="1"]').text() == 'hello' |
... | ... | |
5550 | 5550 | |
5551 | 5551 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5552 | 5552 |
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]} |
5553 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5553 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5554 | 5554 |
resp = get_app(pub).get('/test/') |
5555 | 5555 |
pq = resp.pyquery.remove_namespaces() |
5556 | 5556 |
assert len(pq('option[disabled=disabled][value="1"]')) == 0 |
... | ... | |
5575 | 5575 | |
5576 | 5576 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5577 | 5577 |
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]} |
5578 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5578 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5579 | 5579 |
resp = get_app(pub).get('/test/') |
5580 | 5580 |
resp.form['f0'] = '1' |
5581 | 5581 |
resp.form['f0'] = '2' |
... | ... | |
5588 | 5588 | |
5589 | 5589 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5590 | 5590 |
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]} |
5591 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5591 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5592 | 5592 |
resp = get_app(pub).get('/test/') |
5593 | 5593 |
pq = resp.pyquery.remove_namespaces() |
5594 | 5594 |
assert len(pq('input[name="f0"][disabled=disabled][value="1"]')) == 1 |
... | ... | |
5619 | 5619 | |
5620 | 5620 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5621 | 5621 |
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]} |
5622 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5622 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5623 | 5623 |
resp = get_app(pub).get('/test/') |
5624 | 5624 |
resp.form['f0$element1'].checked = True |
5625 | 5625 |
resp.form['f0$element2'].checked = True |
... | ... | |
5632 | 5632 | |
5633 | 5633 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5634 | 5634 |
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]} |
5635 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5635 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5636 | 5636 |
resp = get_app(pub).get('/test/') |
5637 | 5637 |
assert 'disabled' in resp.form['f0$element1'].attrs |
5638 | 5638 |
resp.form['f0$element1'].checked = True |
... | ... | |
5650 | 5650 | |
5651 | 5651 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5652 | 5652 |
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]} |
5653 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5653 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5654 | 5654 |
resp = get_app(pub).get('/test/') |
5655 | 5655 |
assert not 'f0$element1' in resp.form.fields |
5656 | 5656 |
resp.form['f0$element2'].checked = True |
... | ... | |
5688 | 5688 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
5689 | 5689 |
] |
5690 | 5690 |
} |
5691 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5691 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5692 | 5692 |
resp = get_app(pub).get('/test/') |
5693 | 5693 |
assert 'data-autocomplete="true"' in resp.text |
5694 | 5694 |
assert resp.form['f0'].value == '1' |
... | ... | |
5709 | 5709 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
5710 | 5710 |
] |
5711 | 5711 |
} |
5712 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5712 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5713 | 5713 |
resp = get_app(pub).get('/test/') |
5714 | 5714 |
assert 'data-autocomplete="true"' in resp.text |
5715 | 5715 |
assert 'data-hint="help text"' in resp.text |
... | ... | |
5733 | 5733 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
5734 | 5734 |
] |
5735 | 5735 |
} |
5736 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5736 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5737 | 5737 |
resp = app.get('/test/') |
5738 | 5738 |
assert urlopen.call_count == 0 |
5739 | 5739 |
pq = resp.pyquery.remove_namespaces() |
... | ... | |
5741 | 5741 | |
5742 | 5742 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5743 | 5743 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5744 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5744 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5745 | 5745 |
resp2 = app.get(select2_url + '?q=hell') |
5746 | 5746 |
assert urlopen.call_count == 1 |
5747 | 5747 |
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?q=hell' |
... | ... | |
5757 | 5757 | |
5758 | 5758 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5759 | 5759 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5760 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5760 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5761 | 5761 |
resp = resp.form.submit('submit') # -> validation page |
5762 | 5762 |
assert urlopen.call_count == 1 |
5763 | 5763 |
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1' |
... | ... | |
5766 | 5766 | |
5767 | 5767 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5768 | 5768 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5769 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5769 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5770 | 5770 |
resp = resp.form.submit('submit') # -> submit |
5771 | 5771 |
assert urlopen.call_count == 1 |
5772 | 5772 |
assert urlopen.call_args[0][0] == 'http://remote.example.net/json?id=1' |
... | ... | |
5784 | 5784 |
data = { |
5785 | 5785 |
'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}, {'id': 2, 'text': 'world', 'extra': 'bar'}] |
5786 | 5786 |
} |
5787 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5787 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5788 | 5788 |
resp = app.get('/test/') |
5789 | 5789 |
assert urlopen.call_count == 0 |
5790 | 5790 |
pq = resp.pyquery.remove_namespaces() |
... | ... | |
5792 | 5792 | |
5793 | 5793 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5794 | 5794 |
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]} |
5795 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5795 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5796 | 5796 |
resp2 = app.get(select2_url + '?q=hell') |
5797 | 5797 |
assert urlopen.call_count == 1 |
5798 | 5798 |
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?q=hell' |
... | ... | |
5808 | 5808 | |
5809 | 5809 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5810 | 5810 |
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]} |
5811 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5811 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5812 | 5812 |
resp = resp.form.submit('submit') # -> validation page |
5813 | 5813 |
assert urlopen.call_count == 1 |
5814 | 5814 |
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1' |
... | ... | |
5817 | 5817 | |
5818 | 5818 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5819 | 5819 |
data = {'data': [{'id': 1, 'text': 'hello', 'extra': 'foo'}]} |
5820 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5820 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5821 | 5821 |
resp = resp.form.submit('submit') # -> submit |
5822 | 5822 |
assert urlopen.call_count == 1 |
5823 | 5823 |
assert urlopen.call_args[0][0] == 'http://remote.example.net/json-numeric-id?id=1' |
... | ... | |
5844 | 5844 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
5845 | 5845 |
] |
5846 | 5846 |
} |
5847 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5847 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5848 | 5848 |
resp = app.get('/test/') |
5849 | 5849 |
assert urlopen.call_count == 0 |
5850 | 5850 |
pq = resp.pyquery.remove_namespaces() |
... | ... | |
5852 | 5852 | |
5853 | 5853 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5854 | 5854 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5855 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5855 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5856 | 5856 |
resp2 = app.get(select2_url + '?q=hell') |
5857 | 5857 |
assert urlopen.call_count == 1 |
5858 | 5858 |
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?q=hell&orig=example.net&') |
... | ... | |
5865 | 5865 | |
5866 | 5866 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5867 | 5867 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5868 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5868 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5869 | 5869 |
resp = resp.form.submit('submit') # -> validation page |
5870 | 5870 |
assert urlopen.call_count == 1 |
5871 | 5871 |
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&') |
... | ... | |
5874 | 5874 | |
5875 | 5875 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5876 | 5876 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5877 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5877 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5878 | 5878 |
resp = resp.form.submit('submit') # -> submit |
5879 | 5879 |
assert urlopen.call_count == 1 |
5880 | 5880 |
assert urlopen.call_args[0][0].startswith('http://remote.example.net/json?id=1&orig=example.net&') |
... | ... | |
5895 | 5895 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
5896 | 5896 |
] |
5897 | 5897 |
} |
5898 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5898 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5899 | 5899 |
resp = app.get('/test/') |
5900 | 5900 |
pq = resp.pyquery.remove_namespaces() |
5901 | 5901 |
select2_url = pq('select').attr['data-select2-url'] |
... | ... | |
5919 | 5919 |
{'id': '2', 'text': 'world', 'extra': 'bar'}, |
5920 | 5920 |
] |
5921 | 5921 |
} |
5922 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5922 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5923 | 5923 |
resp = app.get('/test/') |
5924 | 5924 |
assert urlopen.call_count == 0 |
5925 | 5925 |
pq = resp.pyquery.remove_namespaces() |
... | ... | |
5927 | 5927 | |
5928 | 5928 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
5929 | 5929 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
5930 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
5930 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
5931 | 5931 |
resp2 = app.get(select2_url + '?q=hell', status=403) |
5932 | 5932 |
assert urlopen.call_count == 0 |
5933 | 5933 | |
... | ... | |
8332 | 8332 | |
8333 | 8333 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
8334 | 8334 |
data = {'data': create_formdata['data']} |
8335 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
8335 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
8336 | 8336 | |
8337 | 8337 |
app = get_app(create_formdata['pub']) |
8338 | 8338 |
resp = app.get('/source-form/') |
... | ... | |
8710 | 8710 |
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"}, |
8711 | 8711 |
] |
8712 | 8712 |
} |
8713 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
8713 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
8714 | 8714 | |
8715 | 8715 |
resp = app.get('/test/') |
8716 | 8716 |
assert 'data-date="2021-01-12"' in resp and 'data-time="10:00"' in resp |
... | ... | |
8764 | 8764 |
{"id": "3", "datetime": "2021-01-14 10:40:00", "text": "event 3"}, |
8765 | 8765 |
] |
8766 | 8766 |
} |
8767 |
urlopen.side_effect = lambda *args: StringIO(json.dumps(data)) |
|
8767 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
|
|
8768 | 8768 | |
8769 | 8769 |
resp = app.get('/test/') |
8770 | 8770 |
resp.form['f2'] = '2021-01-14' |
tests/form_pages/test_block.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | ||
3 |
import io |
|
2 | 4 |
import json |
3 | 5 |
import mock |
4 | 6 | |
5 |
from django.utils.six import StringIO |
|
6 | ||
7 | 7 |
import pytest |
8 | 8 |
from webtest import Upload |
9 | 9 | |
... | ... | |
1255 | 1255 |
payload = [{'id': '1', 'text': 'foo'}] |
1256 | 1256 |
elif query == 'bar': |
1257 | 1257 |
payload = [{'id': '2', 'text': 'bar'}] |
1258 |
return StringIO(json.dumps({'data': payload})) |
|
1258 |
return io.StringIO(json.dumps({'data': payload}))
|
|
1259 | 1259 | |
1260 | 1260 |
mock_urlopen.side_effect = lambda url: data_source(url) |
1261 | 1261 |
tests/form_pages/test_formdata.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | ||
2 | 3 |
import base64 |
4 |
import io |
|
3 | 5 |
import json |
4 | 6 |
import locale |
5 | 7 |
import os |
... | ... | |
9 | 11 |
import mock |
10 | 12 |
import pytest |
11 | 13 |
from django.utils.encoding import force_bytes |
12 |
from django.utils.six import BytesIO, StringIO |
|
13 |
from django.utils.six.moves.urllib import parse as urlparse |
|
14 |
import urllib.parse |
|
14 | 15 |
from quixote.http_request import Upload as QuixoteUpload |
15 | 16 |
from webtest import Upload, Hidden |
16 | 17 | |
... | ... | |
332 | 333 |
export_to.convert_to_pdf = False |
333 | 334 |
export_to.label = 'create doc' |
334 | 335 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
335 |
upload.fp = BytesIO() |
|
336 |
upload.fp = io.BytesIO()
|
|
336 | 337 |
upload.fp.write(b'HELLO WORLD') |
337 | 338 |
upload.fp.seek(0) |
338 | 339 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
382 | 383 |
# change export model to now be a RTF file, do the action again on the same form and |
383 | 384 |
# check that both the old .odt file and the new .rtf file are there and valid. |
384 | 385 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
385 |
upload.fp = BytesIO() |
|
386 |
upload.fp = io.BytesIO()
|
|
386 | 387 |
upload.fp.write(b'HELLO NEW WORLD') |
387 | 388 |
upload.fp.seek(0) |
388 | 389 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
398 | 399 | |
399 | 400 |
# use substitution variables on rtf: only ezt format is accepted |
400 | 401 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
401 |
upload.fp = BytesIO() |
|
402 |
upload.fp = io.BytesIO()
|
|
402 | 403 |
upload.fp.write(b'HELLO {{DJANGO}} WORLD [form_name]') |
403 | 404 |
upload.fp.seek(0) |
404 | 405 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
427 | 428 |
template_filename = os.path.join(os.path.dirname(__file__), '..', odt_template) |
428 | 429 |
template = open(template_filename, 'rb').read() |
429 | 430 |
upload = QuixoteUpload('/foo/' + odt_template, content_type='application/octet-stream') |
430 |
upload.fp = BytesIO() |
|
431 |
upload.fp = io.BytesIO()
|
|
431 | 432 |
upload.fp.write(template) |
432 | 433 |
upload.fp.seek(0) |
433 | 434 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
460 | 461 |
resp = resp.follow() # $form/$id/create_doc |
461 | 462 |
resp = resp.follow() # $form/$id/create_doc/ |
462 | 463 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
463 |
assert_equal_zip(BytesIO(resp.body), f) |
|
464 |
assert_equal_zip(io.BytesIO(resp.body), f)
|
|
464 | 465 | |
465 | 466 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
466 | 467 |
resp = resp.form.submit('button_export_to') |
... | ... | |
487 | 488 |
resp = resp.follow() |
488 | 489 |
assert resp.content_type == 'application/octet-stream' |
489 | 490 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
490 |
assert_equal_zip(BytesIO(resp.body), f) |
|
491 |
assert_equal_zip(io.BytesIO(resp.body), f)
|
|
491 | 492 | |
492 | 493 |
# change file content, same name |
493 | 494 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
494 |
upload.fp = BytesIO() |
|
495 |
upload.fp = io.BytesIO()
|
|
495 | 496 |
upload.fp.write(b'HELLO NEW WORLD') |
496 | 497 |
upload.fp.seek(0) |
497 | 498 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
504 | 505 | |
505 | 506 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
506 | 507 |
body = resp.click(odt_template, index=0).follow().body |
507 |
assert_equal_zip(BytesIO(body), f) |
|
508 |
assert_equal_zip(io.BytesIO(body), f)
|
|
508 | 509 |
assert resp.click('test.rtf', index=0).follow().body == b'HELLO NEW WORLD' |
509 | 510 | |
510 | 511 | |
... | ... | |
519 | 520 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
520 | 521 |
template = open(template_filename, 'rb').read() |
521 | 522 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
522 |
upload.fp = BytesIO() |
|
523 |
upload.fp = io.BytesIO()
|
|
523 | 524 |
upload.fp.write(template) |
524 | 525 |
upload.fp.seek(0) |
525 | 526 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
552 | 553 |
resp = resp.follow() # $form/$id/create_doc |
553 | 554 |
resp = resp.follow() # $form/$id/create_doc/ |
554 | 555 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
555 |
assert_equal_zip(BytesIO(resp.body), f) |
|
556 |
assert_equal_zip(io.BytesIO(resp.body), f)
|
|
556 | 557 | |
557 | 558 |
export_to.attach_to_history = True |
558 | 559 |
wf.store() |
... | ... | |
567 | 568 |
response1 = resp = resp.follow() |
568 | 569 |
assert resp.content_type == 'application/octet-stream' |
569 | 570 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
570 |
assert_equal_zip(BytesIO(resp.body), f) |
|
571 |
assert_equal_zip(io.BytesIO(resp.body), f)
|
|
571 | 572 | |
572 | 573 |
# change file content, same name |
573 | 574 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
574 |
upload.fp = BytesIO() |
|
575 |
upload.fp = io.BytesIO()
|
|
575 | 576 |
upload.fp.write(b'HELLO NEW WORLD') |
576 | 577 |
upload.fp.seek(0) |
577 | 578 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
584 | 585 | |
585 | 586 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
586 | 587 |
body = resp.click('template.odt', index=0).follow().body |
587 |
assert_equal_zip(BytesIO(body), f) |
|
588 |
assert_equal_zip(io.BytesIO(body), f)
|
|
588 | 589 |
response2 = resp.click('test.rtf', index=0).follow() |
589 | 590 |
assert response2.body == b'HELLO NEW WORLD' |
590 | 591 |
# Test attachment substitution variables |
... | ... | |
640 | 641 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
641 | 642 |
template = open(template_filename, 'rb').read() |
642 | 643 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
643 |
upload.fp = BytesIO() |
|
644 |
upload.fp = io.BytesIO()
|
|
644 | 645 |
upload.fp.write(template) |
645 | 646 |
upload.fp.seek(0) |
646 | 647 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
711 | 712 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
712 | 713 |
template = open(template_filename, 'rb').read() |
713 | 714 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
714 |
upload.fp = BytesIO() |
|
715 |
upload.fp = io.BytesIO()
|
|
715 | 716 |
upload.fp.write(template) |
716 | 717 |
upload.fp.seek(0) |
717 | 718 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
804 | 805 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
805 | 806 |
template = open(template_filename, 'rb').read() |
806 | 807 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
807 |
upload.fp = BytesIO() |
|
808 |
upload.fp = io.BytesIO()
|
|
808 | 809 |
upload.fp.write(template) |
809 | 810 |
upload.fp.seek(0) |
810 | 811 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
844 | 845 |
resp = resp.follow() |
845 | 846 |
assert resp.content_type == 'application/octet-stream' |
846 | 847 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
847 |
assert_equal_zip(BytesIO(resp.body), f) |
|
848 |
assert_equal_zip(io.BytesIO(resp.body), f)
|
|
848 | 849 | |
849 | 850 |
assert formdef.data_class().count() == 1 |
850 | 851 |
assert formdef.data_class().select()[0].status == 'wf-st2' |
... | ... | |
866 | 867 |
template_filename = os.path.join(os.path.dirname(__file__), '..', 'template.odt') |
867 | 868 |
template = open(template_filename, 'rb').read() |
868 | 869 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
869 |
upload.fp = BytesIO() |
|
870 |
upload.fp = io.BytesIO()
|
|
870 | 871 |
upload.fp.write(template) |
871 | 872 |
upload.fp.seek(0) |
872 | 873 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
911 | 912 |
resp = resp.follow() |
912 | 913 |
assert resp.content_type == 'application/octet-stream' |
913 | 914 |
with open(os.path.join(os.path.dirname(__file__), '..', 'template-out.odt'), 'rb') as f: |
914 |
assert_equal_zip(BytesIO(resp.body), f) |
|
915 |
assert_equal_zip(io.BytesIO(resp.body), f)
|
|
915 | 916 | |
916 | 917 |
assert formdef.data_class().count() == 1 |
917 | 918 |
assert formdef.data_class().select()[0].status == 'wf-st2' |
... | ... | |
933 | 934 |
export_to = ExportToModel() |
934 | 935 |
export_to.label = 'create doc' |
935 | 936 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
936 |
upload.fp = BytesIO() |
|
937 |
upload.fp = io.BytesIO()
|
|
937 | 938 |
upload.fp.write(b'HELLO WORLD') |
938 | 939 |
upload.fp.seek(0) |
939 | 940 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
1087 | 1088 |
formdata = formdef.data_class().select()[0] |
1088 | 1089 |
assert 'xxx_var_yyy_raw' in formdata.workflow_data |
1089 | 1090 | |
1090 |
download = resp.test_app.get(urlparse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt')) |
|
1091 |
download = resp.test_app.get(urllib.parse.urljoin(resp.location, 'files/form-xxx-yyy/test.txt'))
|
|
1091 | 1092 |
assert download.content_type == 'text/plain' |
1092 | 1093 |
assert download.body == b'foobar' |
1093 | 1094 | |
... | ... | |
1292 | 1293 |
data = {'data': [{'id': '1', 'text': 'hello', 'extra': 'foo'}]} |
1293 | 1294 | |
1294 | 1295 |
def side_effect(url, *args): |
1295 |
return StringIO(json.dumps(data)) |
|
1296 |
return io.StringIO(json.dumps(data))
|
|
1296 | 1297 | |
1297 | 1298 |
urlopen.side_effect = side_effect |
1298 | 1299 |
tests/test_api.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import pytest |
4 |
import io |
|
4 | 5 |
import json |
5 | 6 |
import shutil |
6 | 7 |
import os |
... | ... | |
13 | 14 |
import time |
14 | 15 |
import json |
15 | 16 |
import sys |
17 |
import urllib.parse |
|
16 | 18 |
import xml.etree.ElementTree as ET |
17 | 19 |
import zipfile |
18 | 20 | |
19 | 21 |
from django.utils.encoding import force_bytes, force_text |
20 |
from django.utils.six import StringIO, BytesIO |
|
21 |
from django.utils.six.moves.urllib import parse as urllib |
|
22 |
from django.utils.six.moves.urllib import parse as urlparse |
|
23 | 22 | |
24 | 23 |
from quixote import cleanup, get_publisher |
25 | 24 |
from wcs.qommon.http_request import HTTPRequest |
... | ... | |
109 | 108 | |
110 | 109 |
def sign_uri(uri, user=None, format='json'): |
111 | 110 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
112 |
scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri) |
|
111 |
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(uri)
|
|
113 | 112 |
if query: |
114 | 113 |
query += '&' |
115 | 114 |
if format: |
116 | 115 |
query += 'format=%s&' % format |
117 | 116 |
query += 'orig=coucou&algo=sha256&timestamp=' + timestamp |
118 | 117 |
if user: |
119 |
query += '&email=' + urllib.quote(user.email) |
|
120 |
query += '&signature=%s' % urllib.quote( |
|
118 |
query += '&email=' + urllib.parse.quote(user.email)
|
|
119 |
query += '&signature=%s' % urllib.parse.quote(
|
|
121 | 120 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()) |
122 | 121 |
) |
123 |
return urlparse.urlunparse((scheme, netloc, path, params, query, fragment)) |
|
122 |
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
|
|
124 | 123 | |
125 | 124 | |
126 | 125 |
def test_user_page_redirect(pub): |
... | ... | |
169 | 168 | |
170 | 169 | |
171 | 170 |
def test_get_user_from_api_query_string_error_missing_timestamp(pub): |
172 |
signature = urllib.quote( |
|
171 |
signature = urllib.parse.quote(
|
|
173 | 172 |
base64.b64encode(hmac.new(b'1234', b'format=json&orig=coucou&algo=sha1', hashlib.sha1).digest()) |
174 | 173 |
) |
175 | 174 |
output = get_app(pub).get( |
... | ... | |
181 | 180 |
def test_get_user_from_api_query_string_error_missing_email(pub): |
182 | 181 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
183 | 182 |
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp |
184 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())) |
|
183 |
signature = urllib.parse.quote( |
|
184 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()) |
|
185 |
) |
|
185 | 186 |
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403) |
186 | 187 |
assert output.json['err_desc'] == 'no user specified' |
187 | 188 | |
... | ... | |
189 | 190 |
def test_get_user_from_api_query_string_error_unknown_nameid(pub): |
190 | 191 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
191 | 192 |
query = 'format=json&orig=coucou&algo=sha1&NameID=xxx&timestamp=' + timestamp |
192 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())) |
|
193 |
signature = urllib.parse.quote( |
|
194 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()) |
|
195 |
) |
|
193 | 196 |
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403) |
194 | 197 |
assert output.json['err_desc'] == 'unknown NameID' |
195 | 198 | |
... | ... | |
199 | 202 |
# works fine without user. |
200 | 203 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
201 | 204 |
query = 'format=json&orig=coucou&algo=sha1&timestamp=' + timestamp |
202 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())) |
|
205 |
signature = urllib.parse.quote( |
|
206 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()) |
|
207 |
) |
|
203 | 208 |
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature)) |
204 | 209 |
assert output.json == {'data': []} |
205 | 210 |
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature)) |
... | ... | |
210 | 215 |
# check the categories and forms endpoints accept an unknown NameID |
211 | 216 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
212 | 217 |
query = 'format=json&NameID=xxx&orig=coucou&algo=sha1&timestamp=' + timestamp |
213 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())) |
|
218 |
signature = urllib.parse.quote( |
|
219 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()) |
|
220 |
) |
|
214 | 221 |
output = get_app(pub).get('/categories?%s&signature=%s' % (query, signature)) |
215 | 222 |
assert output.json == {'data': []} |
216 | 223 |
output = get_app(pub).get('/json?%s&signature=%s' % (query, signature)) |
... | ... | |
221 | 228 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
222 | 229 |
query = ( |
223 | 230 |
'format=json&orig=coucou&algo=sha1&email=' |
224 |
+ urllib.quote(local_user.email) |
|
231 |
+ urllib.parse.quote(local_user.email)
|
|
225 | 232 |
+ '&timestamp=' |
226 | 233 |
+ timestamp |
227 | 234 |
) |
228 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())) |
|
235 |
signature = urllib.parse.quote( |
|
236 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()) |
|
237 |
) |
|
229 | 238 |
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature)) |
230 | 239 |
assert output.json['user_display_name'] == u'Jean Darmette' |
231 | 240 | |
... | ... | |
234 | 243 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
235 | 244 |
query = ( |
236 | 245 |
'format=json&orig=coucou&algo=sha256&email=' |
237 |
+ urllib.quote(local_user.email) |
|
246 |
+ urllib.parse.quote(local_user.email)
|
|
238 | 247 |
+ '&timestamp=' |
239 | 248 |
+ timestamp |
240 | 249 |
) |
241 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest())) |
|
250 |
signature = urllib.parse.quote( |
|
251 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha1).digest()) |
|
252 |
) |
|
242 | 253 |
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature), status=403) |
243 | 254 |
assert output.json['err_desc'] == 'invalid signature' |
244 | 255 | |
... | ... | |
247 | 258 |
timestamp = datetime.datetime.utcnow().isoformat()[:19] + 'Z' |
248 | 259 |
query = ( |
249 | 260 |
'format=json&orig=coucou&algo=sha256&email=' |
250 |
+ urllib.quote(local_user.email) |
|
261 |
+ urllib.parse.quote(local_user.email)
|
|
251 | 262 |
+ '&timestamp=' |
252 | 263 |
+ timestamp |
253 | 264 |
) |
254 |
signature = urllib.quote(base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest())) |
|
265 |
signature = urllib.parse.quote( |
|
266 |
base64.b64encode(hmac.new(b'1234', force_bytes(query), hashlib.sha256).digest()) |
|
267 |
) |
|
255 | 268 |
output = get_app(pub).get('/api/user/?%s&signature=%s' % (query, signature)) |
256 | 269 |
assert output.json['user_display_name'] == u'Jean Darmette' |
257 | 270 | |
258 | 271 | |
259 | 272 |
def test_sign_url(pub, local_user): |
260 | 273 |
signed_url = sign_url( |
261 |
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
274 |
'http://example.net/api/user/?format=json&orig=coucou&email=%s' |
|
275 |
% urllib.parse.quote(local_user.email), |
|
262 | 276 |
'1234', |
263 | 277 |
) |
264 | 278 |
url = signed_url[len('http://example.net') :] |
... | ... | |
269 | 283 |
get_app(pub).get('%s&foo=bar' % url, status=403) |
270 | 284 | |
271 | 285 |
signed_url = sign_url( |
272 |
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
286 |
'http://example.net/api/user/?format=json&orig=coucou&email=%s' |
|
287 |
% urllib.parse.quote(local_user.email), |
|
273 | 288 |
'12345', |
274 | 289 |
) |
275 | 290 |
url = signed_url[len('http://example.net') :] |
... | ... | |
283 | 298 |
local_user.roles = [role.id] |
284 | 299 |
local_user.store() |
285 | 300 |
signed_url = sign_url( |
286 |
'http://example.net/api/user/?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
301 |
'http://example.net/api/user/?format=json&orig=coucou&email=%s' |
|
302 |
% urllib.parse.quote(local_user.email), |
|
287 | 303 |
'1234', |
288 | 304 |
) |
289 | 305 |
url = signed_url[len('http://example.net') :] |
... | ... | |
308 | 324 |
local_user.store() |
309 | 325 |
signed_url = sign_url( |
310 | 326 |
'http://example.net/api/user/?format=json&orig=UNKNOWN_ACCESS&email=%s' |
311 |
% (urllib.quote(local_user.email)), |
|
327 |
% (urllib.parse.quote(local_user.email)),
|
|
312 | 328 |
'5678', |
313 | 329 |
) |
314 | 330 |
url = signed_url[len('http://example.net') :] |
... | ... | |
316 | 332 |
assert output.json['err_desc'] == 'invalid orig' |
317 | 333 | |
318 | 334 |
signed_url = sign_url( |
319 |
'http://example.net/api/user/?format=json&orig=salut&email=%s' % (urllib.quote(local_user.email)), |
|
335 |
'http://example.net/api/user/?format=json&orig=salut&email=%s' |
|
336 |
% (urllib.parse.quote(local_user.email)), |
|
320 | 337 |
'5678', |
321 | 338 |
) |
322 | 339 |
url = signed_url[len('http://example.net') :] |
... | ... | |
337 | 354 |
pub.clean_nonces(now=0) |
338 | 355 |
nonce_dir = os.path.join(pub.app_dir, 'nonces') |
339 | 356 |
assert not os.path.exists(nonce_dir) or not os.listdir(nonce_dir) |
340 |
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.quote(local_user.email)), KEY) |
|
357 |
signed_url = sign_url('?format=json&orig=%s&email=%s' % (ORIG, urllib.parse.quote(local_user.email)), KEY)
|
|
341 | 358 |
req = HTTPRequest( |
342 | 359 |
None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net', 'QUERY_STRING': signed_url[1:]} |
343 | 360 |
) |
... | ... | |
367 | 384 | |
368 | 385 |
def test_get_user_compat_endpoint(pub, local_user): |
369 | 386 |
signed_url = sign_url( |
370 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), '1234' |
|
387 |
'http://example.net/user?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email), |
|
388 |
'1234', |
|
371 | 389 |
) |
372 | 390 |
url = signed_url[len('http://example.net') :] |
373 | 391 |
output = get_app(pub).get(url) |
... | ... | |
659 | 677 |
formdef.store() |
660 | 678 | |
661 | 679 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
662 |
urlopen.side_effect = lambda *args: StringIO( |
|
680 |
urlopen.side_effect = lambda *args: io.StringIO(
|
|
663 | 681 |
'''\ |
664 | 682 |
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \ |
665 | 683 |
{"id": 1, "text": "uné", "foo": "bar1"}, \ |
... | ... | |
772 | 790 |
def url(): |
773 | 791 |
signed_url = sign_url( |
774 | 792 |
'http://example.net/api/formdefs/test/submit' |
775 |
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
793 |
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
|
776 | 794 |
'1234', |
777 | 795 |
) |
778 | 796 |
return signed_url[len('http://example.net') :] |
... | ... | |
867 | 885 |
def url(): |
868 | 886 |
signed_url = sign_url( |
869 | 887 |
'http://example.net/api/formdefs/test/submit' |
870 |
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
888 |
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
|
871 | 889 |
'1234', |
872 | 890 |
) |
873 | 891 |
return signed_url[len('http://example.net') :] |
... | ... | |
928 | 946 | |
929 | 947 |
signed_url = sign_url( |
930 | 948 |
'http://example.net/api/formdefs/test/submit' |
931 |
+ '?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
949 |
+ '?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
|
932 | 950 |
'1234', |
933 | 951 |
) |
934 | 952 |
url = signed_url[len('http://example.net') :] |
... | ... | |
2349 | 2367 | |
2350 | 2368 |
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user)) |
2351 | 2369 |
assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet' |
2352 |
zipf = zipfile.ZipFile(BytesIO(resp.body)) |
|
2370 |
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
|
2353 | 2371 |
ods_sheet = ET.parse(zipf.open('content.xml')) |
2354 | 2372 |
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311 |
2355 | 2373 | |
... | ... | |
2539 | 2557 | |
2540 | 2558 |
# check it now gets the data |
2541 | 2559 |
resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user)) |
2542 |
zipf = zipfile.ZipFile(BytesIO(resp.body)) |
|
2560 |
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
|
2543 | 2561 |
ods_sheet = ET.parse(zipf.open('content.xml')) |
2544 | 2562 |
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11 |
2545 | 2563 | |
... | ... | |
2553 | 2571 |
custom_view.store() |
2554 | 2572 | |
2555 | 2573 |
resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user)) |
2556 |
zipf = zipfile.ZipFile(BytesIO(resp.body)) |
|
2574 |
zipf = zipfile.ZipFile(io.BytesIO(resp.body))
|
|
2557 | 2575 |
ods_sheet = ET.parse(zipf.open('content.xml')) |
2558 | 2576 |
assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21 |
2559 | 2577 | |
... | ... | |
3525 | 3543 | |
3526 | 3544 |
def test_reverse_geocoding(pub): |
3527 | 3545 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
3528 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'address': 'xxx'})) |
|
3546 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'address': 'xxx'}))
|
|
3529 | 3547 |
get_app(pub).get('/api/reverse-geocoding', status=400) |
3530 | 3548 |
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0') |
3531 | 3549 |
assert resp.content_type == 'application/json' |
... | ... | |
3599 | 3617 |
def url(): |
3600 | 3618 |
signed_url = sign_url( |
3601 | 3619 |
'http://example.net/api/formdefs/test/submit' |
3602 |
'?format=json&orig=coucou&email=%s' % urllib.quote(local_user.email), |
|
3620 |
'?format=json&orig=coucou&email=%s' % urllib.parse.quote(local_user.email),
|
|
3603 | 3621 |
'1234', |
3604 | 3622 |
) |
3605 | 3623 |
return signed_url[len('http://example.net') :] |
3606 | 3624 | |
3607 | 3625 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
3608 |
urlopen.side_effect = lambda *args: StringIO( |
|
3626 |
urlopen.side_effect = lambda *args: io.StringIO(
|
|
3609 | 3627 |
'''\ |
3610 | 3628 |
{"data": [{"id": 0, "text": "zéro", "foo": "bar"}, \ |
3611 | 3629 |
{"id": 1, "text": "uné", "foo": "bar1"}, \ |
... | ... | |
3643 | 3661 | |
3644 | 3662 |
def test_geocoding(pub): |
3645 | 3663 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
3646 |
urlopen.side_effect = lambda *args: StringIO(json.dumps([{'lat': 0, 'lon': 0}])) |
|
3664 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps([{'lat': 0, 'lon': 0}]))
|
|
3647 | 3665 |
get_app(pub).get('/api/geocoding', status=400) |
3648 | 3666 |
resp = get_app(pub).get('/api/geocoding?q=test') |
3649 | 3667 |
assert resp.content_type == 'application/json' |
tests/test_carddef.py | ||
---|---|---|
1 |
import io |
|
1 | 2 |
import pytest |
2 | 3 |
import xml.etree.ElementTree as ET |
3 | 4 | |
4 |
from django.utils.six import BytesIO |
|
5 | ||
6 | 5 |
from wcs.qommon.http_request import HTTPRequest |
7 | 6 |
from wcs.qommon.misc import indent_xml as indent |
8 | 7 |
from wcs.qommon.template import Template |
... | ... | |
141 | 140 |
carddef.data_class().wipe() |
142 | 141 |
pub.custom_view_class.wipe() |
143 | 142 | |
144 |
carddef2 = CardDef.import_from_xml(BytesIO(ET.tostring(carddef_xml))) |
|
143 |
carddef2 = CardDef.import_from_xml(io.BytesIO(ET.tostring(carddef_xml)))
|
|
145 | 144 |
assert carddef2.name == 'foo' |
146 | 145 |
assert carddef2.fields[1].data_source == {'type': 'carddef:foo'} |
147 | 146 |
assert carddef2._custom_views |
tests/test_categories.py | ||
---|---|---|
1 |
import io |
|
1 | 2 |
import os |
2 | 3 |
import pickle |
3 | 4 |
import shutil |
4 | 5 | |
5 | 6 |
import pytest |
6 | 7 | |
7 |
from django.utils.six import BytesIO |
|
8 | 8 |
from quixote import cleanup |
9 | 9 | |
10 | 10 |
from wcs.categories import Category, CardDefCategory |
... | ... | |
108 | 108 |
test.store() |
109 | 109 |
test = category_class.get(1) |
110 | 110 | |
111 |
fd = BytesIO(test.export_to_xml_string(include_id=True)) |
|
111 |
fd = io.BytesIO(test.export_to_xml_string(include_id=True))
|
|
112 | 112 |
test2 = category_class.import_from_xml(fd, include_id=True) |
113 | 113 |
assert test.id == test2.id |
114 | 114 |
assert test.name == test2.name |
tests/test_datasource.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import codecs |
4 |
import io |
|
4 | 5 |
import pytest |
5 | 6 |
import os |
6 | 7 |
import json |
7 | 8 |
import shutil |
8 | 9 | |
9 |
from django.utils.six import StringIO |
|
10 |
from django.utils.six.moves.urllib import parse as urlparse |
|
10 |
import urllib.parse |
|
11 | 11 | |
12 | 12 |
from quixote import cleanup |
13 | 13 |
from wcs.qommon.http_request import HTTPRequest |
... | ... | |
889 | 889 |
data_source2 = NamedDataSource.select()[0] |
890 | 890 |
assert data_source2.data_source == data_source.data_source |
891 | 891 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
892 |
urlopen.side_effect = lambda *args: StringIO( |
|
892 |
urlopen.side_effect = lambda *args: io.StringIO(
|
|
893 | 893 |
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}' |
894 | 894 |
) |
895 | 895 |
assert data_sources.get_items({'type': 'foobar'}) == [ |
... | ... | |
906 | 906 |
data_source.store() |
907 | 907 | |
908 | 908 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
909 |
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
|
909 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
|
910 | 910 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
911 | 911 |
signed_url = urlopen.call_args[0][0] |
912 | 912 |
assert signed_url.startswith('https://api.example.com/json?') |
913 |
parsed = urlparse.urlparse(signed_url) |
|
914 |
querystring = urlparse.parse_qs(parsed.query) |
|
913 |
parsed = urllib.parse.urlparse(signed_url)
|
|
914 |
querystring = urllib.parse.parse_qs(parsed.query)
|
|
915 | 915 |
# stupid simple (but sufficient) signature test: |
916 | 916 |
assert querystring['algo'] == ['sha256'] |
917 | 917 |
assert querystring['orig'] == ['example.net'] |
... | ... | |
922 | 922 |
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"} |
923 | 923 |
data_source.store() |
924 | 924 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
925 |
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
|
925 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
|
926 | 926 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
927 | 927 |
signed_url = urlopen.call_args[0][0] |
928 | 928 |
assert signed_url.startswith('https://api.example.com/json?') |
929 |
parsed = urlparse.urlparse(signed_url) |
|
930 |
querystring = urlparse.parse_qs(parsed.query) |
|
929 |
parsed = urllib.parse.urlparse(signed_url)
|
|
930 |
querystring = urllib.parse.parse_qs(parsed.query)
|
|
931 | 931 |
assert querystring['algo'] == ['sha256'] |
932 | 932 |
assert querystring['orig'] == ['example.net'] |
933 | 933 |
assert querystring['nonce'][0] |
... | ... | |
938 | 938 |
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"} |
939 | 939 |
data_source.store() |
940 | 940 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
941 |
urlopen.side_effect = lambda *args: StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
|
941 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
|
|
942 | 942 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
943 | 943 |
unsigned_url = urlopen.call_args[0][0] |
944 | 944 |
assert unsigned_url == 'https://no-secret.example.com/json' |
... | ... | |
951 | 951 |
datasource.store() |
952 | 952 | |
953 | 953 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
954 |
urlopen.side_effect = lambda *args: StringIO( |
|
954 |
urlopen.side_effect = lambda *args: io.StringIO(
|
|
955 | 955 |
json.dumps({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}) |
956 | 956 |
) |
957 | 957 | |
... | ... | |
997 | 997 | |
998 | 998 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
999 | 999 |
value = [{'id': '1', 'text': 'foo'}] |
1000 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value})) |
|
1000 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
|
1001 | 1001 |
assert datasource.get_structured_value('1') == value[0] |
1002 | 1002 |
assert urlopen.call_count == 1 |
1003 | 1003 |
assert urlopen.call_args[0][0] == 'http://whatever/?id=1' |
... | ... | |
1009 | 1009 |
get_request().datasources_cache = {} |
1010 | 1010 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1011 | 1011 |
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}] |
1012 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value})) |
|
1012 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
|
1013 | 1013 |
assert datasource.get_structured_value('1') == value[0] |
1014 | 1014 |
assert urlopen.call_count == 1 |
1015 | 1015 | |
1016 | 1016 |
get_request().datasources_cache = {} |
1017 | 1017 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1018 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': []})) # empty list |
|
1018 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': []})) # empty list
|
|
1019 | 1019 |
assert datasource.get_structured_value('1') is None |
1020 | 1020 |
assert urlopen.call_count == 1 |
1021 | 1021 | |
1022 | 1022 |
get_request().datasources_cache = {} |
1023 | 1023 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1024 | 1024 |
value = [{'id': '1', 'text': 'foo'}] |
1025 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 0})) |
|
1025 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 0}))
|
|
1026 | 1026 |
assert datasource.get_structured_value('1') == value[0] |
1027 | 1027 |
assert urlopen.call_count == 1 |
1028 | 1028 | |
1029 | 1029 |
get_request().datasources_cache = {} |
1030 | 1030 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1031 | 1031 |
value = [{'id': '1', 'text': 'foo'}] |
1032 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value, 'err': 1})) |
|
1032 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value, 'err': 1}))
|
|
1033 | 1033 |
assert datasource.get_structured_value('1') is None |
1034 | 1034 |
assert urlopen.call_count == 1 |
1035 | 1035 |
# no cache for errors |
... | ... | |
1039 | 1039 |
get_request().datasources_cache = {} |
1040 | 1040 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1041 | 1041 |
value = {'id': '1', 'text': 'foo'} # not a list |
1042 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value})) |
|
1042 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
|
1043 | 1043 |
assert datasource.get_structured_value('1') is None |
1044 | 1044 |
assert urlopen.call_count == 1 |
1045 | 1045 | |
1046 | 1046 |
get_request().datasources_cache = {} |
1047 | 1047 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1048 |
urlopen.side_effect = lambda *args: StringIO('not json') |
|
1048 |
urlopen.side_effect = lambda *args: io.StringIO('not json')
|
|
1049 | 1049 |
assert datasource.get_structured_value('1') is None |
1050 | 1050 |
assert urlopen.call_count == 1 |
1051 | 1051 | |
... | ... | |
1053 | 1053 |
get_request().datasources_cache = {} |
1054 | 1054 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1055 | 1055 |
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}] |
1056 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value})) |
|
1056 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
|
1057 | 1057 |
assert datasource.get_structured_value('2') == value[1] |
1058 | 1058 |
assert urlopen.call_count == 1 |
1059 | 1059 |
# try again, get from request.datasources_cache |
... | ... | |
1062 | 1062 |
get_request().datasources_cache = {} |
1063 | 1063 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
1064 | 1064 |
value = [{'id': '1', 'text': 'bar'}, {'id': '2', 'text': 'foo'}] |
1065 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({'data': value})) |
|
1065 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': value}))
|
|
1066 | 1066 |
assert datasource.get_structured_value('3') is None |
1067 | 1067 |
assert urlopen.call_count == 1 |
1068 | 1068 |
# try again, get from request.datasources_cache |
tests/test_datasource_chrono.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import pytest |
4 | 5 |
import json |
5 | 6 |
import shutil |
6 | 7 | |
7 |
from django.utils.six import StringIO |
|
8 | ||
9 | 8 |
from quixote import cleanup |
10 | 9 |
from wcs import fields |
11 | 10 |
from wcs.data_sources import NamedDataSource, build_agenda_datasources, collect_agenda_data |
... | ... | |
112 | 111 |
pub.load_site_options() |
113 | 112 |
NamedDataSource.wipe() |
114 | 113 | |
115 |
urlopen.side_effect = lambda *args: StringIO('{"data": []}') |
|
114 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": []}')
|
|
116 | 115 |
assert collect_agenda_data(pub) == [] |
117 | 116 |
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')] |
118 | 117 | |
... | ... | |
122 | 121 |
assert urlopen.call_args_list == [mock.call('http://chrono.example.net/api/agenda/')] |
123 | 122 | |
124 | 123 |
# events agenda |
125 |
urlopen.side_effect = lambda *args: StringIO(json.dumps({"data": AGENDA_EVENTS_DATA})) |
|
124 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({"data": AGENDA_EVENTS_DATA}))
|
|
126 | 125 |
urlopen.reset_mock() |
127 | 126 |
assert collect_agenda_data(pub) == [ |
128 | 127 |
{'text': 'Events A', 'url': 'http://chrono.example.net/api/agenda/events-A/datetimes/'}, |
... | ... | |
132 | 131 | |
133 | 132 |
# meetings agenda |
134 | 133 |
urlopen.side_effect = [ |
135 |
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})), |
|
136 |
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})), |
|
137 |
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})), |
|
134 |
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
|
135 |
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
|
|
136 |
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['virtual-B']})),
|
|
138 | 137 |
] |
139 | 138 |
urlopen.reset_mock() |
140 | 139 |
assert collect_agenda_data(pub) == [ |
... | ... | |
164 | 163 | |
165 | 164 |
# if meeting types could not be collected |
166 | 165 |
urlopen.side_effect = [ |
167 |
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})), |
|
168 |
StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})), |
|
166 |
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
|
167 |
io.StringIO(json.dumps({"data": AGENDA_MEETING_TYPES_DATA['meetings-A']})),
|
|
169 | 168 |
ConnectionError, |
170 | 169 |
] |
171 | 170 |
urlopen.reset_mock() |
... | ... | |
177 | 176 |
] |
178 | 177 | |
179 | 178 |
urlopen.side_effect = [ |
180 |
StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})), |
|
179 |
io.StringIO(json.dumps({"data": AGENDA_MEETINGS_DATA})),
|
|
181 | 180 |
ConnectionError, |
182 | 181 |
] |
183 | 182 |
urlopen.reset_mock() |
tests/test_ezt.py | ||
---|---|---|
1 | 1 |
import datetime |
2 |
import io |
|
2 | 3 |
import pytest |
3 | 4 |
import os |
4 | 5 | |
5 |
from django.utils.six import StringIO |
|
6 | 6 |
from quixote import cleanup |
7 | 7 |
from wcs.qommon.ezt import ( |
8 | 8 |
Template, |
... | ... | |
36 | 36 |
def test_simple_qualifier(): |
37 | 37 |
template = Template() |
38 | 38 |
template.parse('<p>[foo]</p>') |
39 |
output = StringIO() |
|
39 |
output = io.StringIO()
|
|
40 | 40 |
template.generate(output, {'foo': 'bar'}) |
41 | 41 |
assert output.getvalue() == '<p>bar</p>' |
42 | 42 | |
... | ... | |
44 | 44 |
def test_simple_qualifier_missing_variable(): |
45 | 45 |
template = Template() |
46 | 46 |
template.parse('<p>[foo]</p>') |
47 |
output = StringIO() |
|
47 |
output = io.StringIO()
|
|
48 | 48 |
template.generate(output, {}) |
49 | 49 |
assert output.getvalue() == '<p>[foo]</p>' |
50 | 50 | |
... | ... | |
54 | 54 |
template.parse('<p>[if-any foo]bar[end]</p>') |
55 | 55 | |
56 | 56 |
# boolean |
57 |
output = StringIO() |
|
57 |
output = io.StringIO()
|
|
58 | 58 |
template.generate(output, {'foo': True}) |
59 | 59 |
assert output.getvalue() == '<p>bar</p>' |
60 | 60 | |
61 | 61 |
# no value |
62 |
output = StringIO() |
|
62 |
output = io.StringIO()
|
|
63 | 63 |
template.generate(output, {}) |
64 | 64 |
assert output.getvalue() == '<p></p>' |
65 | 65 | |
66 | 66 |
# defined but evaluating to False |
67 |
output = StringIO() |
|
67 |
output = io.StringIO()
|
|
68 | 68 |
template.generate(output, {'foo': False}) |
69 | 69 |
assert output.getvalue() == '<p>bar</p>' |
70 | 70 | |
... | ... | |
73 | 73 |
template = Template() |
74 | 74 |
template.parse('<p>[if-any foo]bar[else]baz[end]</p>') |
75 | 75 | |
76 |
output = StringIO() |
|
76 |
output = io.StringIO()
|
|
77 | 77 |
template.generate(output, {'foo': True}) |
78 | 78 |
assert output.getvalue() == '<p>bar</p>' |
79 | 79 | |
80 |
output = StringIO() |
|
80 |
output = io.StringIO()
|
|
81 | 81 |
template.generate(output, {}) |
82 | 82 |
assert output.getvalue() == '<p>baz</p>' |
83 | 83 | |
... | ... | |
87 | 87 |
template.parse('<p>[is foo "bar"]bar[end]</p>') |
88 | 88 | |
89 | 89 |
# no value |
90 |
output = StringIO() |
|
90 |
output = io.StringIO()
|
|
91 | 91 |
template.generate(output, {}) |
92 | 92 |
assert output.getvalue() == '<p></p>' |
93 | 93 | |
94 | 94 |
# defined but other value |
95 |
output = StringIO() |
|
95 |
output = io.StringIO()
|
|
96 | 96 |
template.generate(output, {'foo': 'baz'}) |
97 | 97 |
assert output.getvalue() == '<p></p>' |
98 | 98 | |
99 | 99 |
# defined with correct value |
100 |
output = StringIO() |
|
100 |
output = io.StringIO()
|
|
101 | 101 |
template.generate(output, {'foo': 'bar'}) |
102 | 102 |
assert output.getvalue() == '<p>bar</p>' |
103 | 103 | |
... | ... | |
105 | 105 |
def test_callable_qualifier(): |
106 | 106 |
template = Template() |
107 | 107 |
template.parse('<p>[foo]</p>') |
108 |
output = StringIO() |
|
108 |
output = io.StringIO()
|
|
109 | 109 |
template.generate(output, {'foo': lambda x: x.write('bar')}) |
110 | 110 |
assert output.getvalue() == '<p>bar</p>' |
111 | 111 | |
... | ... | |
113 | 113 |
def test_date_qualifier(pub): |
114 | 114 |
template = Template() |
115 | 115 |
template.parse('<p>[foo]</p>') |
116 |
output = StringIO() |
|
116 |
output = io.StringIO()
|
|
117 | 117 |
template.generate(output, {'foo': datetime.date(2019, 1, 2)}) |
118 | 118 |
assert output.getvalue() == '<p>2019-01-02</p>' |
119 | 119 | |
120 | 120 |
pub.cfg['language'] = {'language': 'fr'} |
121 | 121 |
pub.write_cfg() |
122 |
output = StringIO() |
|
122 |
output = io.StringIO()
|
|
123 | 123 |
template.generate(output, {'foo': datetime.date(2019, 1, 2)}) |
124 | 124 |
assert output.getvalue() == '<p>02/01/2019</p>' |
125 | 125 | |
... | ... | |
127 | 127 |
def test_datetime_qualifier(pub): |
128 | 128 |
template = Template() |
129 | 129 |
template.parse('<p>[foo]</p>') |
130 |
output = StringIO() |
|
130 |
output = io.StringIO()
|
|
131 | 131 |
template.generate(output, {'foo': datetime.datetime(2019, 1, 2, 14, 4)}) |
132 | 132 |
assert output.getvalue() == '<p>2019-01-02 14:04</p>' |
133 | 133 | |
134 | 134 |
pub.cfg['language'] = {'language': 'fr'} |
135 | 135 |
pub.write_cfg() |
136 |
output = StringIO() |
|
136 |
output = io.StringIO()
|
|
137 | 137 |
template.generate(output, {'foo': datetime.datetime(2019, 1, 2, 14, 4)}) |
138 | 138 |
assert output.getvalue() == '<p>02/01/2019 14:04</p>' |
139 | 139 | |
... | ... | |
181 | 181 |
def test_array_index(): |
182 | 182 |
template = Template() |
183 | 183 |
template.parse('<p>[foo.0]</p>') |
184 |
output = StringIO() |
|
184 |
output = io.StringIO()
|
|
185 | 185 |
template.generate(output, {'foo': ['bar']}) |
186 | 186 |
assert output.getvalue() == '<p>bar</p>' |
187 | 187 | |
188 | 188 |
template = Template() |
189 | 189 |
template.parse('<p>[foo.bar]</p>') |
190 |
output = StringIO() |
|
190 |
output = io.StringIO()
|
|
191 | 191 |
template.generate(output, {'foo': ['bar']}) |
192 | 192 |
assert output.getvalue() == '<p>[foo.bar]</p>' |
193 | 193 | |
... | ... | |
195 | 195 |
def test_array_subindex(): |
196 | 196 |
template = Template() |
197 | 197 |
template.parse('<p>[foo.0.1]</p>') |
198 |
output = StringIO() |
|
198 |
output = io.StringIO()
|
|
199 | 199 |
template.generate(output, {'foo': [['bar', 'baz']]}) |
200 | 200 |
assert output.getvalue() == '<p>baz</p>' |
201 | 201 | |
... | ... | |
203 | 203 |
def test_dict_index(): |
204 | 204 |
template = Template() |
205 | 205 |
template.parse('<p>[foo.a]</p>') |
206 |
output = StringIO() |
|
206 |
output = io.StringIO()
|
|
207 | 207 |
template.generate(output, {'foo': {'a': 'bar'}}) |
208 | 208 |
assert output.getvalue() == '<p>bar</p>' |
209 | 209 | |
210 | 210 |
template = Template() |
211 | 211 |
template.parse('<p>[foo.b]</p>') |
212 |
output = StringIO() |
|
212 |
output = io.StringIO()
|
|
213 | 213 |
template.generate(output, {'foo': {'a': 'bar'}}) |
214 | 214 |
assert output.getvalue() == '<p>[foo.b]</p>' |
215 | 215 | |
... | ... | |
223 | 223 |
vars = {'script': ScriptsSubstitutionProxy()} |
224 | 224 |
template = Template() |
225 | 225 |
template.parse('<p>[script.hello_world]</p>') |
226 |
output = StringIO() |
|
226 |
output = io.StringIO()
|
|
227 | 227 |
template.generate(output, vars) |
228 | 228 |
assert output.getvalue() == '<p>Hello world</p>' |
229 | 229 | |
230 | 230 |
vars = {'script': ScriptsSubstitutionProxy()} |
231 | 231 |
template = Template() |
232 | 232 |
template.parse('<p>[script.hello_world "fred"]</p>') |
233 |
output = StringIO() |
|
233 |
output = io.StringIO()
|
|
234 | 234 |
template.generate(output, vars) |
235 | 235 |
assert output.getvalue() == '<p>Hello fred</p>' |
236 | 236 |
tests/test_fc_auth.py | ||
---|---|---|
1 | 1 |
import base64 |
2 | 2 |
import json |
3 |
import urllib.parse |
|
3 | 4 | |
4 | 5 |
from django.utils.encoding import force_bytes, force_text |
5 |
from django.utils.six.moves.urllib import parse as urllib |
|
6 |
from django.utils.six.moves.urllib import parse as urlparse |
|
7 | 6 |
from quixote import cleanup, get_session_manager |
8 | 7 | |
9 | 8 |
from utilities import get_app, create_temporary_pub |
... | ... | |
126 | 125 |
resp = app.get('/login/') |
127 | 126 |
assert resp.status_int == 302 |
128 | 127 |
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize') |
129 |
qs = urlparse.parse_qs(resp.location.split('?')[1]) |
|
128 |
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
|
|
130 | 129 |
nonce = qs['nonce'][0] |
131 | 130 |
state = qs['state'][0] |
132 | 131 | |
... | ... | |
152 | 151 |
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None) |
153 | 152 |
resp = app.get( |
154 | 153 |
'/ident/fc/callback?%s' |
155 |
% urllib.urlencode( |
|
154 |
% urllib.parse.urlencode(
|
|
156 | 155 |
{ |
157 | 156 |
'code': '1234', |
158 | 157 |
'state': state, |
... | ... | |
176 | 175 |
assert session.extra_user_variables['fc_sub'] == 'ymca' |
177 | 176 | |
178 | 177 |
resp = app.get('/logout') |
179 |
splitted = urlparse.urlsplit(resp.location) |
|
178 |
splitted = urllib.parse.urlsplit(resp.location)
|
|
180 | 179 |
assert ( |
181 |
urlparse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', '')) |
|
180 |
urllib.parse.urlunsplit((splitted.scheme, splitted.netloc, splitted.path, '', ''))
|
|
182 | 181 |
== 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout' |
183 | 182 |
) |
184 |
assert urlparse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net'] |
|
185 |
assert urlparse.parse_qs(splitted.query)['id_token_hint'] |
|
183 |
assert urllib.parse.parse_qs(splitted.query)['post_logout_redirect_uri'] == ['http://example.net']
|
|
184 |
assert urllib.parse.parse_qs(splitted.query)['id_token_hint']
|
|
186 | 185 |
assert not get_session(app) |
187 | 186 | |
188 | 187 |
# Test error handling path |
189 | 188 |
resp = app.get( |
190 | 189 |
'/ident/fc/callback?%s' |
191 |
% urllib.urlencode( |
|
190 |
% urllib.parse.urlencode(
|
|
192 | 191 |
{ |
193 | 192 |
'state': state, |
194 | 193 |
'error': 'access_denied', |
... | ... | |
198 | 197 |
assert 'user did not authorize login' in caplog.records[-1].message |
199 | 198 |
resp = app.get( |
200 | 199 |
'/ident/fc/callback?%s' |
201 |
% urllib.urlencode( |
|
200 |
% urllib.parse.urlencode(
|
|
202 | 201 |
{ |
203 | 202 |
'state': state, |
204 | 203 |
'error': 'whatever', |
... | ... | |
212 | 211 |
resp = app.get(login_url) |
213 | 212 |
assert resp.status_int == 302 |
214 | 213 |
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize') |
215 |
qs = urlparse.parse_qs(resp.location.split('?')[1]) |
|
214 |
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
|
|
216 | 215 |
state = qs['state'][0] |
217 | 216 |
id_token['nonce'] = qs['nonce'][0] |
218 | 217 |
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token))) |
... | ... | |
224 | 223 |
http_get_page.return_value = (None, 200, json.dumps(user_info_result), None) |
225 | 224 |
resp = app.get( |
226 | 225 |
'/ident/fc/callback?%s' |
227 |
% urllib.urlencode( |
|
226 |
% urllib.parse.urlencode(
|
|
228 | 227 |
{ |
229 | 228 |
'code': '1234', |
230 | 229 |
'state': state, |
... | ... | |
263 | 262 |
resp = app.get('/login/') |
264 | 263 |
assert resp.status_int == 302 |
265 | 264 |
assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize') |
266 |
qs = urlparse.parse_qs(resp.location.split('?')[1]) |
|
265 |
qs = urllib.parse.parse_qs(resp.location.split('?')[1])
|
|
267 | 266 |
state = qs['state'][0] |
268 | 267 |
id_token['nonce'] = qs['nonce'][0] |
269 | 268 |
token_result['id_token'] = '.%s.' % force_text(base64url_encode(json.dumps(id_token))) |
... | ... | |
280 | 279 |
http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None) |
281 | 280 |
resp = app.get( |
282 | 281 |
'/ident/fc/callback?%s' |
283 |
% urllib.urlencode( |
|
282 |
% urllib.parse.urlencode(
|
|
284 | 283 |
{ |
285 | 284 |
'code': '1234', |
286 | 285 |
'state': state, |
tests/test_formdef.py | ||
---|---|---|
1 | 1 |
import datetime |
2 | 2 |
import glob |
3 |
import io |
|
3 | 4 |
import json |
4 | 5 |
import os |
5 | 6 |
import pickle |
... | ... | |
9 | 10 | |
10 | 11 |
import pytest |
11 | 12 | |
12 |
from django.utils import six |
|
13 | 13 |
from django.utils.encoding import force_bytes |
14 |
from django.utils.six import BytesIO |
|
15 | 14 |
from quixote import cleanup |
16 | 15 |
from wcs import fields |
17 | 16 |
from wcs.blocks import BlockDef |
... | ... | |
438 | 437 |
formdata.store() |
439 | 438 | |
440 | 439 |
formdata.evolution[-1].parts = [ |
441 |
AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile') |
|
440 |
AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
|
|
442 | 441 |
] |
443 | 442 |
formdata.store() |
444 | 443 |
assert len(glob.glob(os.path.join(pub.app_dir, 'attachments', '*/*'))) == 1 |
tests/test_formdef_import.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import pytest |
4 | 5 |
import shutil |
5 | 6 |
import time |
6 | 7 |
import xml.etree.ElementTree as ET |
7 | 8 | |
8 |
from django.utils.six import BytesIO, StringIO |
|
9 | 9 |
from quixote import cleanup |
10 | 10 | |
11 | 11 |
from wcs.categories import Category |
... | ... | |
52 | 52 | |
53 | 53 |
def assert_json_import_export_works(formdef, include_id=False): |
54 | 54 |
formdef2 = FormDef.import_from_json( |
55 |
StringIO(formdef.export_to_json(include_id=include_id)), include_id=include_id |
|
55 |
io.StringIO(formdef.export_to_json(include_id=include_id)), include_id=include_id
|
|
56 | 56 |
) |
57 | 57 |
assert_compare_formdef(formdef, formdef2, include_id=include_id) |
58 | 58 |
return formdef2 |
... | ... | |
216 | 216 | |
217 | 217 |
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text') |
218 | 218 |
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00''' |
219 |
upload.fp = BytesIO() |
|
219 |
upload.fp = io.BytesIO()
|
|
220 | 220 |
upload.fp.write(file_content) |
221 | 221 |
upload.fp.seek(0) |
222 | 222 |
model_file = UploadedFile(pub.APP_DIR, None, upload) |
... | ... | |
339 | 339 |
formdef.fields = [fields.StringField(id='1', type='XXX')] |
340 | 340 |
export = ET.tostring(export_to_indented_xml(formdef)) |
341 | 341 |
with pytest.raises(FormdefImportError): |
342 |
FormDef.import_from_xml(BytesIO(export), include_id=True) |
|
342 |
FormDef.import_from_xml(io.BytesIO(export), include_id=True)
|
|
343 | 343 | |
344 | 344 | |
345 | 345 |
def test_invalid_data_source(): |
... | ... | |
352 | 352 |
export = export.replace( |
353 | 353 |
b'<data_source><type>xxx</type></data_source>', b'<data_source><type/></data_source>' |
354 | 354 |
) |
355 |
formdef2 = FormDef.import_from_xml(BytesIO(export)) |
|
355 |
formdef2 = FormDef.import_from_xml(io.BytesIO(export))
|
|
356 | 356 |
assert formdef2.fields[0].data_source == {} |
357 | 357 | |
358 | 358 |
export = export.replace(b'<data_source><type/></data_source>', b'<data_source> </data_source>') |
359 |
formdef2 = FormDef.import_from_xml(BytesIO(export)) |
|
359 |
formdef2 = FormDef.import_from_xml(io.BytesIO(export))
|
|
360 | 360 |
assert formdef2.fields[0].data_source == {} |
361 | 361 | |
362 | 362 | |
... | ... | |
368 | 368 |
] |
369 | 369 |
export = ET.tostring(export_to_indented_xml(formdef)) |
370 | 370 | |
371 |
FormDef.import_from_xml(BytesIO(export)) |
|
371 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
372 | 372 | |
373 | 373 |
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'foobar'})] |
374 | 374 |
export = ET.tostring(export_to_indented_xml(formdef)) |
375 | 375 |
with pytest.raises(FormdefImportError, match='Unknown datasources'): |
376 |
FormDef.import_from_xml(BytesIO(export)) |
|
376 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
377 | 377 | |
378 | 378 |
# carddef as datasource |
379 | 379 |
carddef = CardDef() |
... | ... | |
385 | 385 | |
386 | 386 |
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo'})] |
387 | 387 |
export = ET.tostring(export_to_indented_xml(formdef)) |
388 |
FormDef.import_from_xml(BytesIO(export)) |
|
388 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
389 | 389 | |
390 | 390 |
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:unknown'})] |
391 | 391 |
export = ET.tostring(export_to_indented_xml(formdef)) |
392 | 392 |
with pytest.raises(FormdefImportError): |
393 |
FormDef.import_from_xml(BytesIO(export)) |
|
393 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
394 | 394 | |
395 | 395 |
# carddef custom view as datasource |
396 | 396 |
pub.custom_view_class.wipe() |
... | ... | |
406 | 406 |
fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo:card-view'}) |
407 | 407 |
] |
408 | 408 |
export = ET.tostring(export_to_indented_xml(formdef)) |
409 |
FormDef.import_from_xml(BytesIO(export)) |
|
409 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
410 | 410 | |
411 | 411 |
formdef.fields = [fields.StringField(id='1', type='string', data_source={'type': 'carddef:foo:unknown'})] |
412 | 412 |
export = ET.tostring(export_to_indented_xml(formdef)) |
413 | 413 |
with pytest.raises(FormdefImportError): |
414 |
FormDef.import_from_xml(BytesIO(export)) |
|
414 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
415 | 415 | |
416 | 416 | |
417 | 417 |
def test_duplicated_field_ids(): |
... | ... | |
425 | 425 |
export = ET.tostring(export_to_indented_xml(formdef, include_id=True)) |
426 | 426 | |
427 | 427 |
with pytest.raises(FormdefImportError): |
428 |
FormDef.import_from_xml(BytesIO(export)) |
|
428 |
FormDef.import_from_xml(io.BytesIO(export))
|
|
429 | 429 | |
430 | 430 |
with pytest.raises(FormdefImportError): |
431 |
FormDef.import_from_xml(BytesIO(export), include_id=True) |
|
431 |
FormDef.import_from_xml(io.BytesIO(export), include_id=True)
|
|
432 | 432 | |
433 |
formdef2 = FormDef.import_from_xml(BytesIO(export), fix_on_error=True) |
|
433 |
formdef2 = FormDef.import_from_xml(io.BytesIO(export), fix_on_error=True)
|
|
434 | 434 |
assert formdef2.fields[0].id == '1' |
435 | 435 |
assert formdef2.fields[1].id == '2' |
436 | 436 |
assert formdef2.fields[2].id == '3' |
... | ... | |
446 | 446 |
formdef.max_field_id = 1 |
447 | 447 |
export = ET.tostring(export_to_indented_xml(formdef, include_id=True)) |
448 | 448 | |
449 |
formdef2 = FormDef.import_from_xml(BytesIO(export), include_id=True) |
|
449 |
formdef2 = FormDef.import_from_xml(io.BytesIO(export), include_id=True)
|
|
450 | 450 |
assert formdef2.max_field_id == 2 |
451 | 451 | |
452 | 452 | |
... | ... | |
639 | 639 |
old_format = ET.tostring(formdef_xml).replace( |
640 | 640 |
b'<validation><type>regex</type><value>\\d</value></validation>', b'<validation>\\d</validation>' |
641 | 641 |
) |
642 |
f2 = FormDef.import_from_xml(BytesIO(old_format)) |
|
642 |
f2 = FormDef.import_from_xml(io.BytesIO(old_format))
|
|
643 | 643 |
assert len(f2.fields) == len(formdef.fields) |
644 | 644 |
assert f2.fields[0].validation == {'type': 'regex', 'value': '\\d'} |
645 | 645 | |
... | ... | |
745 | 745 |
formdef.data_class().wipe() |
746 | 746 |
pub.custom_view_class.wipe() |
747 | 747 | |
748 |
formdef2 = FormDef.import_from_xml(BytesIO(ET.tostring(formdef_xml))) |
|
748 |
formdef2 = FormDef.import_from_xml(io.BytesIO(ET.tostring(formdef_xml)))
|
|
749 | 749 |
assert formdef2.name == 'foo' |
750 | 750 |
assert formdef2._custom_views |
751 | 751 |
tests/test_hobo.py | ||
---|---|---|
13 | 13 | |
14 | 14 |
from utilities import create_temporary_pub, clean_temporary_pub |
15 | 15 | |
16 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
16 |
import urllib.parse
|
|
17 | 17 |
from quixote import cleanup |
18 | 18 | |
19 | 19 |
from wcs.qommon import force_str |
... | ... | |
243 | 243 |
key = '109fca71e7dc8ec49708a08fa7c02795de13f34f7d29d27bd150f203b3e0ab40' |
244 | 244 |
assert pub.get_site_option('authentic.example.net', 'api-secrets') == key |
245 | 245 |
assert pub.get_site_option('authentic.example.net', 'wscall-secrets') == key |
246 |
self_domain = urlparse.urlsplit(service.get('base_url')).netloc |
|
246 |
self_domain = urllib.parse.urlsplit(service.get('base_url')).netloc
|
|
247 | 247 |
assert pub.get_site_option(self_domain, 'wscall-secrets') != '0' |
248 | 248 | |
249 | 249 |
service['variables']['xxx'] = None |
tests/test_mail_templates.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import base64 |
4 |
import io |
|
4 | 5 |
import os |
5 | 6 |
import pytest |
6 | 7 |
from webtest import Upload |
7 | 8 |
import xml.etree.ElementTree as ET |
8 | 9 | |
9 | 10 |
from django.utils.encoding import force_bytes |
10 |
from django.utils.six import StringIO |
|
11 | 11 |
from quixote import cleanup |
12 | 12 |
from wcs.formdef import FormDef |
13 | 13 |
from wcs.fields import FileField |
... | ... | |
325 | 325 |
resp = resp.click(href='export') |
326 | 326 |
xml_export = resp.text |
327 | 327 | |
328 |
ds = StringIO(xml_export) |
|
328 |
ds = io.StringIO(xml_export)
|
|
329 | 329 |
mail_template2 = MailTemplate.import_from_xml(ds) |
330 | 330 |
assert mail_template2.name == 'test MT' |
331 | 331 |
tests/test_publisher.py | ||
---|---|---|
1 |
import io |
|
1 | 2 |
import json |
2 | 3 |
import re |
3 | 4 |
import stat |
... | ... | |
17 | 18 |
from django.core.management.base import CommandError |
18 | 19 |
from django.http import Http404 |
19 | 20 |
from django.test import override_settings |
20 |
from django.utils import six |
|
21 | 21 |
from django.utils.encoding import force_text |
22 |
from django.utils.six import BytesIO, StringIO |
|
23 | 22 |
from quixote import cleanup |
24 | 23 |
from wcs.qommon import get_publisher_class |
25 | 24 |
from wcs.qommon.afterjobs import AfterJob |
... | ... | |
115 | 114 | |
116 | 115 |
def test_finish_interrupted_request(): |
117 | 116 |
req = HTTPRequest( |
118 |
StringIO(''), |
|
117 |
io.StringIO(''),
|
|
119 | 118 |
{ |
120 | 119 |
'SERVER_NAME': 'example.net', |
121 | 120 |
'SCRIPT_NAME': '', |
... | ... | |
125 | 124 |
response = pub.process_request(req) |
126 | 125 |
assert b'invalid content-length header' in response.getvalue() |
127 | 126 |
req = HTTPRequest( |
128 |
StringIO(''), |
|
127 |
io.StringIO(''),
|
|
129 | 128 |
{ |
130 | 129 |
'SERVER_NAME': 'example.net', |
131 | 130 |
'SCRIPT_NAME': '', |
... | ... | |
136 | 135 |
response = pub.process_request(req) |
137 | 136 |
assert b'Invalid request: unexpected end of request body' in response.getvalue() |
138 | 137 |
req = HTTPRequest( |
139 |
StringIO(''), |
|
138 |
io.StringIO(''),
|
|
140 | 139 |
{ |
141 | 140 |
'SERVER_NAME': 'example.net', |
142 | 141 |
'SCRIPT_NAME': '', |
... | ... | |
148 | 147 |
assert b'Invalid request: multipart/form-data missing boundary' in response.getvalue() |
149 | 148 |
with pytest.raises(Http404): |
150 | 149 |
req = HTTPRequest( |
151 |
StringIO(''), |
|
150 |
io.StringIO(''),
|
|
152 | 151 |
{ |
153 | 152 |
'SERVER_NAME': 'example.net', |
154 | 153 |
'SCRIPT_NAME': '', |
... | ... | |
184 | 183 |
pub.cfg['sp'] = {'what': 'ever'} |
185 | 184 |
pub.write_cfg() |
186 | 185 | |
187 |
c = BytesIO() |
|
186 |
c = io.BytesIO()
|
|
188 | 187 |
z = zipfile.ZipFile(c, 'w') |
189 | 188 |
z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']})) |
190 | 189 |
z.close() |
... | ... | |
195 | 194 |
assert pub.cfg['whatever'] == ['a', 'b', 'c'] |
196 | 195 |
assert pub.cfg['sp'] == {'what': 'ever'} |
197 | 196 | |
198 |
c = BytesIO() |
|
197 |
c = io.BytesIO()
|
|
199 | 198 |
z = zipfile.ZipFile(c, 'w') |
200 | 199 |
z.writestr( |
201 | 200 |
'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]}) |
tests/test_saml2utils.py | ||
---|---|---|
1 | 1 |
import shutil |
2 | 2 | |
3 |
from django.utils import six |
|
4 | ||
5 | 3 |
from quixote import cleanup |
6 | 4 | |
7 | 5 |
from wcs.qommon import x509utils |
... | ... | |
34 | 32 |
) |
35 | 33 |
assert meta != None |
36 | 34 |
content = meta.get_saml2_metadata(pkey, '', True, True) |
37 |
assert isinstance(content, six.string_types) and content != ''
|
|
35 |
assert isinstance(content, str) and content != ''
|
|
38 | 36 |
assert 'EntityDescriptor' in content |
39 | 37 |
assert 'SPSSODescriptor' in content |
tests/test_saml_auth.py | ||
---|---|---|
10 | 10 | |
11 | 11 |
import pytest |
12 | 12 | |
13 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
13 |
import urllib.parse
|
|
14 | 14 |
from quixote import cleanup |
15 | 15 |
from quixote import get_session, get_session_manager |
16 | 16 |
from wcs.qommon.http_request import HTTPRequest |
... | ... | |
350 | 350 |
assert resp.status_int == 302 |
351 | 351 |
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=') |
352 | 352 |
request = lasso.Samlp2AuthnRequest() |
353 |
request.initFromQuery(urlparse.urlparse(resp.location).query) |
|
353 |
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
|
354 | 354 |
assert request.forceAuthn is False |
355 | 355 | |
356 | 356 | |
... | ... | |
359 | 359 |
assert resp.status_int == 302 |
360 | 360 |
assert resp.location.startswith('http://sso.example.net/saml2/sso?SAMLRequest=') |
361 | 361 |
request = lasso.Samlp2AuthnRequest() |
362 |
request.initFromQuery(urlparse.urlparse(resp.location).query) |
|
362 |
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
|
363 | 363 |
assert request.forceAuthn is True |
364 | 364 | |
365 | 365 | |
... | ... | |
380 | 380 |
assert resp.location.startswith('http://example.net/login/?next=') |
381 | 381 |
resp = resp.follow() |
382 | 382 |
assert resp.location.startswith('http://sso.example.net/saml2/sso') |
383 |
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['SAMLRequest']
|
|
384 |
assert urlparse.parse_qs(urlparse.urlparse(resp.location).query)['RelayState'] == [
|
|
383 |
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['SAMLRequest']
|
|
384 |
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['RelayState'] == [
|
|
385 | 385 |
'http://example.net/backoffice/' |
386 | 386 |
] |
387 | 387 | |
388 | 388 |
request = lasso.Samlp2AuthnRequest() |
389 |
request.initFromQuery(urlparse.urlparse(resp.location).query) |
|
389 |
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
|
390 | 390 |
assert ':next_url>http://example.net/backoffice/<' in request.getOriginalXmlnode() |
391 | 391 | |
392 | 392 | |
... | ... | |
395 | 395 |
assert resp.status_int == 302 |
396 | 396 |
assert resp.location.startswith('http://sso.example.net/saml2/sso') |
397 | 397 |
request = lasso.Samlp2AuthnRequest() |
398 |
request.initFromQuery(urlparse.urlparse(resp.location).query) |
|
398 |
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
|
399 | 399 |
assert 'login-hint' not in request.getOriginalXmlnode() |
400 | 400 | |
401 | 401 |
resp = get_app(pub).get('/backoffice/') |
... | ... | |
404 | 404 |
resp = resp.follow() |
405 | 405 |
assert resp.location.startswith('http://sso.example.net/saml2/sso') |
406 | 406 |
request = lasso.Samlp2AuthnRequest() |
407 |
request.initFromQuery(urlparse.urlparse(resp.location).query) |
|
407 |
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
|
408 | 408 |
assert ':login-hint>backoffice<' in request.getOriginalXmlnode() |
409 | 409 | |
410 | 410 |
resp = get_app(pub).get('http://example.net/login/?next=/backoffice/') |
411 | 411 |
request = lasso.Samlp2AuthnRequest() |
412 |
request.initFromQuery(urlparse.urlparse(resp.location).query) |
|
412 |
request.initFromQuery(urllib.parse.urlparse(resp.location).query)
|
|
413 | 413 |
assert ':login-hint>backoffice<' in request.getOriginalXmlnode() |
414 | 414 | |
415 | 415 | |
... | ... | |
507 | 507 |
logout.buildRequestMsg() |
508 | 508 | |
509 | 509 |
# process logout message |
510 |
saml2.slo_idp(urlparse.urlparse(logout.msgUrl).query) |
|
510 |
saml2.slo_idp(urllib.parse.urlparse(logout.msgUrl).query)
|
|
511 | 511 |
assert req.response.headers['location'].startswith( |
512 | 512 |
'http://sso.example.net/saml2/slo_return?SAMLResponse=' |
513 | 513 |
) |
tests/test_snapshots.py | ||
---|---|---|
1 |
import io |
|
1 | 2 |
import os |
2 | 3 |
import shutil |
3 | 4 |
import xml.etree.ElementTree as ET |
4 | 5 | |
5 | 6 |
import pytest |
6 | 7 | |
7 |
from django.utils.six import BytesIO |
|
8 | 8 |
from quixote.http_request import Upload |
9 | 9 | |
10 | 10 |
from wcs.blocks import BlockDef |
... | ... | |
410 | 410 |
export_to.label = 'test' |
411 | 411 |
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text') |
412 | 412 |
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00''' |
413 |
upload.fp = BytesIO() |
|
413 |
upload.fp = io.BytesIO()
|
|
414 | 414 |
upload.fp.write(file_content) |
415 | 415 |
upload.fp.seek(0) |
416 | 416 |
export_to.model_file = UploadedFile('models', 'tmp', upload) |
tests/test_workflow_import.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import io |
|
3 | 4 |
import pytest |
4 | 5 |
import xml.etree.ElementTree as ET |
5 | 6 | |
6 | 7 |
from quixote.http_request import Upload |
7 | 8 | |
8 |
from django.utils.six import BytesIO |
|
9 | ||
10 | 9 |
from wcs.carddef import CardDef |
11 | 10 |
from wcs.formdef import FormDef |
12 | 11 |
from wcs.mail_templates import MailTemplate |
... | ... | |
202 | 201 |
b'<item role_id="2">Test Role named existing role</item>', |
203 | 202 |
b'<item>Test Role named existing role</item>', |
204 | 203 |
) |
205 |
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export))) |
|
204 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
|
206 | 205 |
assert wf3.possible_status[0].items[0].by == ['2'] |
207 | 206 | |
208 | 207 | |
... | ... | |
235 | 234 |
xml_export = xml_export_orig.replace( |
236 | 235 |
b'<item role_id="3">Test Role A</item>', b'<item role_id="4">Test Role A</item>' |
237 | 236 |
) |
238 |
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export))) |
|
237 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
|
239 | 238 |
assert wf3.possible_status[0].items[0].by == ['3'] |
240 | 239 | |
241 | 240 |
# check that it creates a new role if there's no match on id and name |
... | ... | |
243 | 242 |
b'<item role_id="3">Test Role A</item>', b'<item role_id="999">foobar</item>' |
244 | 243 |
) |
245 | 244 |
nb_roles = Role.count() |
246 |
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export))) |
|
245 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
|
247 | 246 |
assert Role.count() == nb_roles + 1 |
248 | 247 | |
249 | 248 |
# check that it doesn't fallback on the id if there's no match on the |
... | ... | |
252 | 251 |
xml_export = xml_export_orig.replace( |
253 | 252 |
b'<item role_id="3">Test Role A</item>', b'<item role_id="3">Test Role C</item>' |
254 | 253 |
) |
255 |
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export))) |
|
254 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
|
256 | 255 |
assert wf3.possible_status[0].items[0].by != ['3'] |
257 | 256 |
assert Role.count() == nb_roles + 1 |
258 | 257 | |
259 | 258 |
# on the other hand, check that it uses the id when included_id is True |
260 |
wf3 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export)), include_id=True) |
|
259 |
wf3 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)), include_id=True)
|
|
261 | 260 |
assert wf3.possible_status[0].items[0].by == ['3'] |
262 | 261 | |
263 | 262 | |
... | ... | |
291 | 290 |
export_to.label = 'test' |
292 | 291 |
upload = Upload('/foo/bar', content_type='application/vnd.oasis.opendocument.text') |
293 | 292 |
file_content = b'''PK\x03\x04\x14\x00\x00\x08\x00\x00\'l\x8eG^\xc62\x0c\'\x00''' |
294 |
upload.fp = BytesIO() |
|
293 |
upload.fp = io.BytesIO()
|
|
295 | 294 |
upload.fp.write(file_content) |
296 | 295 |
upload.fp.seek(0) |
297 | 296 |
export_to.model_file = UploadedFile(pub.APP_DIR, None, upload) |
... | ... | |
308 | 307 |
export_to.label = 'test' |
309 | 308 |
upload = Upload('/foo/bar', content_type='text/rtf') |
310 | 309 |
file_content = b'' |
311 |
upload.fp = BytesIO() |
|
310 |
upload.fp = io.BytesIO()
|
|
312 | 311 |
upload.fp.write(file_content) |
313 | 312 |
upload.fp.seek(0) |
314 | 313 |
export_to.model_file = UploadedFile(pub.APP_DIR, None, upload) |
... | ... | |
397 | 396 |
assert b'<required>True</required>' in xml_export |
398 | 397 |
xml_export = xml_export.replace(b'<required>True</required>', b'') |
399 | 398 |
assert b'<required>True</required>' not in xml_export |
400 |
wf2 = Workflow.import_from_xml_tree(ET.parse(BytesIO(xml_export))) |
|
399 |
wf2 = Workflow.import_from_xml_tree(ET.parse(io.BytesIO(xml_export)))
|
|
401 | 400 |
assert wf2.possible_status[0].items[0].required is False |
402 | 401 | |
403 | 402 |
commentable.button_label = 'button label' |
... | ... | |
930 | 929 |
for wf in [wf1, wf2, wf3]: |
931 | 930 |
export = ET.tostring(export_to_indented_xml(wf)) |
932 | 931 |
with pytest.raises(WorkflowImportError, match='Unknown datasources'): |
933 |
Workflow.import_from_xml(BytesIO(export)) |
|
932 |
Workflow.import_from_xml(io.BytesIO(export))
|
|
934 | 933 | |
935 | 934 |
# carddef as datasource |
936 | 935 |
CardDef.wipe() |
... | ... | |
945 | 944 | |
946 | 945 |
for wf in [wf1, wf2, wf3]: |
947 | 946 |
export = ET.tostring(export_to_indented_xml(wf)) |
948 |
Workflow.import_from_xml(BytesIO(export)) |
|
947 |
Workflow.import_from_xml(io.BytesIO(export))
|
|
949 | 948 | |
950 | 949 |
display_form.formdef.fields[0].data_source = {'type': 'carddef:unknown'} |
951 | 950 |
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:unknown'} |
... | ... | |
954 | 953 |
for wf in [wf1, wf2, wf3]: |
955 | 954 |
export = ET.tostring(export_to_indented_xml(wf)) |
956 | 955 |
with pytest.raises(WorkflowImportError, match='Unknown datasources'): |
957 |
Workflow.import_from_xml(BytesIO(export)) |
|
956 |
Workflow.import_from_xml(io.BytesIO(export))
|
|
958 | 957 | |
959 | 958 |
# carddef custom view as datasource |
960 | 959 |
pub.custom_view_class.wipe() |
... | ... | |
972 | 971 | |
973 | 972 |
for wf in [wf1, wf2, wf3]: |
974 | 973 |
export = ET.tostring(export_to_indented_xml(wf)) |
975 |
Workflow.import_from_xml(BytesIO(export)) |
|
974 |
Workflow.import_from_xml(io.BytesIO(export))
|
|
976 | 975 | |
977 | 976 |
display_form.formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'} |
978 | 977 |
wf2.variables_formdef.fields[0].data_source = {'type': 'carddef:foo:unknown'} |
... | ... | |
981 | 980 |
for wf in [wf1, wf2, wf3]: |
982 | 981 |
export = ET.tostring(export_to_indented_xml(wf)) |
983 | 982 |
with pytest.raises(WorkflowImportError, match='Unknown datasources'): |
984 |
Workflow.import_from_xml(BytesIO(export)) |
|
983 |
Workflow.import_from_xml(io.BytesIO(export)) |
tests/test_workflows.py | ||
---|---|---|
1 | 1 |
import base64 |
2 |
import io |
|
2 | 3 |
import json |
3 | 4 |
import datetime |
4 | 5 |
import os |
... | ... | |
14 | 15 |
except ImportError: |
15 | 16 |
Image = None |
16 | 17 | |
17 |
from django.utils import six |
|
18 | 18 |
from django.utils.encoding import force_bytes, force_text |
19 |
from django.utils.six import BytesIO, StringIO |
|
20 |
from django.utils.six.moves.urllib import parse as urlparse |
|
19 |
import urllib.parse |
|
21 | 20 | |
22 | 21 |
from quixote import cleanup, get_response |
23 | 22 |
from wcs.qommon.errors import ConnectionError |
... | ... | |
1008 | 1007 |
shutil.rmtree(os.path.join(get_publisher().app_dir, 'attachments')) |
1009 | 1008 | |
1010 | 1009 |
formdata.evolution[-1].parts = [ |
1011 |
AttachmentEvolutionPart('hello.txt', fp=BytesIO(b'hello world'), varname='testfile') |
|
1010 |
AttachmentEvolutionPart('hello.txt', fp=io.BytesIO(b'hello world'), varname='testfile')
|
|
1012 | 1011 |
] |
1013 | 1012 |
formdata.store() |
1014 | 1013 |
assert len(os.listdir(os.path.join(get_publisher().app_dir, 'attachments'))) == 1 |
... | ... | |
1961 | 1960 |
pub.substitutions.feed(formdata) |
1962 | 1961 |
item.perform(formdata) |
1963 | 1962 |
assert http_requests.get_last('method') == 'GET' |
1964 |
qs = urlparse.parse_qs(http_requests.get_last('url').split('?')[1]) |
|
1963 |
qs = urllib.parse.parse_qs(http_requests.get_last('url').split('?')[1])
|
|
1965 | 1964 |
assert set(qs.keys()) == set(['in_url', 'str', 'one', 'evalme', 'django', 'ezt']) |
1966 | 1965 |
assert qs['in_url'] == ['1', '2'] |
1967 | 1966 |
assert qs['one'] == ['1'] |
... | ... | |
3188 | 3187 |
) |
3189 | 3188 |
item.perform(formdata) |
3190 | 3189 |
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0] |
3191 |
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0] |
|
3190 |
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
|
|
3192 | 3191 |
assert int(formdata.geolocations['base']['lat']) == 48 |
3193 | 3192 |
assert int(formdata.geolocations['base']['lon']) == 2 |
3194 | 3193 | |
... | ... | |
3203 | 3202 |
) |
3204 | 3203 |
item.perform(formdata) |
3205 | 3204 |
assert 'https://nominatim.entrouvert.org/search' in http_get_page.call_args[0][0] |
3206 |
assert urlparse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0] |
|
3205 |
assert urllib.parse.quote('169 rue du chateau, paris') in http_get_page.call_args[0][0]
|
|
3207 | 3206 |
assert 'key=KEY' in http_get_page.call_args[0][0] |
3208 | 3207 |
assert int(formdata.geolocations['base']['lat']) == 48 |
3209 | 3208 |
assert int(formdata.geolocations['base']['lon']) == 2 |
... | ... | |
3435 | 3434 |
template_filename = os.path.join(os.path.dirname(__file__), 'template-with-image.odt') |
3436 | 3435 |
template = open(template_filename, 'rb').read() |
3437 | 3436 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
3438 |
upload.fp = BytesIO() |
|
3437 |
upload.fp = io.BytesIO()
|
|
3439 | 3438 |
upload.fp.write(template) |
3440 | 3439 |
upload.fp.seek(0) |
3441 | 3440 |
item.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
3499 | 3498 |
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt') |
3500 | 3499 |
template = open(template_filename, 'rb').read() |
3501 | 3500 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
3502 |
upload.fp = BytesIO() |
|
3501 |
upload.fp = io.BytesIO()
|
|
3503 | 3502 |
upload.fp.write(template) |
3504 | 3503 |
upload.fp.seek(0) |
3505 | 3504 |
item.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
3557 | 3556 |
template_filename = os.path.join(os.path.dirname(__file__), 'template-django.odt') |
3558 | 3557 |
template = open(template_filename, 'rb').read() |
3559 | 3558 |
upload = QuixoteUpload('/foo/template-django.odt', content_type='application/octet-stream') |
3560 |
upload.fp = BytesIO() |
|
3559 |
upload.fp = io.BytesIO()
|
|
3561 | 3560 |
upload.fp.write(template) |
3562 | 3561 |
upload.fp.seek(0) |
3563 | 3562 |
item.model_file = UploadedFile(pub.app_dir, None, upload) |
... | ... | |
3629 | 3628 |
template_filename = os.path.join(os.path.dirname(__file__), filename) |
3630 | 3629 |
template = open(template_filename, 'rb').read() |
3631 | 3630 |
upload = QuixoteUpload(filename, content_type='application/octet-stream') |
3632 |
upload.fp = BytesIO() |
|
3631 |
upload.fp = io.BytesIO()
|
|
3633 | 3632 |
upload.fp.write(template) |
3634 | 3633 |
upload.fp.seek(0) |
3635 | 3634 |
item.model_file = UploadedFile(pub.app_dir, None, upload) |
tests/utilities.py | ||
---|---|---|
14 | 14 |
from quixote import cleanup, get_publisher |
15 | 15 |
from django.conf import settings |
16 | 16 |
from django.utils.encoding import force_bytes, force_text |
17 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
17 |
import urllib.parse
|
|
18 | 18 | |
19 | 19 |
from wcs.qommon import force_str |
20 | 20 |
import wcs |
... | ... | |
330 | 330 |
{'url': url, 'method': method, 'body': body, 'headers': headers, 'timeout': timeout} |
331 | 331 |
) |
332 | 332 | |
333 |
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url) |
|
334 |
base_url = urlparse.urlunparse((scheme, netloc, path, '', '', '')) |
|
333 |
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(url)
|
|
334 |
base_url = urllib.parse.urlunparse((scheme, netloc, path, '', '', ''))
|
|
335 | 335 | |
336 | 336 |
with open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')) as fd: |
337 | 337 |
metadata = fd.read() |
wcs/admin/forms.py | ||
---|---|---|
16 | 16 |
# You should have received a copy of the GNU General Public License |
17 | 17 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
18 | 18 | |
19 |
import io |
|
19 | 20 |
import xml.etree.ElementTree as ET |
20 | 21 |
import datetime |
21 | 22 |
import difflib |
22 | 23 |
import tarfile |
23 | 24 |
import time |
24 | 25 | |
25 |
from django.utils.six import BytesIO, StringIO |
|
26 | ||
27 | 26 |
from quixote import get_publisher, get_response, redirect |
28 | 27 |
from quixote.directory import Directory, AccessControlled |
29 | 28 |
from quixote.html import TemplateIO, htmltext |
... | ... | |
1247 | 1246 |
if form.get_widget('file').parse(): |
1248 | 1247 |
fp = form.get_widget('file').parse().fp |
1249 | 1248 |
elif form.get_widget('new_formdef').parse(): |
1250 |
fp = StringIO(form.get_widget('new_formdef').parse()) |
|
1249 |
fp = io.StringIO(form.get_widget('new_formdef').parse())
|
|
1251 | 1250 |
elif form.get_widget('url').parse(): |
1252 | 1251 |
url = form.get_widget('url').parse() |
1253 | 1252 |
try: |
... | ... | |
1513 | 1512 |
date = time.strptime(date, misc.date_format()) |
1514 | 1513 |
all_forms = [x for x in all_forms if x.last_update_time < date] |
1515 | 1514 | |
1516 |
self.fd = BytesIO() |
|
1515 |
self.fd = io.BytesIO()
|
|
1517 | 1516 |
t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) |
1518 | 1517 |
t.add(self.formdef.get_object_filename(), 'formdef') |
1519 | 1518 |
for formdata in all_forms: |
wcs/admin/settings.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import copy |
18 | 18 |
import hashlib |
19 |
import io |
|
19 | 20 |
import mimetypes |
20 | 21 |
import os |
21 | 22 | |
... | ... | |
29 | 30 |
import xml.etree.ElementTree as ET |
30 | 31 | |
31 | 32 |
from django.utils.encoding import force_bytes |
32 |
from django.utils.six import BytesIO, StringIO |
|
33 | 33 | |
34 | 34 |
from quixote import get_publisher, get_request, get_response, redirect |
35 | 35 |
from quixote.directory import Directory |
... | ... | |
814 | 814 |
return redirect('themes') |
815 | 815 | |
816 | 816 |
parent_theme_directory = os.path.dirname(theme_directory) |
817 |
c = BytesIO() |
|
817 |
c = io.BytesIO()
|
|
818 | 818 |
z = zipfile.ZipFile(c, 'w') |
819 | 819 |
for base, dirnames, filenames in os.walk(theme_directory): |
820 | 820 |
basetheme = base[len(parent_theme_directory) + 1 :] |
... | ... | |
877 | 877 |
get_session().message = ('error', _('Theme is missing a desc.xml file.')) |
878 | 878 |
return redirect('themes') |
879 | 879 |
desc_xml = z.read('%s/desc.xml' % theme_name) |
880 |
theme_dict = template.get_theme_dict(StringIO(force_text(desc_xml))) |
|
880 |
theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
|
|
881 | 881 |
if theme_dict.get('name') != theme_name: |
882 | 882 |
get_session().message = ('error', _('desc.xml is missing a name attribute.')) |
883 | 883 |
return redirect('themes') |
... | ... | |
901 | 901 |
get_session().message = ('error', _('Error loading theme (%s).') % str(e)) |
902 | 902 |
return redirect('themes') |
903 | 903 | |
904 |
return self.install_theme_from_file(StringIO(fp.read())) |
|
904 |
return self.install_theme_from_file(io.StringIO(fp.read()))
|
|
905 | 905 | |
906 | 906 |
def template(self): |
907 | 907 |
from wcs.qommon.template import get_default_ezt_template |
... | ... | |
994 | 994 |
self.settings = settings |
995 | 995 | |
996 | 996 |
def export(self, job): |
997 |
c = BytesIO() |
|
997 |
c = io.BytesIO()
|
|
998 | 998 |
z = zipfile.ZipFile(c, 'w') |
999 | 999 |
for d in self.dirs: |
1000 | 1000 |
if d not in ( |
wcs/admin/users.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.utils import six |
|
18 | ||
19 | 17 |
from quixote import get_publisher, get_response, get_request, get_session, redirect |
20 | 18 |
from quixote.directory import Directory |
21 | 19 |
from quixote.html import TemplateIO, htmltext |
... | ... | |
327 | 325 |
checked_roles = None |
328 | 326 |
if get_request().form.get('filter'): |
329 | 327 |
checked_roles = get_request().form.get('role', []) |
330 |
if isinstance(checked_roles, six.string_types):
|
|
328 |
if isinstance(checked_roles, str):
|
|
331 | 329 |
checked_roles = [checked_roles] |
332 | 330 | |
333 | 331 |
if checked_roles: |
wcs/admin/workflows.py | ||
---|---|---|
18 | 18 | |
19 | 19 |
from __future__ import print_function |
20 | 20 | |
21 |
import io |
|
21 | 22 |
import time |
22 | 23 |
from subprocess import Popen, PIPE |
23 | 24 |
import textwrap |
24 | 25 |
import xml.etree.ElementTree as ET |
25 | 26 | |
26 |
from django.utils import six |
|
27 | 27 |
from django.utils.encoding import force_bytes |
28 |
from django.utils.six import StringIO |
|
29 | 28 | |
30 | 29 |
from quixote import redirect, get_publisher, get_response |
31 | 30 |
from quixote.directory import Directory |
... | ... | |
132 | 131 | |
133 | 132 | |
134 | 133 |
def graphviz(workflow, url_prefix='', select=None, svg=True, include=False): |
135 |
out = StringIO() |
|
134 |
out = io.StringIO()
|
|
136 | 135 |
# a list of colours known to graphviz, they will serve as key to get back |
137 | 136 |
# to the colours defined in wcs, they are used as color attributes in |
138 | 137 |
# graphviz (<= 2.38) then as class attribute on node elements for 2.40 and |
wcs/api.py | ||
---|---|---|
24 | 24 |
from quixote.directory import Directory |
25 | 25 | |
26 | 26 |
from django.utils.encoding import force_text |
27 |
from django.utils.six.moves.urllib import parse as urllib |
|
27 |
import urllib.parse |
|
28 | 28 |
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse |
29 | 29 | |
30 | 30 |
from .qommon import _ |
... | ... | |
956 | 956 | |
957 | 957 |
if 'url' in info: |
958 | 958 |
url = info['url'] |
959 |
url += urllib.quote(get_request().form['q']) |
|
959 |
url += urllib.parse.quote(get_request().form['q'])
|
|
960 | 960 |
url = sign_url_auto_orig(url) |
961 | 961 |
get_response().set_content_type('application/json') |
962 | 962 |
return misc.urlopen(url).read() |
... | ... | |
1050 | 1050 |
url += '&' |
1051 | 1051 |
else: |
1052 | 1052 |
url += '?' |
1053 |
url += 'format=json&q=%s' % urllib.quote(q.encode('utf-8')) |
|
1053 |
url += 'format=json&q=%s' % urllib.parse.quote(q.encode('utf-8'))
|
|
1054 | 1054 |
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en') |
1055 | 1055 |
return HttpResponse(misc.urlopen(url).read(), content_type='application/json') |
1056 | 1056 |
wcs/api_utils.py | ||
---|---|---|
20 | 20 |
import datetime |
21 | 21 |
import random |
22 | 22 |
import os |
23 |
import urllib.parse |
|
23 | 24 |
import errno |
24 | 25 |
import calendar |
25 | 26 | |
26 |
from django.utils import six |
|
27 | 27 |
from django.utils.encoding import force_bytes, force_text |
28 |
from django.utils.six.moves.urllib import parse as urllib |
|
29 |
from django.utils.six.moves.urllib import parse as urlparse |
|
30 | 28 | |
31 | 29 |
from quixote import get_request, get_publisher |
32 | 30 |
from .api_access import ApiAccess |
... | ... | |
43 | 41 |
if not query_string: |
44 | 42 |
return False |
45 | 43 |
signature = get_request().form.get('signature') |
46 |
if not isinstance(signature, six.string_types):
|
|
44 |
if not isinstance(signature, str):
|
|
47 | 45 |
return False |
48 | 46 |
signature = force_bytes(signature) |
49 | 47 |
# verify signature |
50 | 48 |
orig = get_request().form.get('orig') |
51 |
if not isinstance(orig, six.string_types):
|
|
49 |
if not isinstance(orig, str):
|
|
52 | 50 |
raise AccessForbiddenError('missing/multiple orig field') |
53 | 51 |
key = ApiAccess.get_access_key(orig) or get_publisher().get_site_option(orig, 'api-secrets') |
54 | 52 |
if not key: |
55 | 53 |
raise AccessForbiddenError('invalid orig') |
56 | 54 |
algo = get_request().form.get('algo') |
57 |
if not isinstance(algo, six.string_types):
|
|
55 |
if not isinstance(algo, str):
|
|
58 | 56 |
raise AccessForbiddenError('missing/multiple algo field') |
59 | 57 |
if algo not in hashlib.algorithms_guaranteed: |
60 | 58 |
raise AccessForbiddenError('invalid algo') |
... | ... | |
69 | 67 |
): |
70 | 68 |
raise AccessForbiddenError('invalid signature') |
71 | 69 |
timestamp = get_request().form.get('timestamp') |
72 |
if not isinstance(timestamp, six.string_types):
|
|
70 |
if not isinstance(timestamp, str):
|
|
73 | 71 |
raise AccessForbiddenError('missing/multiple timestamp field') |
74 | 72 |
try: |
75 | 73 |
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') |
... | ... | |
133 | 131 |
user = None |
134 | 132 |
if get_request().form.get('email'): |
135 | 133 |
email = get_request().form.get('email') |
136 |
if not isinstance(email, six.string_types):
|
|
134 |
if not isinstance(email, str):
|
|
137 | 135 |
raise AccessForbiddenError('multiple email field') |
138 | 136 |
users = list(get_publisher().user_class.get_users_with_email(email)) |
139 | 137 |
if users: |
... | ... | |
142 | 140 |
raise AccessForbiddenError('unknown email') |
143 | 141 |
elif get_request().form.get('NameID'): |
144 | 142 |
ni = get_request().form.get('NameID') |
145 |
if not isinstance(ni, six.string_types):
|
|
143 |
if not isinstance(ni, str):
|
|
146 | 144 |
raise AccessForbiddenError('multiple NameID field') |
147 | 145 |
users = list(get_publisher().user_class.get_users_with_name_identifier(ni)) |
148 | 146 |
if users: |
... | ... | |
158 | 156 | |
159 | 157 | |
160 | 158 |
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): |
161 |
parsed = urlparse.urlparse(url) |
|
159 |
parsed = urllib.parse.urlparse(url)
|
|
162 | 160 |
new_query = sign_query(parsed.query, key, algo, timestamp, nonce) |
163 |
return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) |
|
161 |
return urllib.parse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
|
|
164 | 162 | |
165 | 163 | |
166 | 164 |
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): |
... | ... | |
173 | 171 |
new_query = query |
174 | 172 |
if new_query: |
175 | 173 |
new_query += '&' |
176 |
new_query += urllib.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce))) |
|
174 |
new_query += urllib.parse.urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
|
|
177 | 175 |
signature = base64.b64encode(sign_string(new_query, key, algo=algo)) |
178 |
new_query += '&signature=' + urllib.quote(signature) |
|
176 |
new_query += '&signature=' + urllib.parse.quote(signature)
|
|
179 | 177 |
return new_query |
180 | 178 | |
181 | 179 | |
... | ... | |
191 | 189 | |
192 | 190 |
def get_secret_and_orig(url): |
193 | 191 |
frontoffice_url = get_publisher().get_frontoffice_url() |
194 |
orig = urlparse.urlparse(frontoffice_url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0] |
|
195 |
target_orig = urlparse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0] |
|
192 |
orig = urllib.parse.urlparse(frontoffice_url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
|
|
193 |
target_orig = urllib.parse.urlparse(url).netloc.rsplit('@', 1)[-1].rsplit(':', 1)[0]
|
|
196 | 194 |
secret = get_publisher().get_site_option(target_orig, 'wscall-secrets') |
197 | 195 |
if not secret: |
198 | 196 |
raise MissingSecret() |
... | ... | |
204 | 202 |
signature_key, orig = get_secret_and_orig(url) |
205 | 203 |
except MissingSecret: |
206 | 204 |
return url |
207 |
parsed = urlparse.urlparse(url) |
|
208 |
querystring = urlparse.parse_qsl(parsed.query) |
|
205 |
parsed = urllib.parse.urlparse(url)
|
|
206 |
querystring = urllib.parse.parse_qsl(parsed.query)
|
|
209 | 207 |
querystring.append(('orig', orig)) |
210 |
querystring = urllib.urlencode(querystring) |
|
211 |
url = urlparse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6]) |
|
208 |
querystring = urllib.parse.urlencode(querystring)
|
|
209 |
url = urllib.parse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
|
|
212 | 210 |
return sign_url(url, signature_key) |
213 | 211 | |
214 | 212 |
wcs/backoffice/data_management.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import csv |
18 | 18 |
import datetime |
19 |
import io |
|
19 | 20 | |
20 | 21 |
from quixote import get_publisher, get_request, get_response, redirect |
21 | 22 |
from quixote.html import TemplateIO, htmltext, htmlescape |
22 | 23 | |
23 | 24 |
from django.utils.encoding import force_text |
24 |
from django.utils.six import StringIO |
|
25 | 25 | |
26 | 26 | |
27 | 27 |
from ..qommon import _, N_ |
... | ... | |
169 | 169 | |
170 | 170 |
def data_sample_csv(self): |
171 | 171 |
carddef_fields = self.get_import_csv_fields() |
172 |
output = StringIO() |
|
172 |
output = io.StringIO()
|
|
173 | 173 |
csv_output = csv.writer(output) |
174 | 174 |
csv_output.writerow([f.label for f in carddef_fields]) |
175 | 175 |
sample_line = [] |
wcs/backoffice/management.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import csv |
18 | 18 |
import datetime |
19 |
import io |
|
19 | 20 |
import json |
20 | 21 |
import re |
21 | 22 |
import time |
22 | 23 |
import types |
24 |
import urllib.parse |
|
23 | 25 |
import vobject |
24 | 26 |
import zipfile |
25 | 27 | |
26 | 28 |
from django.conf import settings |
27 |
from django.utils import six |
|
28 | 29 |
from django.utils.encoding import force_text |
29 |
from django.utils.six.moves.urllib import parse as urllib |
|
30 |
from django.utils.six import BytesIO, StringIO |
|
31 | 30 | |
32 | 31 |
from quixote import get_session, get_publisher, get_request, get_response, redirect |
33 | 32 |
from quixote.directory import Directory |
... | ... | |
904 | 903 |
total_count = sql.AnyFormData.count(criterias) |
905 | 904 |
if offset > total_count: |
906 | 905 |
get_request().form['offset'] = '0' |
907 |
return redirect('listing?' + urllib.urlencode(get_request().form)) |
|
906 |
return redirect('listing?' + urllib.parse.urlencode(get_request().form))
|
|
908 | 907 |
formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset) |
909 | 908 |
include_submission_channel = bool(get_publisher().get_site_option('welco_url', 'variables')) |
910 | 909 | |
... | ... | |
2937 | 2936 | |
2938 | 2937 |
def download_as_zip(self): |
2939 | 2938 |
formdata = self.filled |
2940 |
zip_content = BytesIO() |
|
2939 |
zip_content = io.BytesIO()
|
|
2941 | 2940 |
zip_file = zipfile.ZipFile(zip_content, 'w') |
2942 | 2941 |
counter = {'value': 0} |
2943 | 2942 | |
... | ... | |
3281 | 3280 |
else: |
3282 | 3281 |
r += htmltext('<li><code title="%s">%s</code>') % (k, k) |
3283 | 3282 |
r += htmltext(' <div class="value"><span>%s</span>') % ellipsize(safe(v), 10000) |
3284 |
if not isinstance(v, six.string_types):
|
|
3283 |
if not isinstance(v, str):
|
|
3285 | 3284 |
r += htmltext(' <span class="type">(%r)</span>') % type(v) |
3286 | 3285 |
r += htmltext('</div></li>') |
3287 | 3286 |
r += htmltext('</div>') |
... | ... | |
3706 | 3705 |
return self.create_export(formdef, fields, items, total_count) |
3707 | 3706 | |
3708 | 3707 |
def create_export(self, formdef, fields, items, total_count): |
3709 |
output = StringIO() |
|
3708 |
output = io.StringIO()
|
|
3710 | 3709 |
csv_output = csv.writer(output) |
3711 | 3710 | |
3712 | 3711 |
csv_output.writerow(self.csv_tuple_heading(fields)) |
... | ... | |
3752 | 3751 |
native_value=item['native_value'], |
3753 | 3752 |
) |
3754 | 3753 | |
3755 |
output = BytesIO() |
|
3754 |
output = io.BytesIO()
|
|
3756 | 3755 |
workbook.save(output) |
3757 | 3756 | |
3758 | 3757 |
self.file_content = output.getvalue() |
wcs/carddata.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.utils import six |
|
18 | ||
19 | 17 |
from wcs.formdata import FormData |
20 | 18 | |
21 | 19 | |
... | ... | |
43 | 41 |
if not field.varname: |
44 | 42 |
continue |
45 | 43 |
value = self.data and self.data.get(field.id) |
46 |
if isinstance(value, six.string_types):
|
|
44 |
if isinstance(value, str):
|
|
47 | 45 |
item[field.varname] = value |
48 | 46 |
return item |
49 | 47 |
wcs/ctl/check_hobos.py | ||
---|---|---|
23 | 23 |
import tempfile |
24 | 24 |
import hashlib |
25 | 25 | |
26 |
from django.utils import six |
|
27 | 26 |
from django.utils.encoding import force_bytes, force_text |
28 |
from django.utils.six.moves import configparser as ConfigParser
|
|
29 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
27 |
import configparser
|
|
28 |
import urllib.parse
|
|
30 | 29 | |
31 | 30 |
from quixote import cleanup |
32 | 31 |
from wcs.qommon import force_str |
... | ... | |
365 | 364 |
pub.write_cfg() |
366 | 365 | |
367 | 366 |
def get_instance_path(self, service): |
368 |
parsed_url = urlparse.urlsplit(service.get('base_url')) |
|
367 |
parsed_url = urllib.parse.urlsplit(service.get('base_url'))
|
|
369 | 368 |
instance_path = parsed_url.netloc |
370 | 369 |
if parsed_url.path: |
371 | 370 |
instance_path += '+%s' % parsed_url.path.replace('/', '+') |
... | ... | |
373 | 372 | |
374 | 373 |
def configure_site_options(self, current_service, pub, ignore_timestamp=False): |
375 | 374 |
# configure site-options.cfg |
376 |
config = ConfigParser.RawConfigParser()
|
|
375 |
config = configparser.RawConfigParser()
|
|
377 | 376 |
site_options_filepath = os.path.join(pub.app_dir, 'site-options.cfg') |
378 | 377 |
if os.path.exists(site_options_filepath): |
379 | 378 |
config.read(site_options_filepath) |
... | ... | |
382 | 381 |
try: |
383 | 382 |
if config.get('hobo', 'timestamp') == self.all_services.get('timestamp'): |
384 | 383 |
raise NoChange() |
385 |
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
|
|
384 |
except (configparser.NoOptionError, configparser.NoSectionError):
|
|
386 | 385 |
pass |
387 | 386 | |
388 | 387 |
if not 'hobo' in config.sections(): |
... | ... | |
404 | 403 |
if not service.get('secret_key'): |
405 | 404 |
continue |
406 | 405 | |
407 |
domain = urlparse.urlparse(service_url).netloc.split(':')[0] |
|
406 |
domain = urllib.parse.urlparse(service_url).netloc.split(':')[0]
|
|
408 | 407 |
if service is current_service: |
409 | 408 |
if config.has_option('api-secrets', domain): |
410 | 409 |
api_secrets[domain] = config.get('api-secrets', domain) |
... | ... | |
455 | 454 |
if value is None: |
456 | 455 |
config.remove_option('variables', key) |
457 | 456 |
continue |
458 |
if not isinstance(value, six.string_types):
|
|
457 |
if not isinstance(value, str):
|
|
459 | 458 |
value = str(value) |
460 | 459 |
value = force_str(value) |
461 | 460 |
config.set('variables', key, value) |
... | ... | |
480 | 479 | |
481 | 480 |
try: |
482 | 481 |
portal_agent_url = config.get('variables', 'portal_agent_url') |
483 |
except ConfigParser.NoOptionError:
|
|
482 |
except configparser.NoOptionError:
|
|
484 | 483 |
pass |
485 | 484 |
else: |
486 | 485 |
if portal_agent_url.endswith('/'): |
... | ... | |
531 | 530 |
if not createdb_cfg: |
532 | 531 |
createdb_cfg = {} |
533 | 532 |
for k, v in pub.cfg['postgresql'].items(): |
534 |
if v and isinstance(v, six.string_types):
|
|
533 |
if v and isinstance(v, str):
|
|
535 | 534 |
createdb_cfg[k] = v |
536 | 535 | |
537 | 536 |
try: |
wcs/ctl/delete_tenant.py | ||
---|---|---|
23 | 23 |
from datetime import datetime |
24 | 24 |
from shutil import rmtree |
25 | 25 | |
26 |
from django.utils import six |
|
27 | ||
28 | 26 |
from ..qommon.ctl import Command, make_option |
29 | 27 | |
30 | 28 | |
... | ... | |
61 | 59 |
if pub.is_using_postgresql(): |
62 | 60 |
postgresql_cfg = {} |
63 | 61 |
for k, v in pub.cfg['postgresql'].items(): |
64 |
if v and isinstance(v, six.string_types):
|
|
62 |
if v and isinstance(v, str):
|
|
65 | 63 |
postgresql_cfg[k] = v |
66 | 64 | |
67 | 65 |
# if there's a createdb-connection-params, we can do a DROP DATABASE with |
wcs/ctl/management/commands/convert_to_sql.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import io |
|
17 | 18 |
import os |
18 | 19 |
import sys |
19 | 20 |
import psycopg2 |
... | ... | |
22 | 23 |
from django.utils.encoding import force_bytes |
23 | 24 |
from django.core.management.base import BaseCommand |
24 | 25 |
from django.core.management.base import CommandError |
25 |
from django.utils.six import StringIO |
|
26 | 26 | |
27 | 27 |
from wcs.qommon.publisher import get_publisher_class |
28 | 28 | |
... | ... | |
82 | 82 |
self.publisher.site_options.add_section('options') |
83 | 83 |
self.publisher.site_options.set('options', 'postgresql', 'true') |
84 | 84 |
options_file = os.path.join(self.publisher.app_dir, 'site-options.cfg') |
85 |
stringio = StringIO() |
|
85 |
stringio = io.StringIO()
|
|
86 | 86 |
self.publisher.site_options.write(stringio) |
87 | 87 |
atomic_write(options_file, force_bytes(stringio.getvalue())) |
88 | 88 |
wcs/custom_views.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import xml.etree.ElementTree as ET |
18 | 18 | |
19 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
19 |
import urllib.parse
|
|
20 | 20 |
from django.utils.encoding import force_text |
21 | 21 |
from quixote import get_publisher |
22 | 22 | |
... | ... | |
73 | 73 |
return True |
74 | 74 | |
75 | 75 |
def set_from_qs(self, qs): |
76 |
parsed_qs = urlparse.parse_qsl(qs) |
|
76 |
parsed_qs = urllib.parse.parse_qsl(qs)
|
|
77 | 77 |
self.columns = { |
78 | 78 |
'list': [ |
79 | 79 |
{'id': key} for (key, value) in parsed_qs if value == 'on' and not key.startswith('filter-') |
wcs/data_sources.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import collections |
18 | 18 |
import hashlib |
19 |
import urllib.parse |
|
19 | 20 |
import xml.etree.ElementTree as ET |
20 | 21 | |
21 | 22 |
from django.template import TemplateSyntaxError, VariableDoesNotExist |
22 |
from django.utils import six |
|
23 | 23 |
from django.utils.encoding import force_text, force_bytes |
24 |
from django.utils.six.moves.urllib import parse as urllib |
|
25 |
from django.utils.six.moves.urllib import parse as urlparse |
|
26 | 24 | |
27 | 25 |
from quixote import get_publisher, get_request, get_session |
28 | 26 |
from quixote.html import TemplateIO |
... | ... | |
288 | 286 |
elif len(value[0]) == 1: |
289 | 287 |
return [{'id': x[0], 'text': x[0]} for x in value] |
290 | 288 |
return value |
291 |
elif isinstance(value[0], six.string_types):
|
|
289 |
elif isinstance(value[0], str):
|
|
292 | 290 |
return [{'id': x, 'text': x} for x in value] |
293 | 291 |
return value |
294 | 292 |
except: |
... | ... | |
619 | 617 |
url += '?' |
620 | 618 |
else: |
621 | 619 |
url += '&' |
622 |
url += param_name + '=' + urllib.quote(param_value) |
|
620 |
url += param_name + '=' + urllib.parse.quote(param_value)
|
|
623 | 621 | |
624 | 622 |
def find_item(items, name, value): |
625 | 623 |
for item in items: |
... | ... | |
769 | 767 | |
770 | 768 |
def chrono_url(publisher, url): |
771 | 769 |
chrono_url = publisher.get_site_option('chrono_url') |
772 |
return urlparse.urljoin(chrono_url, url) |
|
770 |
return urllib.parse.urljoin(chrono_url, url)
|
|
773 | 771 | |
774 | 772 | |
775 | 773 |
def collect_agenda_data(publisher): |
wcs/fields.py | ||
---|---|---|
31 | 31 |
from quixote import get_request, get_publisher |
32 | 32 |
from quixote.html import htmltag, htmltext, TemplateIO |
33 | 33 | |
34 |
from django.utils import six |
|
35 | 34 |
from django.utils.encoding import force_bytes, force_text, smart_text |
36 | 35 |
from django.utils.formats import date_format as django_date_format |
37 | 36 |
from django.utils.html import urlize |
... | ... | |
313 | 312 |
atname = 'item' |
314 | 313 |
for v in val: |
315 | 314 |
ET.SubElement(el, atname).text = force_text(v, charset, errors='replace') |
316 |
elif isinstance(val, six.string_types):
|
|
315 |
elif isinstance(val, str):
|
|
317 | 316 |
el.text = force_text(val, charset, errors='replace') |
318 | 317 |
else: |
319 | 318 |
el.text = str(val) |
... | ... | |
1039 | 1038 | |
1040 | 1039 |
def migrate(self): |
1041 | 1040 |
changed = super(StringField, self).migrate() |
1042 |
if isinstance(self.validation, six.string_types):
|
|
1041 |
if isinstance(self.validation, str):
|
|
1043 | 1042 |
self.validation = {'type': 'regex', 'value': self.validation} |
1044 | 1043 |
changed = True |
1045 | 1044 |
return changed |
... | ... | |
2498 | 2497 | |
2499 | 2498 |
def migrate(self): |
2500 | 2499 |
changed = super(PageField, self).migrate() |
2501 |
if isinstance(self.condition, six.string_types):
|
|
2500 |
if isinstance(self.condition, str):
|
|
2502 | 2501 |
if self.condition: |
2503 | 2502 |
self.condition = {'type': 'python', 'value': self.condition} |
2504 | 2503 |
else: |
... | ... | |
2506 | 2505 |
changed = True |
2507 | 2506 |
for post_condition in self.post_conditions or []: |
2508 | 2507 |
condition = post_condition.get('condition') |
2509 |
if isinstance(condition, six.string_types):
|
|
2508 |
if isinstance(condition, str):
|
|
2510 | 2509 |
if condition: |
2511 | 2510 |
post_condition['condition'] = {'type': 'python', 'value': condition} |
2512 | 2511 |
else: |
wcs/formdata.py | ||
---|---|---|
22 | 22 |
import sys |
23 | 23 |
import time |
24 | 24 | |
25 |
from django.utils import six |
|
26 | ||
27 | 25 |
from quixote import get_request, get_publisher, get_session |
28 | 26 |
from quixote.http_request import Upload |
29 | 27 | |
... | ... | |
438 | 436 |
if field.prefill and field.prefill.get('type') == 'user': |
439 | 437 |
form_user_data[field.prefill['value']] = self.data.get(field.id) |
440 | 438 |
user_label = ' '.join( |
441 |
[ |
|
442 |
form_user_data.get(x) |
|
443 |
for x in field_name_values |
|
444 |
if isinstance(form_user_data.get(x), six.string_types) |
|
445 |
] |
|
439 |
[form_user_data.get(x) for x in field_name_values if isinstance(form_user_data.get(x), str)] |
|
446 | 440 |
) |
447 | 441 |
if user_label != self.user_label: |
448 | 442 |
self.user_label = user_label |
wcs/formdef.py | ||
---|---|---|
26 | 26 |
import xml.etree.ElementTree as ET |
27 | 27 |
import datetime |
28 | 28 | |
29 |
from django.utils import six |
|
30 | 29 |
from django.utils.encoding import force_bytes, force_text |
31 | 30 | |
32 | 31 |
from quixote import get_request, get_publisher |
... | ... | |
839 | 838 |
return dict([(unicode2str(k), unicode2str(v)) for k, v in v.items()]) |
840 | 839 |
elif isinstance(v, list): |
841 | 840 |
return [unicode2str(x) for x in v] |
842 |
elif isinstance(v, six.string_types):
|
|
841 |
elif isinstance(v, str):
|
|
843 | 842 |
return force_str(v) |
844 | 843 |
else: |
845 | 844 |
return v |
... | ... | |
869 | 868 |
): |
870 | 869 |
formdef.workflow_id = value['workflow'].get('id') |
871 | 870 |
elif 'workflow' in value: |
872 |
if isinstance(value['workflow'], six.string_types):
|
|
871 |
if isinstance(value['workflow'], str):
|
|
873 | 872 |
workflow = value.get('workflow') |
874 | 873 |
else: |
875 | 874 |
workflow = value['workflow'].get('name') |
... | ... | |
1009 | 1008 |
element = ET.SubElement(options, 'option') |
1010 | 1009 |
element.attrib['varname'] = option |
1011 | 1010 |
option_value = self.workflow_options.get(option) |
1012 |
if isinstance(option_value, six.string_types):
|
|
1011 |
if isinstance(option_value, str):
|
|
1013 | 1012 |
element.text = force_text(self.workflow_options.get(option, ''), charset) |
1014 | 1013 |
elif hasattr(option_value, 'base_filename'): |
1015 | 1014 |
ET.SubElement(element, 'filename').text = option_value.base_filename |
wcs/forms/backoffice.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.utils.six.moves.urllib import parse as urllib |
|
17 |
import urllib.parse |
|
18 | 18 | |
19 | 19 |
from quixote import get_request, get_publisher, get_session, redirect |
20 | 20 |
from quixote.html import htmltext, TemplateIO |
... | ... | |
63 | 63 | |
64 | 64 |
if offset > total_count: |
65 | 65 |
get_request().form['offset'] = '0' |
66 |
return redirect('?' + urllib.urlencode(get_request().form)) |
|
66 |
return redirect('?' + urllib.parse.urlencode(get_request().form))
|
|
67 | 67 | |
68 | 68 |
r = TemplateIO(html=True) |
69 | 69 |
wcs/forms/common.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import time |
18 | 18 | |
19 |
from django.utils.six.moves.urllib import parse as urllib |
|
19 |
import urllib.parse |
|
20 | 20 | |
21 | 21 |
from quixote import get_publisher, get_request, get_response, get_session, redirect |
22 | 22 |
from quixote.directory import Directory |
... | ... | |
73 | 73 |
# no such file |
74 | 74 |
raise errors.TraversalError() |
75 | 75 | |
76 |
if component and component not in (file.base_filename, urllib.quote(file.base_filename)): |
|
76 |
if component and component not in (file.base_filename, urllib.parse.quote(file.base_filename)):
|
|
77 | 77 |
raise errors.TraversalError() |
78 | 78 | |
79 | 79 |
if file.has_redirect_url(): |
... | ... | |
688 | 688 |
raise errors.TraversalError() |
689 | 689 | |
690 | 690 |
if getattr(file, 'base_filename'): |
691 |
file_url += urllib.quote(file.base_filename) |
|
691 |
file_url += urllib.parse.quote(file.base_filename)
|
|
692 | 692 |
return redirect(file_url) |
693 | 693 | |
694 | 694 |
@classmethod |
wcs/forms/root.py | ||
---|---|---|
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import copy |
18 |
import io |
|
18 | 19 |
import json |
19 | 20 |
import time |
20 | 21 | |
... | ... | |
23 | 24 |
except ImportError: |
24 | 25 |
qrcode = None |
25 | 26 | |
26 |
from django.utils import six |
|
27 | 27 |
from django.utils.http import quote |
28 |
from django.utils.six import BytesIO |
|
29 | 28 |
from django.utils.safestring import mark_safe |
30 | 29 | |
31 | 30 |
import ratelimit.utils |
... | ... | |
1164 | 1163 |
continue |
1165 | 1164 |
v, locked = field.get_prefill_value(user=prefill_user) |
1166 | 1165 |
if locked: |
1167 |
if not isinstance(v, six.string_types) and field.convert_value_to_str:
|
|
1166 |
if not isinstance(v, str) and field.convert_value_to_str:
|
|
1168 | 1167 |
# convert structured data to strings as if they were |
1169 | 1168 |
# submitted by the browser. |
1170 | 1169 |
v = field.convert_value_to_str(v) |
... | ... | |
1522 | 1521 | |
1523 | 1522 |
def qrcode(self): |
1524 | 1523 |
img = qrcode.make(self.formdef.get_url()) |
1525 |
s = BytesIO() |
|
1524 |
s = io.BytesIO()
|
|
1526 | 1525 |
img.save(s) |
1527 | 1526 |
if get_request().get_query() == 'download': |
1528 | 1527 |
get_response().set_header( |
wcs/middleware.py | ||
---|---|---|
20 | 20 | |
21 | 21 |
from django.http import HttpResponseBadRequest, HttpResponseRedirect |
22 | 22 |
from django.utils.deprecation import MiddlewareMixin |
23 |
from django.utils.six.moves.urllib import parse as urllib |
|
23 |
import urllib.parse |
|
24 | 24 | |
25 | 25 |
from quixote import get_publisher |
26 | 26 |
from quixote.errors import RequestError |
... | ... | |
80 | 80 |
pub.finish_successful_request() # commits session |
81 | 81 |
new_query_string = '' |
82 | 82 |
if compat_request.form: |
83 |
new_query_string = '?' + urllib.urlencode(compat_request.form) |
|
83 |
new_query_string = '?' + urllib.parse.urlencode(compat_request.form)
|
|
84 | 84 |
response = HttpResponseRedirect(compat_request.get_path() + new_query_string) |
85 | 85 |
for name, value in compat_request.response.generate_headers(): |
86 | 86 |
if name == 'Content-Length': |
wcs/monkeypatch.py | ||
---|---|---|
19 | 19 | |
20 | 20 |
import django.template.base |
21 | 21 |
import django.template.defaulttags |
22 |
from django.utils import six |
|
23 |
from django.utils.six.moves.urllib import parse as urlparse |
|
22 |
import urllib.parse |
|
24 | 23 | |
25 | 24 |
import quixote |
26 | 25 |
import quixote.publish |
... | ... | |
78 | 77 |
not honor the redirect). |
79 | 78 |
""" |
80 | 79 |
request = _thread_local.publisher.get_request() |
81 |
location = urlparse.urljoin(request.get_url(), str(location)) |
|
80 |
location = urllib.parse.urljoin(request.get_url(), str(location))
|
|
82 | 81 |
return request.response.redirect(location, permanent) |
83 | 82 | |
84 | 83 | |
... | ... | |
105 | 104 | |
106 | 105 | |
107 | 106 |
for key, value in list(locals().items()): |
108 |
if type(value) in (types.FunctionType,) + six.class_types:
|
|
107 |
if type(value) in (types.FunctionType, type):
|
|
109 | 108 |
setattr(quixote, key, value) |
110 | 109 |
setattr(quixote.publish, key, value) |
111 | 110 |
wcs/portfolio.py | ||
---|---|---|
17 | 17 |
import json |
18 | 18 |
import hashlib |
19 | 19 |
import base64 |
20 |
import urllib.parse |
|
20 | 21 | |
21 | 22 |
from django.utils.encoding import force_text |
22 |
from django.utils.six.moves.urllib import parse as urllib |
|
23 |
from django.utils.six.moves.urllib import parse as urlparse |
|
24 | 23 | |
25 | 24 |
from .qommon import N_, get_logger |
26 | 25 |
from .qommon.misc import http_get_page, json_loads, http_post_request, urlopen |
... | ... | |
37 | 36 | |
38 | 37 |
def fargo_url(url): |
39 | 38 |
fargo_url = get_publisher().get_site_option('fargo_url') |
40 |
url = urlparse.urljoin(fargo_url, url) |
|
39 |
url = urllib.parse.urljoin(fargo_url, url)
|
|
41 | 40 |
secret, orig = get_secret_and_orig(url) |
42 | 41 |
if '?' in url: |
43 | 42 |
url += '&orig=%s' % orig |
... | ... | |
70 | 69 |
payload['user_nameid'] = force_text(user.name_identifiers[0], 'ascii') |
71 | 70 |
elif user.email: |
72 | 71 |
payload['user_email'] = force_text(user.email, 'ascii') |
73 |
payload['origin'] = urlparse.urlparse(get_publisher().get_frontoffice_url()).netloc |
|
72 |
payload['origin'] = urllib.parse.urlparse(get_publisher().get_frontoffice_url()).netloc
|
|
74 | 73 |
payload['file_name'] = force_text(filename, charset) |
75 | 74 |
stream.seek(0) |
76 | 75 |
payload['file_b64_content'] = force_text(base64.b64encode(stream.read())) |
... | ... | |
108 | 107 |
# FIXME: handle error cases |
109 | 108 |
url = request.form['url'] |
110 | 109 |
document = urlopen(request.form['url']).read() |
111 |
scheme, netloc, path, qs, frag = urlparse.urlsplit(url) |
|
110 |
scheme, netloc, path, qs, frag = urllib.parse.urlsplit(url)
|
|
112 | 111 |
path = path.split('/') |
113 |
name = urllib.unquote(path[-1]) |
|
112 |
name = urllib.parse.unquote(path[-1])
|
|
114 | 113 |
from .qommon.form import PicklableUpload |
115 | 114 | |
116 | 115 |
download = PicklableUpload(name, content_type='application/pdf') |
... | ... | |
122 | 121 |
frontoffice_url = get_publisher().get_frontoffice_url() |
123 | 122 |
self_url = frontoffice_url |
124 | 123 |
self_url += '/fargo/pick' |
125 |
return redirect('%spick/?pick=%s' % (self.fargo_url, urllib.quote(self_url))) |
|
124 |
return redirect('%spick/?pick=%s' % (self.fargo_url, urllib.parse.quote(self_url)))
|
|
126 | 125 | |
127 | 126 |
def set_token(self, token, title): |
128 | 127 |
get_response().add_javascript(['jquery.js']) |
wcs/publisher.py | ||
---|---|---|
18 | 18 |
import decimal |
19 | 19 |
import json |
20 | 20 |
import os |
21 |
import pickle |
|
21 | 22 |
import random |
22 | 23 |
import re |
23 | 24 |
import socket |
... | ... | |
25 | 26 |
import traceback |
26 | 27 |
import zipfile |
27 | 28 | |
28 |
from django.utils import six |
|
29 | 29 |
from django.utils.encoding import force_text |
30 | 30 |
from django.utils.formats import localize |
31 |
from django.utils.six.moves import cPickle |
|
32 | 31 | |
33 | 32 |
from .Defaults import * |
34 | 33 | |
... | ... | |
194 | 193 |
def _decode_list(data): |
195 | 194 |
rv = [] |
196 | 195 |
for item in data: |
197 |
if isinstance(item, six.string_types):
|
|
196 |
if isinstance(item, str):
|
|
198 | 197 |
item = force_str(item) |
199 | 198 |
elif isinstance(item, list): |
200 | 199 |
item = _decode_list(item) |
... | ... | |
207 | 206 |
rv = {} |
208 | 207 |
for key, value in data.items(): |
209 | 208 |
key = force_str(key) |
210 |
if isinstance(value, six.string_types):
|
|
209 |
if isinstance(value, str):
|
|
211 | 210 |
value = force_str(value) |
212 | 211 |
elif isinstance(value, list): |
213 | 212 |
value = _decode_list(value) |
... | ... | |
231 | 230 |
if f in ('config.pck', 'config.json'): |
232 | 231 |
results['settings'] = 1 |
233 | 232 |
if f == 'config.pck': |
234 |
d = cPickle.loads(data)
|
|
233 |
d = pickle.loads(data)
|
|
235 | 234 |
else: |
236 | 235 |
d = json.loads(force_text(data), object_hook=_decode_dict) |
237 | 236 |
if 'sp' in self.cfg: |
wcs/qommon/__init__.py | ||
---|---|---|
19 | 19 | |
20 | 20 |
import django.apps |
21 | 21 |
from django.conf import settings |
22 |
from django.utils import six |
|
23 | 22 |
from django.utils.encoding import force_text |
24 |
from django.utils.six.moves import configparser as ConfigParser
|
|
23 |
import configparser
|
|
25 | 24 | |
26 | 25 |
from quixote import get_publisher |
27 | 26 | |
... | ... | |
153 | 152 |
name = 'wcs.qommon' |
154 | 153 | |
155 | 154 |
def ready(self): |
156 |
config = ConfigParser.ConfigParser()
|
|
155 |
config = configparser.ConfigParser()
|
|
157 | 156 |
if settings.WCS_LEGACY_CONFIG_FILE: |
158 | 157 |
config.read(settings.WCS_LEGACY_CONFIG_FILE) |
159 | 158 |
if hasattr(settings, 'WCS_EXTRA_MODULES') and settings.WCS_EXTRA_MODULES: |
wcs/qommon/backoffice/listing.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.utils.six.moves.urllib import parse as urllib |
|
17 |
import urllib.parse |
|
18 | 18 |
from quixote.html import htmltext, TemplateIO |
19 | 19 |
from quixote import get_request, get_response, get_publisher |
20 | 20 | |
... | ... | |
41 | 41 |
query['limit'] = limit |
42 | 42 |
r += htmltext( |
43 | 43 |
'<a class="previous-page" data-limit="%s" data-offset="%s" href="?%s"><!--%s--></a>' |
44 |
) % (limit, query['offset'], urllib.urlencode(query, doseq=1), _('Previous Page')) |
|
44 |
) % (limit, query['offset'], urllib.parse.urlencode(query, doseq=1), _('Previous Page'))
|
|
45 | 45 |
else: |
46 | 46 |
r += htmltext('<span class="previous-page"><!--%s--></span>') % _('Previous Page') |
47 | 47 | |
... | ... | |
77 | 77 |
klass, |
78 | 78 |
limit, |
79 | 79 |
query['offset'], |
80 |
urllib.urlencode(query, doseq=1), |
|
80 |
urllib.parse.urlencode(query, doseq=1),
|
|
81 | 81 |
page_number, |
82 | 82 |
) |
83 | 83 |
r += htmltext('</span>') # <!-- .pages --> |
... | ... | |
91 | 91 |
r += htmltext('<a class="next-page" data-limit="%s" data-offset="%s" href="?%s"><!--%s--></a>') % ( |
92 | 92 |
limit, |
93 | 93 |
query['offset'], |
94 |
urllib.urlencode(query, doseq=1), |
|
94 |
urllib.parse.urlencode(query, doseq=1),
|
|
95 | 95 |
_('Next Page'), |
96 | 96 |
) |
97 | 97 |
else: |
... | ... | |
112 | 112 |
else: |
113 | 113 |
r += htmltext('<a data-limit="%s" data-offset="0"" href="?%s">%s</a>') % ( |
114 | 114 |
page_size, |
115 |
urllib.urlencode(query, doseq=1), |
|
115 |
urllib.parse.urlencode(query, doseq=1),
|
|
116 | 116 |
page_size, |
117 | 117 |
) |
118 | 118 |
if page_size >= total_count: |
wcs/qommon/ctl.py | ||
---|---|---|
25 | 25 |
'Command', |
26 | 26 |
] |
27 | 27 | |
28 |
from django.utils.six.moves import configparser as ConfigParser
|
|
28 |
import configparser
|
|
29 | 29 | |
30 | 30 |
from wcs import qommon |
31 | 31 |
from . import _ |
... | ... | |
39 | 39 |
usage_args = '[ options ... ]' |
40 | 40 | |
41 | 41 |
def __init__(self, options=[]): |
42 |
self.config = ConfigParser.ConfigParser()
|
|
42 |
self.config = configparser.ConfigParser()
|
|
43 | 43 |
self.options = options + [ |
44 | 44 |
make_option('--extra', metavar='DIR', action='append', dest='extra', default=[]), |
45 | 45 |
make_option('--app-dir', metavar='DIR', action='store', dest='app_dir', default=None), |
... | ... | |
53 | 53 |
sys.exit(1) |
54 | 54 |
try: |
55 | 55 |
self.config.read(base_options.configfile) |
56 |
except ConfigParser.ParsingError as e:
|
|
56 |
except configparser.ParsingError as e:
|
|
57 | 57 |
print('Invalid configuration file %s' % base_options.configfile, file=sys.stderr) |
58 | 58 |
print(e, file=sys.stderr) |
59 | 59 |
sys.exit(1) |
wcs/qommon/errors.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.utils.six.moves.urllib import parse as urllib |
|
17 |
import urllib.parse |
|
18 | 18 | |
19 | 19 |
from quixote import get_publisher |
20 | 20 |
import quixote |
... | ... | |
61 | 61 |
if self.public_msg: |
62 | 62 |
session.message = ('error', self.public_msg) |
63 | 63 |
login_url = get_publisher().get_root_url() + 'login/' |
64 |
login_url += '?' + urllib.urlencode({'next': request.get_frontoffice_url()}) |
|
64 |
login_url += '?' + urllib.parse.urlencode({'next': request.get_frontoffice_url()})
|
|
65 | 65 |
return quixote.redirect(login_url) |
66 | 66 | |
67 | 67 |
wcs/qommon/evalutils.py | ||
---|---|---|
21 | 21 |
""" |
22 | 22 |
import base64 |
23 | 23 |
import datetime |
24 |
import io |
|
24 | 25 |
import time |
25 | 26 | |
26 | 27 |
from django.utils.encoding import force_bytes |
27 |
from django.utils.six import BytesIO |
|
28 | 28 | |
29 | 29 |
from wcs.qommon import force_str |
30 | 30 |
from wcs.qommon.upload_storage import UploadStorage |
... | ... | |
155 | 155 | |
156 | 156 |
if strip_metadata and Image: |
157 | 157 |
try: |
158 |
image = Image.open(BytesIO(upload.get_content())) |
|
158 |
image = Image.open(io.BytesIO(upload.get_content()))
|
|
159 | 159 |
except OSError: |
160 | 160 |
pass |
161 | 161 |
else: |
162 | 162 |
image_without_exif = Image.new(image.mode, image.size) |
163 | 163 |
image_without_exif.putdata(image.getdata()) |
164 |
content = BytesIO() |
|
164 |
content = io.BytesIO()
|
|
165 | 165 |
image_without_exif.save(content, image.format) |
166 | 166 |
upload = FileField.convert_value_from_anything( |
167 | 167 |
{ |
wcs/qommon/ezt.py | ||
---|---|---|
225 | 225 | |
226 | 226 |
import datetime |
227 | 227 |
import html |
228 |
import io |
|
228 | 229 |
import re |
229 | 230 |
import os |
230 |
from django.utils import six |
|
231 |
from django.utils.six import StringIO as cStringIO |
|
232 | 231 | |
233 | 232 |
# |
234 | 233 |
# Formatting types |
... | ... | |
472 | 471 |
to the file object 'fp' and functions are called. |
473 | 472 |
""" |
474 | 473 |
for step in program: |
475 |
if isinstance(step, six.string_types):
|
|
474 |
if isinstance(step, str):
|
|
476 | 475 |
fp.write(step) |
477 | 476 |
else: |
478 | 477 |
step[0](step[1], fp, ctx) |
... | ... | |
562 | 561 |
list = _get_value(valref, ctx) |
563 | 562 |
except UnknownReference: |
564 | 563 |
return |
565 |
if isinstance(list, six.string_types):
|
|
564 |
if isinstance(list, str):
|
|
566 | 565 |
raise NeedSequenceError() |
567 | 566 |
refname = valref[0] |
568 | 567 |
ctx.for_index[refname] = idx = [list, 0] |
... | ... | |
573 | 572 | |
574 | 573 |
def _cmd_define(self, args, fp, ctx): |
575 | 574 |
((name,), unused, section) = args |
576 |
valfp = cStringIO.StringIO()
|
|
575 |
valfp = io.StringIO()
|
|
577 | 576 |
if section is not None: |
578 | 577 |
self._execute(section, valfp, ctx) |
579 | 578 |
ctx.defines[name] = valfp.getvalue() |
... | ... | |
673 | 672 |
raise UnknownReference(refname) |
674 | 673 | |
675 | 674 |
# make sure we return a string instead of some various Python types |
676 |
if isinstance(ob, six.integer_types + (float,)):
|
|
675 |
if isinstance(ob, (int, float)):
|
|
677 | 676 |
return str(ob) |
678 | 677 |
if ob is None: |
679 | 678 |
return '' |
wcs/qommon/form.py | ||
---|---|---|
64 | 64 |
from quixote.util import randbytes |
65 | 65 | |
66 | 66 |
from django.utils.encoding import force_bytes, force_text |
67 |
from django.utils import six |
|
68 | 67 | |
69 | 68 |
from django.conf import settings |
70 | 69 |
from django.utils.safestring import mark_safe |
wcs/qommon/http_request.py | ||
---|---|---|
19 | 19 |
import re |
20 | 20 |
import time |
21 | 21 | |
22 |
from django.utils import six |
|
23 | 22 |
from django.utils.encoding import force_bytes, force_text |
24 | 23 |
from quixote import get_session, get_publisher |
25 | 24 |
import quixote.http_request |
wcs/qommon/ident/franceconnect.py | ||
---|---|---|
20 | 20 |
import uuid |
21 | 21 | |
22 | 22 |
from django.utils.encoding import force_bytes |
23 |
from django.utils.six.moves.urllib import parse as urllib |
|
23 |
import urllib.parse |
|
24 | 24 | |
25 | 25 |
from quixote import redirect, get_session, get_publisher, get_request, get_session_manager |
26 | 26 |
from quixote.directory import Directory |
... | ... | |
313 | 313 | |
314 | 314 |
nonce = hashlib.sha256(force_bytes(session.id)).hexdigest() |
315 | 315 |
fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback' |
316 |
qs = urllib.urlencode( |
|
316 |
qs = urllib.parse.urlencode(
|
|
317 | 317 |
{ |
318 | 318 |
'response_type': 'code', |
319 | 319 |
'client_id': client_id, |
... | ... | |
345 | 345 |
} |
346 | 346 |
response, status, data, auth_header = http_post_request( |
347 | 347 |
self.get_token_url(), |
348 |
urllib.urlencode(body), |
|
348 |
urllib.parse.urlencode(body),
|
|
349 | 349 |
headers={ |
350 | 350 |
'Content-Type': 'application/x-www-form-urlencoded', |
351 | 351 |
}, |
... | ... | |
519 | 519 |
get_session_manager().expire_session() |
520 | 520 |
logout_url = self.get_logout_url() |
521 | 521 |
post_logout_redirect_uri = get_publisher().get_frontoffice_url() |
522 |
logout_url += '?' + urllib.urlencode( |
|
522 |
logout_url += '?' + urllib.parse.urlencode(
|
|
523 | 523 |
{ |
524 | 524 |
'id_token_hint': id_token, |
525 | 525 |
'post_logout_redirect_uri': post_logout_redirect_uri, |
wcs/qommon/ident/idp.py | ||
---|---|---|
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import re |
18 |
import urllib.parse |
|
18 | 19 | |
19 | 20 |
try: |
20 | 21 |
import lasso |
... | ... | |
22 | 23 |
lasso = None |
23 | 24 | |
24 | 25 |
from django.utils.encoding import force_bytes, force_text |
25 |
from django.utils.six.moves.urllib import parse as urllib |
|
26 |
from django.utils.six.moves.urllib import parse as urlparse |
|
27 | 26 | |
28 | 27 |
from quixote.directory import Directory |
29 | 28 |
from quixote import redirect, get_session, get_response, get_publisher |
... | ... | |
159 | 158 |
login_url = get_publisher().get_root_url() + 'login/idp/' |
160 | 159 |
else: |
161 | 160 |
login_url = get_publisher().get_root_url() + 'login/' |
162 |
login_url += '?' + urllib.urlencode({'next': get_request().get_frontoffice_url()}) |
|
161 |
login_url += '?' + urllib.parse.urlencode({'next': get_request().get_frontoffice_url()})
|
|
163 | 162 |
return redirect(login_url) |
164 | 163 | |
165 | 164 |
if get_request().user: |
... | ... | |
1066 | 1065 |
new_domain = None |
1067 | 1066 | |
1068 | 1067 |
if old_common_domain_getter_url: |
1069 |
old_domain = urlparse.urlparse(old_common_domain_getter_url)[1] |
|
1068 |
old_domain = urllib.parse.urlparse(old_common_domain_getter_url)[1]
|
|
1070 | 1069 |
if ':' in old_domain: |
1071 | 1070 |
old_domain = old_domain.split(':')[0] |
1072 | 1071 |
old_domain_dir = os.path.normpath(os.path.join(dir, '..', old_domain)) |
... | ... | |
1077 | 1076 |
# bad luck, but ignore this |
1078 | 1077 |
pass |
1079 | 1078 |
if new_common_domain_getter_url: |
1080 |
new_domain = urlparse.urlparse(new_common_domain_getter_url)[1] |
|
1079 |
new_domain = urllib.parse.urlparse(new_common_domain_getter_url)[1]
|
|
1081 | 1080 |
if ':' in new_domain: |
1082 | 1081 |
new_domain = new_domain.split(':')[0] |
1083 | 1082 |
new_domain_dir = os.path.normpath(os.path.join(dir, '..', new_domain)) |
wcs/qommon/logger.py | ||
---|---|---|
17 | 17 |
import logging |
18 | 18 |
import os |
19 | 19 | |
20 |
from django.utils import six |
|
21 | ||
22 | 20 |
from quixote import get_publisher, get_session, get_request |
23 | 21 |
from quixote.logger import DefaultLogger |
24 | 22 | |
... | ... | |
78 | 76 | |
79 | 77 |
user = request.user |
80 | 78 |
if user: |
81 |
if isinstance(user, six.string_types + six.integer_types):
|
|
79 |
if isinstance(user, (str, int)):
|
|
82 | 80 |
user_id = user |
83 | 81 |
else: |
84 | 82 |
user_id = user.id |
wcs/qommon/misc.py | ||
---|---|---|
18 | 18 |
import decimal |
19 | 19 |
import calendar |
20 | 20 |
import html |
21 |
import io |
|
21 | 22 |
import re |
22 | 23 |
import os |
23 | 24 |
import time |
25 |
import urllib.parse |
|
24 | 26 |
import base64 |
25 | 27 |
import json |
26 | 28 |
import subprocess |
... | ... | |
37 | 39 | |
38 | 40 |
from django.conf import settings |
39 | 41 |
from django.utils import datetime_safe |
40 |
from django.utils import six |
|
41 | 42 |
from django.utils.encoding import force_text |
42 | 43 |
from django.utils.formats import localize |
43 | 44 |
from django.utils.html import strip_tags |
44 | 45 |
from django.template import TemplateSyntaxError, VariableDoesNotExist |
45 |
from django.utils.six.moves.urllib.parse import quote, urlencode |
|
46 |
from django.utils.six.moves.urllib import parse as urlparse |
|
47 | 46 |
from django.utils.text import Truncator |
48 | 47 | |
49 | 48 |
from quixote import get_publisher, get_response, get_request, redirect |
... | ... | |
54 | 53 |
from .errors import ConnectionError, RequestError |
55 | 54 |
from .template import Template |
56 | 55 | |
57 |
from django.utils.six import BytesIO, StringIO |
|
58 | ||
59 | 56 |
try: |
60 | 57 |
subprocess.check_call(['which', 'pdftoppm'], stdout=subprocess.DEVNULL) |
61 | 58 |
HAS_PDFTOPPM = True |
... | ... | |
167 | 164 |
def simplify(s, space='-'): |
168 | 165 |
if s is None: |
169 | 166 |
return '' |
170 |
if not isinstance(s, six.text_type):
|
|
167 |
if not isinstance(s, str):
|
|
171 | 168 |
if get_publisher() and get_publisher().site_charset: |
172 | 169 |
s = force_text('%s' % s, get_publisher().site_charset, errors='ignore') |
173 | 170 |
else: |
... | ... | |
269 | 266 |
return None |
270 | 267 |
if isinstance(s, str): |
271 | 268 |
return s |
272 |
if not isinstance(s, six.string_types):
|
|
269 |
if not isinstance(s, str):
|
|
273 | 270 |
s = force_text(s) |
274 | 271 |
return s.encode(get_publisher().site_charset) |
275 | 272 | |
... | ... | |
346 | 343 |
): |
347 | 344 |
get_publisher().reload_cfg() |
348 | 345 | |
349 |
splitted_url = urlparse.urlsplit(url) |
|
346 |
splitted_url = urllib.parse.urlsplit(url)
|
|
350 | 347 |
if splitted_url.scheme not in ('http', 'https'): |
351 | 348 |
raise ConnectionError('invalid scheme in URL %s' % url) |
352 | 349 | |
... | ... | |
391 | 388 |
response, status, data, auth_header = _http_request( |
392 | 389 |
url, 'GET' if data is None else 'POST', body=data, raise_on_http_errors=True |
393 | 390 |
) |
394 |
return BytesIO(data) |
|
391 |
return io.BytesIO(data)
|
|
395 | 392 | |
396 | 393 | |
397 | 394 |
def http_get_page(url, **kwargs): |
... | ... | |
418 | 415 |
if '{{' in url or '{%' in url: |
419 | 416 |
try: |
420 | 417 |
url = Template(url).render(variables) |
421 |
p = urlparse.urlsplit(url) |
|
418 |
p = urllib.parse.urlsplit(url)
|
|
422 | 419 |
scheme, netloc, path, query, fragment = (p.scheme, p.netloc, p.path, p.query, p.fragment) |
423 | 420 |
if path.startswith('//'): |
424 | 421 |
# don't let double slash happen at the root of the URL, this |
425 | 422 |
# happens when a template such as {{url}}/path is used (with |
426 | 423 |
# {{url}} already ending with a slash). |
427 | 424 |
path = path[1:] |
428 |
return urlparse.urlunsplit((scheme, netloc, path, query, fragment)) |
|
425 |
return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
|
|
429 | 426 |
except (TemplateSyntaxError, VariableDoesNotExist): |
430 | 427 |
return url |
431 | 428 | |
... | ... | |
433 | 430 |
def ezt_substitute(template, variables): |
434 | 431 |
tmpl = ezt.Template() |
435 | 432 |
tmpl.parse(template) |
436 |
fd = StringIO() |
|
433 |
fd = io.StringIO()
|
|
437 | 434 |
tmpl.generate(fd, variables) |
438 | 435 |
return fd.getvalue() |
439 | 436 | |
440 | 437 |
def partial_quote(string): |
441 | 438 |
# unquote brackets, as there may be further processing that needs them |
442 | 439 |
# intact. |
443 |
return quote(string).replace('%5B', '[').replace('%5D', ']') |
|
440 |
return urllib.parse.quote(string).replace('%5B', '[').replace('%5D', ']')
|
|
444 | 441 | |
445 |
p = urlparse.urlsplit(url) |
|
442 |
p = urllib.parse.urlsplit(url)
|
|
446 | 443 |
scheme, netloc, path, query, fragment = p.scheme, p.netloc, p.path, p.query, p.fragment |
447 | 444 |
if netloc and '[' in netloc: |
448 | 445 |
netloc = ezt_substitute(netloc, variables) |
... | ... | |
456 | 453 |
# there were no / in the original path (the two / comes from |
457 | 454 |
# the scheme/netloc separation, this means there is no path) |
458 | 455 |
before_path = ezt_substitute(path, variables) |
459 |
p2 = urlparse.urlsplit(before_path) |
|
456 |
p2 = urllib.parse.urlsplit(before_path)
|
|
460 | 457 |
scheme, netloc, path = p2.scheme, p2.netloc, p2.path |
461 | 458 |
else: |
462 | 459 |
# there is a path, we need to get back to the original URL and |
... | ... | |
467 | 464 |
else: |
468 | 465 |
before_path, path = path, '' |
469 | 466 |
before_path = ezt_substitute(before_path, variables) |
470 |
p2 = urlparse.urlsplit(before_path) |
|
467 |
p2 = urllib.parse.urlsplit(before_path)
|
|
471 | 468 |
scheme, netloc = p2.scheme, p2.netloc |
472 | 469 |
if p2.path: |
473 | 470 |
if not path: |
... | ... | |
487 | 484 |
if fragment and '[' in fragment: |
488 | 485 |
fragment = partial_quote(ezt_substitute(fragment, variables)) |
489 | 486 |
if query and '[' in query: |
490 |
p_qs = urlparse.parse_qsl(query) |
|
487 |
p_qs = urllib.parse.parse_qsl(query)
|
|
491 | 488 |
if len(p_qs) == 0: |
492 | 489 |
# this happened because the query string has no key/values, |
493 | 490 |
# probably because it's a single substitution variable (ex: |
... | ... | |
502 | 499 |
v = ezt_substitute(v, variables) |
503 | 500 |
query.append((k, v)) |
504 | 501 |
if encode_query: |
505 |
query = urlencode(query) |
|
502 |
query = urllib.parse.urlencode(query)
|
|
506 | 503 |
else: |
507 | 504 |
query = '&'.join('%s=%s' % (k, v) for (k, v) in query) |
508 |
return urlparse.urlunsplit((scheme, netloc, path, query, fragment)) |
|
505 |
return urllib.parse.urlunsplit((scheme, netloc, path, query, fragment))
|
|
509 | 506 | |
510 | 507 | |
511 | 508 |
def get_foreground_colour(background_colour): |
... | ... | |
662 | 659 | |
663 | 660 |
if content_type == 'application/pdf': |
664 | 661 |
try: |
665 |
fp = BytesIO( |
|
662 |
fp = io.BytesIO(
|
|
666 | 663 |
subprocess.check_output( |
667 | 664 |
['pdftoppm', '-png', '-scale-to-x', '500', '-scale-to-y', '-1', filepath] |
668 | 665 |
) |
... | ... | |
705 | 702 |
# * File "PIL/PngImagePlugin.py", line 119, in read |
706 | 703 |
# * raise SyntaxError("broken PNG file (chunk %s)" % repr(cid)) |
707 | 704 |
raise IOError |
708 |
image_thumb_fp = BytesIO() |
|
705 |
image_thumb_fp = io.BytesIO()
|
|
709 | 706 |
image.save(image_thumb_fp, "PNG") |
710 | 707 |
except IOError: |
711 | 708 |
# failed to create thumbnail. |
wcs/qommon/publisher.py | ||
---|---|---|
18 | 18 | |
19 | 19 |
import codecs |
20 | 20 |
import collections |
21 |
from django.utils.six.moves import builtins
|
|
22 |
from django.utils.six.moves import configparser as ConfigParser
|
|
23 |
from django.utils.six.moves import cPickle
|
|
24 |
from django.utils.six.moves.urllib import parse as urllib |
|
21 |
import builtins |
|
22 |
import configparser
|
|
23 |
import pickle
|
|
24 |
import urllib.parse |
|
25 | 25 |
import datetime |
26 | 26 |
from decimal import Decimal |
27 | 27 |
import importlib |
28 | 28 |
import inspect |
29 |
import io |
|
29 | 30 |
import os |
30 | 31 |
import fcntl |
31 | 32 |
import hashlib |
... | ... | |
43 | 44 | |
44 | 45 |
from django.conf import settings |
45 | 46 |
from django.http import Http404 |
46 |
from django.utils import six |
|
47 | 47 |
from django.utils import translation |
48 | 48 |
from django.utils.encoding import force_text, force_bytes |
49 |
from django.utils.six import StringIO |
|
50 | 49 | |
51 | 50 |
from quixote.publish import Publisher, get_request, get_response, get_publisher, redirect |
52 | 51 |
from .http_request import HTTPRequest |
... | ... | |
128 | 127 |
return '%s://%s%s' % ( |
129 | 128 |
req.get_scheme(), |
130 | 129 |
req.get_server(), |
131 |
urllib.quote(req.environ.get('SCRIPT_NAME')), |
|
130 |
urllib.parse.quote(req.environ.get('SCRIPT_NAME')),
|
|
132 | 131 |
) |
133 | 132 |
return 'https://%s' % os.path.basename(get_publisher().app_dir) |
134 | 133 | |
... | ... | |
141 | 140 |
return '%s://%s%s/backoffice' % ( |
142 | 141 |
req.get_scheme(), |
143 | 142 |
req.get_server(), |
144 |
urllib.quote(req.environ.get('SCRIPT_NAME')), |
|
143 |
urllib.parse.quote(req.environ.get('SCRIPT_NAME')),
|
|
145 | 144 |
) |
146 | 145 |
return 'https://%s/backoffice' % os.path.basename(self.app_dir) |
147 | 146 | |
... | ... | |
206 | 205 |
self, request, original_response, exc_type, exc_value, tb |
207 | 206 |
) |
208 | 207 | |
209 |
error_file = StringIO() |
|
208 |
error_file = io.StringIO()
|
|
210 | 209 | |
211 | 210 |
if limit is None: |
212 | 211 |
if hasattr(sys, 'tracebacklimit'): |
... | ... | |
356 | 355 |
self.ngettext = translation.ngettext |
357 | 356 | |
358 | 357 |
def load_site_options(self): |
359 |
self.site_options = ConfigParser.ConfigParser()
|
|
358 |
self.site_options = configparser.ConfigParser()
|
|
360 | 359 |
site_options_filename = os.path.join(self.app_dir, 'site-options.cfg') |
361 | 360 |
if not os.path.exists(site_options_filename): |
362 | 361 |
return |
... | ... | |
378 | 377 |
self.load_site_options() |
379 | 378 |
try: |
380 | 379 |
return self.site_options.get('options', option) == 'true' |
381 |
except ConfigParser.NoSectionError:
|
|
380 |
except configparser.NoSectionError:
|
|
382 | 381 |
return defaults.get(option, False) |
383 |
except ConfigParser.NoOptionError:
|
|
382 |
except configparser.NoOptionError:
|
|
384 | 383 |
return defaults.get(option, False) |
385 | 384 | |
386 | 385 |
def get_site_option(self, option, section='options'): |
... | ... | |
394 | 393 |
self.load_site_options() |
395 | 394 |
try: |
396 | 395 |
return self.site_options.get(section, option) |
397 |
except ConfigParser.NoSectionError:
|
|
396 |
except configparser.NoSectionError:
|
|
398 | 397 |
return defaults.get(section, {}).get(option) |
399 |
except ConfigParser.NoOptionError:
|
|
398 |
except configparser.NoOptionError:
|
|
400 | 399 |
return defaults.get(section, {}).get(option) |
401 | 400 | |
402 | 401 |
def get_site_storages(self): |
... | ... | |
712 | 711 |
cfg = None |
713 | 712 | |
714 | 713 |
def write_cfg(self): |
715 |
s = cPickle.dumps(self.cfg, protocol=2)
|
|
714 |
s = pickle.dumps(self.cfg, protocol=2)
|
|
716 | 715 |
filename = os.path.join(self.app_dir, 'config.pck') |
717 | 716 |
storage.atomic_write(filename, s) |
718 | 717 | |
... | ... | |
720 | 719 |
filename = os.path.join(self.app_dir, 'config.pck') |
721 | 720 |
try: |
722 | 721 |
with open(filename, 'rb') as fd: |
723 |
self.cfg = cPickle.load(fd, encoding='utf-8')
|
|
722 |
self.cfg = pickle.load(fd, encoding='utf-8')
|
|
724 | 723 |
except: |
725 | 724 |
self.cfg = {} |
726 | 725 | |
... | ... | |
908 | 907 |
self.load_site_options() |
909 | 908 |
try: |
910 | 909 |
d.update(dict(self.site_options.items('variables', raw=True))) |
911 |
except ConfigParser.NoSectionError:
|
|
910 |
except configparser.NoSectionError:
|
|
912 | 911 |
pass |
913 | 912 |
d['manager_homepage_url'] = d.get('portal_agent_url') |
914 | 913 |
d['manager_homepage_title'] = d.get('portal_agent_title') |
915 | 914 |
return d |
916 | 915 | |
917 | 916 |
def is_relatable_url(self, url): |
918 |
parsed_url = urllib.urlparse(url) |
|
917 |
parsed_url = urllib.parse.urlparse(url)
|
|
919 | 918 |
if not parsed_url.netloc: |
920 | 919 |
return True |
921 |
if parsed_url.netloc == urllib.urlparse(self.get_frontoffice_url()).netloc: |
|
920 |
if parsed_url.netloc == urllib.parse.urlparse(self.get_frontoffice_url()).netloc:
|
|
922 | 921 |
return True |
923 |
if parsed_url.netloc == urllib.urlparse(self.get_backoffice_url()).netloc: |
|
922 |
if parsed_url.netloc == urllib.parse.urlparse(self.get_backoffice_url()).netloc:
|
|
924 | 923 |
return True |
925 | 924 |
if parsed_url.netloc in [x.strip() for x in self.get_site_option('relatable-hosts').split(',')]: |
926 | 925 |
return True |
927 | 926 |
try: |
928 | 927 |
if parsed_url.netloc in self.site_options.options('api-secrets'): |
929 | 928 |
return True |
930 |
except ConfigParser.NoSectionError:
|
|
929 |
except configparser.NoSectionError:
|
|
931 | 930 |
pass |
932 | 931 |
return False |
933 | 932 |
wcs/qommon/saml2.py | ||
---|---|---|
25 | 25 |
lasso = None |
26 | 26 | |
27 | 27 |
from django.utils.encoding import force_text |
28 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
28 |
import urllib.parse
|
|
29 | 29 | |
30 | 30 |
from quixote import get_request, get_response, redirect, get_field, get_publisher |
31 | 31 |
from quixote.http_request import parse_header |
... | ... | |
186 | 186 |
login.msgRelayState = get_request().form.get('next') |
187 | 187 | |
188 | 188 |
next_url = login.msgRelayState or get_publisher().get_frontoffice_url() |
189 |
parsed_url = urlparse.urlparse(next_url) |
|
189 |
parsed_url = urllib.parse.urlparse(next_url)
|
|
190 | 190 |
request = get_request() |
191 | 191 |
scheme = parsed_url.scheme or request.get_scheme() |
192 | 192 |
netloc = parsed_url.netloc or request.get_server() |
193 |
next_url = urlparse.urlunsplit( |
|
193 |
next_url = urllib.parse.urlunsplit(
|
|
194 | 194 |
(scheme, netloc, parsed_url.path, parsed_url.query, parsed_url.fragment) |
195 | 195 |
) |
196 | 196 |
samlp_extensions = '''<samlp:Extensions |
... | ... | |
372 | 372 |
if relay_state == 'backoffice': |
373 | 373 |
after_url = get_publisher().get_backoffice_url() |
374 | 374 |
elif relay_state: |
375 |
parsed_url = urlparse.urlparse(relay_state) |
|
375 |
parsed_url = urllib.parse.urlparse(relay_state)
|
|
376 | 376 |
scheme = parsed_url.scheme or request.get_scheme() |
377 | 377 |
netloc = parsed_url.netloc or request.get_server() |
378 |
after_url = urlparse.urlunsplit( |
|
378 |
after_url = urllib.parse.urlunsplit(
|
|
379 | 379 |
(scheme, netloc, parsed_url.path, parsed_url.query, parsed_url.fragment) |
380 | 380 |
) |
381 | 381 |
if not ( |
wcs/qommon/sms.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import re |
18 | 18 | |
19 |
from django.utils.six.moves.urllib import parse as urllib |
|
19 |
import urllib.parse |
|
20 | 20 | |
21 | 21 |
from . import N_, errors, misc |
22 | 22 |
from . import get_cfg, get_logger |
wcs/qommon/storage.py | ||
---|---|---|
25 | 25 |
import sys |
26 | 26 |
import tempfile |
27 | 27 | |
28 |
from django.utils import six |
|
29 | 28 |
from django.utils.encoding import force_bytes |
30 |
from django.utils.six.moves import builtins
|
|
31 |
from django.utils.six.moves import _thread
|
|
29 |
import builtins |
|
30 |
import _thread |
|
32 | 31 | |
33 | 32 |
from .vendor import locket |
34 | 33 | |
... | ... | |
150 | 149 |
self.typed_none = '' |
151 | 150 |
if isinstance(self.value, bool): |
152 | 151 |
self.typed_none = False |
153 |
elif isinstance(self.value, six.integer_types + (float,)):
|
|
152 |
elif isinstance(self.value, (int, float)):
|
|
154 | 153 |
self.typed_none = -sys.maxsize |
155 | 154 |
elif isinstance(self.value, time.struct_time): |
156 | 155 |
self.typed_none = time.gmtime(-(10 ** 10)) # 1653 |
wcs/qommon/template.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import io |
|
17 | 18 |
import os |
18 | 19 |
import glob |
19 | 20 |
import re |
... | ... | |
28 | 29 |
from django.template.loader import render_to_string |
29 | 30 |
from django.utils.encoding import force_text, smart_text |
30 | 31 |
from django.utils.safestring import SafeString, SafeText |
31 |
from django.utils import six |
|
32 |
from django.utils.six import StringIO |
|
33 | 32 | |
34 | 33 |
from quixote import get_session, get_request, get_response, get_publisher |
35 | 34 |
from quixote.directory import Directory |
... | ... | |
112 | 111 |
desc = force_str(tree.findtext('desc') or '') |
113 | 112 |
author = force_str(tree.findtext('author') or '') |
114 | 113 |
icon = None |
115 |
if isinstance(theme_xml, six.string_types):
|
|
114 |
if isinstance(theme_xml, str):
|
|
116 | 115 |
icon = os.path.join(os.path.dirname(theme_xml), 'icon.png') |
117 | 116 |
if not os.path.exists(icon): |
118 | 117 |
icon = None |
... | ... | |
412 | 411 |
else: |
413 | 412 |
template = default_template |
414 | 413 | |
415 |
fd = StringIO() |
|
414 |
fd = io.StringIO()
|
|
416 | 415 |
vars = get_decorate_vars(body, response, generate_breadcrumb=generate_breadcrumb) |
417 | 416 | |
418 | 417 |
template.generate(fd, vars) |
... | ... | |
527 | 526 |
return re.sub(r'[\uE000-\uF8FF]', '', rendered) |
528 | 527 | |
529 | 528 |
def ezt_render(self, context={}): |
530 |
fd = StringIO() |
|
529 |
fd = io.StringIO()
|
|
531 | 530 |
try: |
532 | 531 |
self.template.generate(fd, context) |
533 | 532 |
except ezt.EZTException as e: |
wcs/qommon/templatetags/qommon.py | ||
---|---|---|
36 | 36 |
from django import template |
37 | 37 |
from django.template import defaultfilters |
38 | 38 |
from django.utils import dateparse |
39 |
from django.utils import six |
|
40 | 39 |
from django.utils.encoding import force_bytes, force_text |
41 | 40 |
from django.utils.safestring import mark_safe |
42 | 41 |
from django.utils.timezone import is_naive, make_aware |
... | ... | |
222 | 221 |
# treat all booleans as 0 (contrary to Python behaviour where |
223 | 222 |
# decimal(True) == 1). |
224 | 223 |
value = 0 |
225 |
if isinstance(value, six.string_types):
|
|
224 |
if isinstance(value, str):
|
|
226 | 225 |
# replace , by . for French users comfort |
227 | 226 |
value = value.replace(',', '.') |
228 | 227 |
try: |
... | ... | |
505 | 504 |
def sum_(list_): |
506 | 505 |
if hasattr(list_, 'get_value'): |
507 | 506 |
list_ = list_.get_value() # unlazy |
508 |
if isinstance(list_, six.string_types):
|
|
507 |
if isinstance(list_, str):
|
|
509 | 508 |
# do not consider string as iterable, to avoid misusage |
510 | 509 |
return '' |
511 | 510 |
try: |
... | ... | |
581 | 580 |
return float(obj['lat']), float(obj['lng']) |
582 | 581 |
except (TypeError, ValueError): |
583 | 582 |
pass |
584 |
if isinstance(obj, six.string_types) and ';' in obj:
|
|
583 |
if isinstance(obj, str) and ';' in obj:
|
|
585 | 584 |
try: |
586 | 585 |
return float(obj.split(';')[0]), float(obj.split(';')[1]) |
587 | 586 |
except ValueError: |
... | ... | |
732 | 731 | |
733 | 732 |
if hasattr(value, 'get_value'): |
734 | 733 |
value = value.get_value() # unlazy |
735 |
if not value or not isinstance(value, six.string_types):
|
|
734 |
if not value or not isinstance(value, str):
|
|
736 | 735 |
return value |
737 | 736 |
number = value.strip() |
738 | 737 |
if not number: |
... | ... | |
767 | 766 |
def is_empty(value): |
768 | 767 |
from wcs.variables import LazyFormDefObjectsManager, LazyList |
769 | 768 | |
770 |
if isinstance(value, (six.string_types, list, dict)):
|
|
769 |
if isinstance(value, (str, list, dict)):
|
|
771 | 770 |
return not value |
772 | 771 |
if isinstance(value, (LazyFormDefObjectsManager, LazyList)): |
773 | 772 |
return not list(value) |
wcs/qommon/upload_storage.py | ||
---|---|---|
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import base64 |
18 |
import io |
|
18 | 19 |
import os |
19 | 20 | |
20 | 21 |
from quixote import get_publisher |
... | ... | |
22 | 23 | |
23 | 24 |
from django.utils.encoding import force_text |
24 | 25 |
from django.utils.module_loading import import_string |
25 |
from django.utils.six import BytesIO |
|
26 | 26 | |
27 | 27 |
from .errors import ConnectionError |
28 | 28 |
from .misc import json_loads, file_digest, can_thumbnail |
... | ... | |
51 | 51 |
self.__dict__.update(dict) |
52 | 52 |
if hasattr(self, 'data'): |
53 | 53 |
# backward compatibility with older w.c.s. version |
54 |
self.fp = BytesIO(self.data) |
|
54 |
self.fp = io.BytesIO(self.data)
|
|
55 | 55 |
del self.data |
56 | 56 | |
57 | 57 |
def get_file(self): |
wcs/qommon/x509utils.py | ||
---|---|---|
21 | 21 |
import subprocess |
22 | 22 |
import stat |
23 | 23 | |
24 |
from django.utils import six |
|
25 | 24 |
from django.utils.encoding import force_text |
26 | 25 | |
27 | 26 |
_openssl = 'openssl' |
... | ... | |
29 | 28 | |
30 | 29 |
def decapsulate_pem_file(file_or_string): |
31 | 30 |
'''Remove PEM header lines''' |
32 |
if not isinstance(file_or_string, six.string_types):
|
|
31 |
if not isinstance(file_or_string, str):
|
|
33 | 32 |
content = file_or_string.read() |
34 | 33 |
else: |
35 | 34 |
content = file_or_string |
wcs/root.py | ||
---|---|---|
19 | 19 |
import os |
20 | 20 |
import re |
21 | 21 | |
22 |
from django.utils.six.moves.urllib import parse as urllib |
|
22 |
import urllib.parse |
|
23 | 23 | |
24 | 24 |
from quixote import get_publisher, get_response, get_session, redirect, get_session_manager, get_request |
25 | 25 |
from quixote.directory import Directory |
... | ... | |
113 | 113 |
if get_publisher().ident_methods.get(method)().is_interactive(): |
114 | 114 |
login_url = '../ident/%s/login' % method |
115 | 115 |
if get_request().form.get('next'): |
116 |
login_url += '?' + urllib.urlencode({'next': get_request().form.get('next')}) |
|
116 |
login_url += '?' + urllib.parse.urlencode({'next': get_request().form.get('next')})
|
|
117 | 117 |
return redirect(login_url) |
118 | 118 |
else: |
119 | 119 |
try: |
wcs/sql.py | ||
---|---|---|
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import copy |
18 |
import io |
|
18 | 19 |
import psycopg2 |
19 | 20 |
import psycopg2.extensions |
20 | 21 |
import psycopg2.extras |
... | ... | |
28 | 29 |
except ImportError: |
29 | 30 |
import pickle |
30 | 31 | |
31 |
from django.utils import six |
|
32 | 32 |
from django.utils.encoding import force_bytes, force_text |
33 |
from django.utils.six import BytesIO |
|
34 | 33 | |
35 | 34 |
from quixote import get_publisher |
36 | 35 |
from . import qommon |
... | ... | |
84 | 83 |
def pickle_loads(value): |
85 | 84 |
if hasattr(value, 'tobytes'): |
86 | 85 |
value = value.tobytes() |
87 |
obj = UnpicklerClass(BytesIO(force_bytes(value)), **PICKLE_KWARGS).load() |
|
86 |
obj = UnpicklerClass(io.BytesIO(force_bytes(value)), **PICKLE_KWARGS).load()
|
|
88 | 87 |
obj = deep_bytes2str(obj) |
89 | 88 |
return obj |
90 | 89 | |
... | ... | |
1613 | 1612 |
# turn {'poire': 2, 'abricot': 1, 'pomme': 3} into an array |
1614 | 1613 |
value = [[force_str(x), force_str(y)] for x, y in value.items()] |
1615 | 1614 |
elif sql_type == 'varchar': |
1616 |
assert isinstance(value, six.string_types)
|
|
1615 |
assert isinstance(value, str)
|
|
1617 | 1616 |
elif sql_type == 'date': |
1618 | 1617 |
assert type(value) is time.struct_time |
1619 | 1618 |
value = datetime.datetime(value.tm_year, value.tm_mon, value.tm_mday) |
... | ... | |
2012 | 2011 |
elif field.key in ('item', 'items'): |
2013 | 2012 |
value = self.data.get('%s_display' % field.id) |
2014 | 2013 |
if value: |
2015 |
if isinstance(value, six.string_types):
|
|
2014 |
if isinstance(value, str):
|
|
2016 | 2015 |
fts_strings.append(value) |
2017 | 2016 |
elif type(value) in (tuple, list): |
2018 | 2017 |
fts_strings.extend(value) |
... | ... | |
2322 | 2321 |
elif field.key in ('item', 'items'): |
2323 | 2322 |
value = self.form_data.get('%s_display' % field.id) |
2324 | 2323 |
if value: |
2325 |
if isinstance(value, six.string_types):
|
|
2324 |
if isinstance(value, str):
|
|
2326 | 2325 |
fts_strings.append(value) |
2327 | 2326 |
elif type(value) in (tuple, list): |
2328 | 2327 |
fts_strings.extend(value) |
wcs/wf/attachment.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import xml.etree.ElementTree as ET |
18 | 18 | |
19 |
from django.utils.six.moves.urllib import parse as urllib |
|
19 |
import urllib.parse |
|
20 | 20 |
from quixote import redirect |
21 | 21 | |
22 | 22 |
from ..qommon import _, N_ |
... | ... | |
39 | 39 |
else: |
40 | 40 |
file_reference = None |
41 | 41 | |
42 |
filenames = [filename, urllib.unquote(filename)] |
|
42 |
filenames = [filename, urllib.parse.unquote(filename)]
|
|
43 | 43 |
for p in self.formdata.iter_evolution_parts(): |
44 | 44 |
if not isinstance(p, AttachmentEvolutionPart): |
45 | 45 |
continue |
... | ... | |
69 | 69 |
% ( |
70 | 70 |
self.filled.get_url(backoffice=is_in_backoffice), |
71 | 71 |
fn, |
72 |
urllib.quote(p.base_filename), |
|
72 |
urllib.parse.quote(p.base_filename),
|
|
73 | 73 |
) |
74 | 74 |
) |
75 | 75 |
wcs/wf/export_to_model.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
import base64 |
18 | 18 |
import collections |
19 |
import io |
|
19 | 20 |
from xml.etree import ElementTree as ET |
20 | 21 |
import zipfile |
21 | 22 |
import os |
... | ... | |
25 | 26 |
import time |
26 | 27 |
import shutil |
27 | 28 | |
28 |
from django.utils import six |
|
29 | 29 |
from django.utils.encoding import force_bytes, force_text |
30 |
from django.utils.six import BytesIO, StringIO |
|
31 | 30 | |
32 | 31 |
from quixote import get_response, get_request, get_publisher |
33 | 32 |
from quixote.directory import Directory |
... | ... | |
503 | 502 |
try: |
504 | 503 |
# force ezt_only=True because an RTF file may contain {{ characters |
505 | 504 |
# and would be seen as a Django template |
506 |
return BytesIO( |
|
505 |
return io.BytesIO(
|
|
507 | 506 |
force_bytes( |
508 | 507 |
template_on_formdata( |
509 | 508 |
formdata, |
... | ... | |
629 | 628 |
node.append(child) |
630 | 629 |
node.tail = current_tail |
631 | 630 | |
632 |
outstream = BytesIO() |
|
631 |
outstream = io.BytesIO()
|
|
633 | 632 |
transform_opendocument(self.model_file.get_file(), outstream, process_root) |
634 | 633 |
outstream.seek(0) |
635 | 634 |
return outstream |
... | ... | |
721 | 720 |
filename = 'export_to_model-%s-%s-%s.upload' % ids |
722 | 721 | |
723 | 722 |
upload = Upload(base_filename, content_type) |
724 |
upload.fp = BytesIO() |
|
723 |
upload.fp = io.BytesIO()
|
|
725 | 724 |
upload.fp.write(content) |
726 | 725 |
upload.fp.seek(0) |
727 | 726 |
self.model_file = UploadedFile('models', filename, upload) |
wcs/wf/geolocate.py | ||
---|---|---|
24 | 24 |
except ImportError: |
25 | 25 |
Image = None |
26 | 26 | |
27 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
27 |
import urllib.parse
|
|
28 | 28 |
from quixote import get_publisher |
29 | 29 | |
30 | 30 |
from ..qommon import _, N_, force_str |
... | ... | |
153 | 153 |
url += '&' |
154 | 154 |
else: |
155 | 155 |
url += '?' |
156 |
url += 'q=%s' % urlparse.quote(address) |
|
156 |
url += 'q=%s' % urllib.parse.quote(address)
|
|
157 | 157 |
url += '&format=json' |
158 | 158 |
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en') |
159 | 159 |
wcs/wf/jump.py | ||
---|---|---|
19 | 19 |
import os |
20 | 20 |
import time |
21 | 21 | |
22 |
from django.utils import six |
|
23 | ||
24 | 22 |
from quixote import get_publisher, get_request, redirect |
25 | 23 | |
26 | 24 |
from quixote.directory import Directory |
... | ... | |
136 | 134 | |
137 | 135 |
def migrate(self): |
138 | 136 |
changed = super(JumpWorkflowStatusItem, self).migrate() |
139 |
if isinstance(self.condition, six.string_types):
|
|
137 |
if isinstance(self.condition, str):
|
|
140 | 138 |
if self.condition: |
141 | 139 |
self.condition = {'type': 'python', 'value': self.condition} |
142 | 140 |
else: |
wcs/wf/profile.py | ||
---|---|---|
20 | 20 |
import time |
21 | 21 |
import xml.etree.ElementTree as ET |
22 | 22 | |
23 |
from django.utils.six.moves.urllib import parse as urlparse
|
|
23 |
import urllib.parse
|
|
24 | 24 |
from quixote import get_publisher, get_response, get_request |
25 | 25 | |
26 | 26 |
from ..qommon import _, N_ |
... | ... | |
37 | 37 |
idps = get_cfg('idp', {}) |
38 | 38 |
entity_id = list(idps.values())[0]['metadata_url'] |
39 | 39 |
base_url = entity_id.split('idp/saml2/metadata')[0] |
40 |
url = urlparse.urljoin(base_url, '/api/users/%s/' % user_uuid) |
|
40 |
url = urllib.parse.urljoin(base_url, '/api/users/%s/' % user_uuid)
|
|
41 | 41 |
secret, orig = get_secret_and_orig(url) |
42 | 42 |
url += '?orig=%s' % orig |
43 | 43 |
return sign_url(url, secret) |
wcs/wf/roles.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import urllib.parse |
|
17 | 18 |
import sys |
18 | 19 | |
19 |
from django.utils.six.moves.urllib import parse as urllib |
|
20 |
from django.utils.six.moves.urllib import parse as urlparse |
|
21 | ||
22 | 20 |
from quixote import get_request, get_publisher, get_response |
23 | 21 | |
24 | 22 |
from ..qommon import _, N_ |
... | ... | |
35 | 33 |
idps = get_cfg('idp', {}) |
36 | 34 |
entity_id = list(idps.values())[0]['metadata_url'] |
37 | 35 |
base_url = entity_id.split('idp/saml2/metadata')[0] |
38 |
url = urlparse.urljoin( |
|
39 |
base_url, '/api/roles/%s/members/%s/' % (urllib.quote(role_uuid), urllib.quote(user_uuid))
|
|
36 |
url = urllib.parse.urljoin(
|
|
37 |
base_url, '/api/roles/%s/members/%s/' % (urllib.parse.quote(role_uuid), urllib.parse.quote(user_uuid))
|
|
40 | 38 |
) |
41 | 39 |
return url |
42 | 40 |
wcs/wf/wscall.py | ||
---|---|---|
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import datetime |
18 |
import io |
|
18 | 19 |
import sys |
19 | 20 |
import traceback |
20 | 21 |
import xml.etree.ElementTree as ET |
21 | 22 |
import collections |
22 | 23 |
import mimetypes |
23 | 24 | |
24 |
from django.utils import six |
|
25 | 25 |
from django.utils.encoding import force_text |
26 |
from django.utils.six import BytesIO |
|
27 | 26 | |
28 | 27 |
from quixote.html import TemplateIO, htmltext |
29 | 28 | |
... | ... | |
463 | 462 |
filename, content_type = self.get_attachment_data(response) |
464 | 463 |
workflow_data['%s_content_type' % self.varname] = content_type |
465 | 464 |
workflow_data['%s_length' % self.varname] = len(data) |
466 |
fp_content = BytesIO(data) |
|
465 |
fp_content = io.BytesIO(data)
|
|
467 | 466 |
attachment = AttachmentEvolutionPart( |
468 | 467 |
filename, fp_content, content_type=content_type, varname=self.varname |
469 | 468 |
) |
... | ... | |
562 | 561 |
el = ET.SubElement(xml_item, attribute) |
563 | 562 |
for (key, value) in getattr(self, attribute).items(): |
564 | 563 |
item = ET.SubElement(el, 'item') |
565 |
if isinstance(key, six.string_types):
|
|
564 |
if isinstance(key, str):
|
|
566 | 565 |
ET.SubElement(item, 'name').text = force_text(key) |
567 | 566 |
else: |
568 | 567 |
raise AssertionError('unknown type for key (%r)' % key) |
569 |
if isinstance(value, six.string_types):
|
|
568 |
if isinstance(value, str):
|
|
570 | 569 |
ET.SubElement(item, 'value').text = force_text(value) |
571 | 570 |
else: |
572 | 571 |
raise AssertionError('unknown type for value (%r)' % key) |
wcs/workflows.py | ||
---|---|---|
25 | 25 |
import time |
26 | 26 |
import uuid |
27 | 27 | |
28 |
from django.utils import six |
|
29 | 28 |
from django.utils.encoding import force_text |
30 | 29 | |
31 | 30 |
from quixote import get_request, get_response, redirect |
... | ... | |
938 | 937 |
atname = 'item' |
939 | 938 |
for v in val: |
940 | 939 |
ET.SubElement(el, atname).text = force_text(str(v), charset, errors='replace') |
941 |
elif isinstance(val, six.string_types):
|
|
940 |
elif isinstance(val, str):
|
|
942 | 941 |
el.text = force_text(val, charset, errors='replace') |
943 | 942 |
else: |
944 | 943 |
el.text = str(val) |
... | ... | |
970 | 969 |
else: |
971 | 970 |
if el.text is None: |
972 | 971 |
setattr(self, attribute, None) |
973 |
elif el.text in ('False', 'True') and not isinstance( |
|
974 |
getattr(self, attribute), six.string_types |
|
975 |
): |
|
972 |
elif el.text in ('False', 'True') and not isinstance(getattr(self, attribute), str): |
|
976 | 973 |
# booleans |
977 | 974 |
setattr(self, attribute, el.text == 'True') |
978 | 975 |
elif type(getattr(self, attribute)) is int: |
... | ... | |
1311 | 1308 |
) |
1312 | 1309 |
elif isinstance(anchor_date, time.struct_time): |
1313 | 1310 |
anchor_date = datetime.datetime.fromtimestamp(time.mktime(anchor_date)) |
1314 |
elif isinstance(anchor_date, six.string_types) and anchor_date:
|
|
1311 |
elif isinstance(anchor_date, str) and anchor_date:
|
|
1315 | 1312 |
try: |
1316 | 1313 |
anchor_date = get_as_datetime(anchor_date) |
1317 | 1314 |
except ValueError: |
... | ... | |
2076 | 2073 |
formdata=None, |
2077 | 2074 |
status_item=None, |
2078 | 2075 |
): |
2079 |
if not isinstance(var, six.string_types):
|
|
2076 |
if not isinstance(var, str):
|
|
2080 | 2077 |
return var |
2081 | 2078 | |
2082 | 2079 |
expression = cls.get_expression(var) |
... | ... | |
2919 | 2916 |
# this works around the fact that parametric workflows only support |
2920 | 2917 |
# string values, so if we get set a string, we convert it here to an |
2921 | 2918 |
# array. |
2922 |
if isinstance(self.to, six.string_types):
|
|
2919 |
if isinstance(self.to, str):
|
|
2923 | 2920 |
self.to = [self.to] |
2924 | 2921 | |
2925 | 2922 |
addresses = [] |
... | ... | |
2935 | 2932 |
if isinstance(dest, list): |
2936 | 2933 |
addresses.extend(dest) |
2937 | 2934 |
continue |
2938 |
elif isinstance(dest, six.string_types) and ',' in dest:
|
|
2935 |
elif isinstance(dest, str) and ',' in dest:
|
|
2939 | 2936 |
# if the email contains a comma consider it as a serie of |
2940 | 2937 |
# emails |
2941 | 2938 |
addresses.extend([x.strip() for x in dest.split(',')]) |
wcs/wscalls.py | ||
---|---|---|
17 | 17 |
import collections |
18 | 18 |
import json |
19 | 19 |
import sys |
20 |
import urllib.parse |
|
20 | 21 |
import xml.etree.ElementTree as ET |
21 | 22 | |
22 | 23 |
from django.utils.encoding import force_text |
23 |
from django.utils.six.moves.urllib import parse as urllib |
|
24 |
from django.utils.six.moves.urllib import parse as urlparse |
|
25 | 24 | |
26 | 25 |
from quixote import get_publisher, get_request |
27 | 26 | |
... | ... | |
94 | 93 | |
95 | 94 |
if qs_data: # merge qs_data into url |
96 | 95 |
publisher = get_publisher() |
97 |
parsed = urlparse.urlparse(url) |
|
98 |
qs = list(urlparse.parse_qsl(parsed.query)) |
|
96 |
parsed = urllib.parse.urlparse(url)
|
|
97 |
qs = list(urllib.parse.parse_qsl(parsed.query))
|
|
99 | 98 |
for key, value in qs_data.items(): |
100 | 99 |
try: |
101 | 100 |
value = WorkflowStatusItem.compute(value, raises=True) |
... | ... | |
106 | 105 |
key = force_str(key) |
107 | 106 |
value = force_str(value) |
108 | 107 |
qs.append((key, value)) |
109 |
qs = urllib.urlencode(qs) |
|
110 |
url = urlparse.urlunparse(parsed[:4] + (qs,) + parsed[5:6]) |
|
108 |
qs = urllib.parse.urlencode(qs)
|
|
109 |
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
|
|
111 | 110 | |
112 | 111 |
unsigned_url = url |
113 | 112 | |
114 |
- |