0001-cards-add-import-data-from-CSV-39473.patch
tests/test_backoffice_pages.py | ||
---|---|---|
23 | 23 |
from django.utils.six.moves.urllib import parse as urllib |
24 | 24 |
from django.utils.six.moves.urllib import parse as urlparse |
25 | 25 | |
26 |
from webtest import Upload |
|
27 | ||
26 | 28 |
from quixote import cleanup, get_publisher |
27 | 29 |
from wcs.qommon import ods |
28 | 30 |
from wcs.api_utils import sign_url |
... | ... | |
5534 | 5536 |
resp.click('card plop') |
5535 | 5537 | |
5536 | 5538 | |
5539 |
def test_backoffice_cards_import_data_from_csv(pub, studio): |
|
5540 |
user = create_user(pub) |
|
5541 |
CardDef.wipe() |
|
5542 |
carddef = CardDef() |
|
5543 |
carddef.name = 'test' |
|
5544 |
carddef.fields = [ |
|
5545 |
fields.StringField(id='1', label='Test', type='string', varname='string'), |
|
5546 |
fields.ItemField(id='2', label='List', type='item', varname='list', |
|
5547 |
items=['item1', 'item2']) |
|
5548 |
] |
|
5549 |
carddef.backoffice_submission_roles = user.roles |
|
5550 |
carddef.workflow_roles = {'_editor': user.roles[0]} |
|
5551 |
carddef.store() |
|
5552 |
carddef.data_class().wipe() |
|
5553 | ||
5554 |
app = login(get_app(pub)) |
|
5555 | ||
5556 |
resp = app.get(carddef.get_url()) |
|
5557 |
assert 'import-csv' in resp |
|
5558 |
resp = resp.click('Import data from a CSV file') |
|
5559 |
resp.forms[0]['file'] = Upload('test.csv', b'', 'text/csv') |
|
5560 |
resp = resp.forms[0].submit() |
|
5561 |
assert 'Invalid CSV file.' in resp |
|
5562 | ||
5563 |
resp.forms[0]['file'] = Upload('test.csv', b'data1,data2', 'text/csv') |
|
5564 |
resp = resp.forms[0].submit() |
|
5565 |
assert 'File misses fields: list, string.' in resp |
|
5566 | ||
5567 |
resp.forms[0]['file'] = Upload('test.csv', b'string,list\ndata1,item1', 'text/csv') |
|
5568 |
resp = resp.forms[0].submit().follow() |
|
5569 |
assert 'Data imported successfully.' in resp |
|
5570 | ||
5571 | ||
5537 | 5572 |
def test_backoffice_cards_wscall_failure_display(http_requests, pub, studio): |
5538 | 5573 |
LoggedError.wipe() |
5539 | 5574 |
user = create_user(pub) |
wcs/backoffice/data_management.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import csv |
|
18 | ||
17 | 19 |
from quixote import get_request, get_response, get_session, redirect |
18 | 20 |
from quixote.html import TemplateIO, htmltext, htmlescape |
19 | 21 | |
22 |
from django.utils.encoding import force_text |
|
23 | ||
20 | 24 |
from ..qommon import _ |
21 | 25 |
from ..qommon import errors |
22 | 26 |
from ..qommon import template |
27 |
from ..qommon.form import Form, FileWidget |
|
23 | 28 |
from ..qommon.backoffice.menu import html_top |
24 | 29 | |
25 | 30 |
from wcs.carddef import CardDef |
... | ... | |
67 | 72 | |
68 | 73 | |
69 | 74 |
class CardPage(FormPage): |
70 |
_q_exports = ['', 'csv', 'xls', 'ods', 'json', 'export', 'map', 'geojson', 'add'] |
|
75 |
_q_exports = ['', 'csv', 'xls', 'ods', 'json', 'export', 'map', 'geojson', 'add', |
|
76 |
('import-csv', 'import_csv')] |
|
77 | ||
78 |
import_csv_message = N_( |
|
79 |
'You can add data to this card by uploading a file containing ' |
|
80 |
'var names in columns captions.') |
|
81 |
import_csv_success_message = N_( |
|
82 |
'Data imported successfully.' |
|
83 |
) |
|
71 | 84 | |
72 | 85 |
def __init__(self, component): |
73 | 86 |
try: |
... | ... | |
99 | 112 |
def get_filter_from_query(self, default='waiting'): |
100 | 113 |
return 'all' |
101 | 114 | |
115 | ||
116 |
def import_csv(self): |
|
117 |
form = Form(enctype='multipart/form-data', use_tokens=False) |
|
118 |
form.add(FileWidget, 'file', title=_('File'), required=False) |
|
119 |
form.add_submit('submit', _('Submit')) |
|
120 |
form.add_submit('cancel', _('Cancel')) |
|
121 |
if form.get_widget('cancel').parse(): |
|
122 |
return redirect('.') |
|
123 | ||
124 |
if form.is_submitted() and not form.has_errors(): |
|
125 |
try: |
|
126 |
return self.import_csv_submit(form) |
|
127 |
except ValueError: |
|
128 |
pass |
|
129 | ||
130 |
get_response().breadcrumb.append(('import_csv', _('Import CSV'))) |
|
131 |
html_top('data_management', _('Import CSV')) |
|
132 |
r = TemplateIO(html=True) |
|
133 |
r += htmltext('<h2>%s</h2>') % _('Import CSV') |
|
134 |
r += htmltext('<p>%s</p>') % _(self.import_csv_message) |
|
135 |
r += form.render() |
|
136 |
return r.getvalue() |
|
137 | ||
138 | ||
139 |
def import_csv_submit(self, form): |
|
140 |
if form.get_widget('file').parse(): |
|
141 |
data = form.get_widget('file').parse().fp.read() |
|
142 |
text_data = force_text(data) |
|
143 |
else: |
|
144 |
form.set_error('file', _('You have to enter a file.')) |
|
145 |
raise ValueError() |
|
146 | ||
147 |
carddef_fields = self.formdef.get_varname_fields() |
|
148 |
carddef_fields_names = [f.varname for f in carddef_fields] |
|
149 |
reader = csv.reader(text_data.splitlines(), delimiter=',') |
|
150 |
try: |
|
151 |
caption = next(reader) |
|
152 |
except StopIteration: |
|
153 |
form.set_error('file', _('Invalid CSV file.')) |
|
154 |
raise ValueError() |
|
155 | ||
156 |
if len(caption) < len(carddef_fields_names): |
|
157 |
form.set_error('file', _('File contains less columns that fields with varnames.')) |
|
158 |
raise ValueError() |
|
159 | ||
160 |
missing_fields = set(carddef_fields_names) - set(caption) |
|
161 |
if missing_fields: |
|
162 |
form.set_error('file', _('File misses fields: %s.' % ', '.join(missing_fields))) |
|
163 |
raise ValueError() |
|
164 |
data_class = self.formdef.data_class() |
|
165 |
for csv_line in reader: |
|
166 |
data_line = dict(zip(caption, csv_line)) |
|
167 |
data_instance = data_class() |
|
168 |
data = {} |
|
169 |
for field in carddef_fields: |
|
170 |
value = data_line[field.varname] |
|
171 |
field_id = str(field.id) |
|
172 |
if field.convert_value_from_anything: |
|
173 |
value = field.convert_value_from_anything(value) |
|
174 |
data[field_id] = value |
|
175 |
if field.store_display_value: |
|
176 |
display_value = field.store_display_value(data, field_id) |
|
177 |
data['%s_display' % field_id] = display_value |
|
178 |
if value and field.store_structured_value: |
|
179 |
structured_value = field.store_structured_value(data, field_id) |
|
180 |
if structured_value: |
|
181 |
if isinstance(structured_value, dict) and structured_value.get('id'): |
|
182 |
formdata.data[field_id] = str(structured_value.get('id')) |
|
183 |
data['%s_structured' % field_id] = structured_value |
|
184 |
data_instance.data = data |
|
185 |
data_instance.just_created() |
|
186 |
data_instance.store() |
|
187 | ||
188 |
get_session().message = ('info', self.import_csv_success_message) |
|
189 |
return redirect('.') |
|
190 | ||
191 | ||
102 | 192 |
def _q_lookup(self, component): |
103 | 193 |
try: |
104 | 194 |
filled = self.formdef.data_class().get(component) |
wcs/backoffice/management.py | ||
---|---|---|
1039 | 1039 |
qs, _('Plot on a Map')) |
1040 | 1040 |
if 'stats' in self._q_exports: |
1041 | 1041 |
r += htmltext(' <li class="stats"><a href="stats">%s</a></li>') % _('Statistics') |
1042 |
if get_publisher().has_site_option('studio') and ('import-csv', 'import_csv') in self._q_exports: |
|
1043 |
r += htmltext('<li><a rel="popup" href="import-csv">%s</a></li>') % _('Import data from a CSV file') |
|
1042 | 1044 |
r += htmltext('</ul>') |
1043 | 1045 |
return r.getvalue() |
1044 | 1046 |
wcs/formdef.py | ||
---|---|---|
385 | 385 |
def get_all_fields(self): |
386 | 386 |
return (self.fields or []) + self.workflow.get_backoffice_fields() |
387 | 387 | |
388 |
def get_varname_fields(self): |
|
389 |
return [field for field in self.fields or [] if isinstance(field, fields.WidgetField) and field.varname] |
|
390 | ||
388 | 391 |
def rebuild(self): |
389 | 392 |
if get_publisher().is_using_postgresql(): |
390 | 393 |
from . import sql |
391 |
- |