0001-add-sector-connector-56001.patch
passerelle/apps/sector/admin.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from django.contrib import admin |
|
18 | ||
19 |
from passerelle.apps.sector.models import Sector, Sectorization |
|
20 | ||
21 | ||
22 |
class SectorAdmin(admin.ModelAdmin):
    """Admin configuration for the Sector model."""

    # columns of the change list, and the sidebar filter
    list_display = ('title', 'slug', 'resource')
    list_filter = ('resource',)
    # the slug input is pre-filled from the title in the admin form
    prepopulated_fields = {'slug': ('title',)}
|
26 | ||
27 | ||
28 |
class SectorizationAdmin(admin.ModelAdmin):
    """Admin configuration for the Sectorization model."""

    # 'resource' is not a database column: it is the Sectorization.resource property
    list_display = ('id', 'street_id', 'parity', 'min_housenumber', 'max_housenumber', 'sector', 'resource')
    # filter through the sector foreign key, up to its owning resource
    list_filter = ('sector__resource',)
|
39 | ||
40 | ||
41 |
admin.site.register(Sector, SectorAdmin) |
|
42 |
admin.site.register(Sectorization, SectorizationAdmin) |
passerelle/apps/sector/migrations/0001_initial.py | ||
---|---|---|
1 |
# Generated by Django 2.2.19 on 2021-08-15 19:40 |
|
2 | ||
3 |
import django.db.models.deletion |
|
4 |
from django.db import migrations, models |
|
5 | ||
6 |
import passerelle.apps.sector.models |
|
7 | ||
8 | ||
9 |
class Migration(migrations.Migration):
    """Initial schema of the sector application.

    Creates the Sector, SectorResource and Sectorization models, then links
    Sector to SectorResource and makes (resource, slug) unique on Sector.
    """

    initial = True

    dependencies = [
        ('base', '0029_auto_20210202_1627'),
    ]

    operations = [
        # Sector is created without its 'resource' foreign key; the key is
        # added by the AddField operation below, once SectorResource exists.
        migrations.CreateModel(
            name='Sector',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('title', models.CharField(max_length=256, verbose_name='Title')),
                ('slug', models.CharField(max_length=128, verbose_name='Identifier')),
            ],
            options={
                'ordering': ['resource', 'slug'],
            },
        ),
        migrations.CreateModel(
            name='SectorResource',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('title', models.CharField(max_length=50, verbose_name='Title')),
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
                ('description', models.TextField(verbose_name='Description')),
                (
                    'csv_file',
                    models.FileField(
                        help_text='CSV file',
                        upload_to=passerelle.apps.sector.models.upload_to,
                        verbose_name='Sectorization file',
                    ),
                ),
                (
                    'titles_in_first_line',
                    models.BooleanField(
                        default=True,
                        help_text='If not, column titles are: street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name, …',
                        verbose_name='First line defines column titles',
                    ),
                ),
                (
                    'users',
                    models.ManyToManyField(
                        blank=True,
                        related_name='_sectorresource_users_+',
                        related_query_name='+',
                        to='base.ApiUser',
                    ),
                ),
            ],
            options={
                'abstract': False,
            },
        ),
        migrations.CreateModel(
            name='Sectorization',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('street_id', models.CharField(max_length=64, verbose_name='Street Identifier')),
                (
                    'parity',
                    models.SmallIntegerField(
                        choices=[(0, 'all'), (1, 'odd'), (2, 'even')],
                        default=0,
                        verbose_name='Parity of numbers',
                    ),
                ),
                (
                    'min_housenumber',
                    models.PositiveIntegerField(default=0, verbose_name='Minimal house number'),
                ),
                (
                    'max_housenumber',
                    models.PositiveIntegerField(default=999999, verbose_name='Maximal house number'),
                ),
                (
                    'sector',
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE, to='sector.Sector', verbose_name='Sector'
                    ),
                ),
            ],
            options={
                'ordering': ['street_id', 'min_housenumber', 'parity'],
            },
        ),
        # link each Sector to its owning SectorResource (deleted in cascade)
        migrations.AddField(
            model_name='sector',
            name='resource',
            field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='sector.SectorResource'),
        ),
        # a sector slug is unique within one resource only
        migrations.AlterUniqueTogether(
            name='sector',
            unique_together={('resource', 'slug')},
        ),
    ]
passerelle/apps/sector/models.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import csv |
|
18 |
import datetime |
|
19 |
import mimetypes |
|
20 |
import os |
|
21 | ||
22 |
from django.conf import settings |
|
23 |
from django.core.exceptions import ValidationError |
|
24 |
from django.core.files.base import ContentFile |
|
25 |
from django.db import models, transaction |
|
26 |
from django.db.models import Q |
|
27 |
from django.http import HttpResponse |
|
28 |
from django.utils.encoding import force_bytes, force_str, smart_text |
|
29 |
from django.utils.timezone import now |
|
30 |
from django.utils.translation import ugettext_lazy as _ |
|
31 | ||
32 |
from passerelle.base.models import BaseResource |
|
33 |
from passerelle.utils.api import endpoint |
|
34 |
from passerelle.utils.jsonresponse import APIError |
|
35 | ||
36 |
PARITY_ALL = 0 |
|
37 |
PARITY_ODD = 1 |
|
38 |
PARITY_EVEN = 2 |
|
39 | ||
40 |
PARITY_CHOICES = ( |
|
41 |
(PARITY_ALL, _('all')), |
|
42 |
(PARITY_ODD, _('odd')), |
|
43 |
(PARITY_EVEN, _('even')), |
|
44 |
) |
|
45 | ||
46 |
CSV_TITLES = ['street_id', 'parity', 'min_housenumber', 'max_housenumber', 'sector_id', 'sector_name'] |
|
47 | ||
48 |
MAX_HOUSENUMBER = 999_999 |
|
49 | ||
50 | ||
51 |
def upload_to(instance, filename):
    """Return the storage path of an uploaded file: <connector slug>/<resource slug>/<filename>."""
    connector_slug = instance.get_connector_slug()
    return f'{connector_slug}/{instance.slug}/{filename}'
|
53 | ||
54 | ||
55 |
class SectorResource(BaseResource):
    """Connector storing a sectorization (street + house numbers -> sector) loaded from a CSV file.

    The CSV file is parsed by import_csv() into Sector and Sectorization
    rows; endpoints allow to replace the file, export the data as CSV and
    look up sectors.
    """

    csv_file = models.FileField(
        _('Sectorization file'),
        upload_to=upload_to,
        help_text=_('CSV file'),
    )
    titles_in_first_line = models.BooleanField(
        _('First line defines column titles'),
        default=True,
        help_text=_('If not, column titles are: %s, …') % ','.join(CSV_TITLES),
    )

    category = _('Geographic information system')
    hide_description_fields = ['csv_file']

    def __str__(self):
        return '%s [%s]' % (self.title, self.slug)

    def daily(self):
        """Run the parent daily job, then clean old CSV files."""
        super().daily()
        self.clean_old_csv_files()

    def clean_old_csv_files(self):
        """Remove or set aside old files stored next to the current CSV file.

        Files of the upload directory, other than the current one, whose
        modification time is older than 7 days are deleted when the
        SECTOR_REMOVE_ON_CLEAN setting is True, else moved to an
        'unused-files' subdirectory.
        """
        if not os.path.exists(self.csv_file.path):
            return
        base_dir = os.path.dirname(self.csv_file.path)
        if os.path.dirname(self.csv_file.name) != os.path.join(self.get_connector_slug(), self.slug):
            # path is not compliant with upload_to, do nothing
            return

        for filename in os.listdir(base_dir):
            filepath = os.path.join(base_dir, filename)
            if not os.path.isfile(filepath):
                continue
            if os.path.basename(self.csv_file.name) == filename:
                # current file
                continue
            mtime = os.stat(filepath).st_mtime
            if mtime > (now() + datetime.timedelta(days=-7)).timestamp():
                # too young
                continue

            if getattr(settings, 'SECTOR_REMOVE_ON_CLEAN', False) is True:
                # remove
                os.unlink(filepath)
            else:
                # move file in unused-files dir
                unused_dir = os.path.join(base_dir, 'unused-files')
                os.makedirs(unused_dir, exist_ok=True)
                os.rename(filepath, os.path.join(unused_dir, filename))

    def clean(self, *args, **kwargs):
        """Validate the CSV file with a dry-run import.

        Raises ValidationError ('Invalid CSV file: ...') on any error raised
        by the dry run.
        """
        try:
            self.import_csv(validate_only=True)
        except Exception as e:
            raise ValidationError(_('Invalid CSV file: %s') % e)
        return super().clean(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Save the resource, then import the CSV file.

        The import is skipped when the extra keyword argument
        import_csv=False is given (it is removed before calling the parent).
        """
        import_csv = kwargs.pop('import_csv', True)
        result = super().save(*args, **kwargs)
        if import_csv:
            self.import_csv()
        return result

    @transaction.atomic
    def import_csv(self, validate_only=False):
        """Parse the CSV file and rebuild sectors and sectorizations from it.

        With validate_only=True the file is parsed and checked but nothing
        is written. Otherwise existing sectors of this resource are deleted
        and recreated; the method is atomic, so any error cancels the
        deletion.

        Raises ValidationError when the file cannot be read, when a required
        column is missing from the header, or when a line lacks a sector_id
        or a street_id.
        """
        try:
            self.csv_file.seek(0)
            content = force_bytes(self.csv_file.read())
            content = force_str(content.decode('utf-8-sig', 'ignore').encode('utf-8'))  # handle BOM
            dialect = csv.Sniffer().sniff(content)
            reader = csv.reader(content.splitlines(), dialect)
        except Exception as e:
            raise ValidationError(_('failed to read CSV (%s)') % e)
        if self.titles_in_first_line:
            first_line = next(reader)
            titles = [name.strip().lower() for name in first_line]
            if not set(CSV_TITLES).issubset(titles):
                # sorted: keep the message deterministic when several columns are missing
                raise ValidationError(
                    _('missing column(s) in header: %s.') % ', '.join(sorted(set(CSV_TITLES) - set(titles)))
                )
        else:
            titles = CSV_TITLES
        # columns with an empty title are ignored
        indexes = [titles.index(t) for t in titles if t]
        captions = [titles[i] for i in indexes]

        # now ready to import data, first delete all sectors
        # sectorizations will also be deleted (cascade)
        # (will be cancelled by transaction on any import error)
        if not validate_only:
            self.sector_set.all().delete()

        def get_cell(row, index):
            # short lines are completed with empty cells
            try:
                return row[index]
            except IndexError:
                return ''

        # sector_id and sector_name are kept from one line to the next:
        # a line with an empty sector_id belongs to the previous sector
        sector_id = sector_name = None
        sectors = {}
        for row in reader:
            if not row:
                continue  # do not consider empty lines
            row = [smart_text(x).strip() for x in row]
            row = {caption: get_cell(row, index) for caption, index in zip(captions, indexes)}

            if row['sector_id']:
                sector_id = row['sector_id']
                sector_name = row['sector_name'] or row['sector_id']
            if not sector_id:
                raise ValidationError(_('missing sector_id, line %s') % reader.line_num)
            if not row['street_id']:
                raise ValidationError(_('missing street_id, line %s') % reader.line_num)
            # parity is guessed from the first character; unknown values mean "all"
            if not row['parity']:
                parity = PARITY_ALL
            elif row['parity'].lower()[0] in (str(PARITY_EVEN), 'e', 'p'):  # p = pair (even, in french)
                parity = PARITY_EVEN
            elif row['parity'].lower()[0] in (str(PARITY_ODD), 'o', 'i'):  # i = impair (odd, in french)
                parity = PARITY_ODD
            else:
                parity = PARITY_ALL
            # empty or non-numeric limits fall back to the widest range
            try:
                min_housenumber = int(row['min_housenumber'])
            except (ValueError, TypeError):
                min_housenumber = 0
            try:
                max_housenumber = int(row['max_housenumber'])
            except (ValueError, TypeError):
                max_housenumber = MAX_HOUSENUMBER

            if not validate_only:
                if sector_id not in sectors:
                    sectors[sector_id] = self.sector_set.create(
                        resource=self, slug=sector_id, title=sector_name
                    )
                sectors[sector_id].sectorization_set.create(
                    sector=sectors[sector_id],
                    street_id=row['street_id'],
                    parity=parity,
                    min_housenumber=min_housenumber,
                    max_housenumber=max_housenumber,
                )

    @endpoint(
        description=_('Update sectorization with a CSV file'),
        display_category=_('Management'),
        perm='can_access',
        methods=['put'],
    )
    def update(self, request):
        """Replace the CSV file with the request body and re-import it.

        The file extension is guessed from the request content type.
        Raises APIError (HTTP 400) when the extension cannot be guessed or
        when the new file does not validate. Returns the new file name and
        the list of sectors.
        """
        ext = mimetypes.guess_extension(request.content_type)
        if not ext:
            raise APIError(
                "can't guess filename extension for '%s' content type" % request.content_type, http_status=400
            )
        name = self.csv_file.storage.get_available_name('api-uploaded-file' + ext)
        self.csv_file = ContentFile(content=request.body, name=name)
        try:
            self.clean()
        except ValidationError as e:
            raise APIError(e, http_status=400)
        self.save()
        return {
            'updated': self.csv_file.name,
            'data': [
                {
                    'id': sector.slug,
                    'text': sector.title,
                }
                for sector in self.sector_set.all()
            ],
        }

    @endpoint(
        description=_('Get sectorization as a CSV file'),
        display_category=_('Management'),
        perm='can_access',
        parameters={
            'even': {
                'description': _('Even numbers indicator (default: %s)') % PARITY_EVEN,
                'example_value': 'P',
            },
            'odd': {
                'description': _('Odd numbers indicator (default: %s)') % PARITY_ODD,
                'example_value': 'I',
            },
            'mix': {
                'description': _('Even or odd numbers indicator (default: %s)') % PARITY_ALL,
                'example_value': '',
            },
            'limits': {'description': _('Show housenumber min/max (0/%s)') % MAX_HOUSENUMBER, 'type': 'bool'},
            'repeat': {'description': _('Repeat sector id and name on all lines'), 'type': 'bool'},
        },
    )
    def export(self, request, even=None, odd=None, mix=None, limits=False, repeat=False):
        """Return the sectorization as a CSV attachment.

        even, odd and mix override the text written in the parity column.
        Unless limits is set, a minimal house number of 0 and a maximal
        house number of MAX_HOUSENUMBER are written as empty cells. Unless
        repeat is set, sector id and name are only written on the first
        line of each sector.
        """
        response = HttpResponse(content_type='text/csv')
        date = now().strftime('%Y-%m-%d_%H:%M')
        response['Content-Disposition'] = 'attachment; filename="sector-%s-%s.csv"' % (self.slug, date)
        writer = csv.writer(response, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
        if self.titles_in_first_line:
            writer.writerow(CSV_TITLES)
        # parity column texts: labels of PARITY_CHOICES unless overridden by the caller
        parity = dict(PARITY_CHOICES)
        if even is not None:
            parity[PARITY_EVEN] = even
        if odd is not None:
            parity[PARITY_ODD] = odd
        if mix is not None:
            parity[PARITY_ALL] = mix
        for sector in self.sector_set.all().order_by('slug'):
            first = True
            for sectorization in sector.sectorization_set.all().order_by('street_id'):
                writer.writerow(
                    [
                        sectorization.street_id,
                        parity.get(sectorization.parity, mix),
                        sectorization.min_housenumber if (limits or sectorization.min_housenumber) else '',
                        sectorization.max_housenumber
                        if (limits or sectorization.max_housenumber < MAX_HOUSENUMBER)
                        else '',
                        sector.slug if (repeat or first) else '',
                        sector.title if (repeat or first) else '',
                    ]
                )
                first = False
        return response

    @endpoint(
        name='sectors',
        description=_('List of Sectors'),
        perm='can_access',
        display_category=_('Data sources'),
        parameters={
            'id': {'description': _('Sector identifier (slug)')},
            'q': {'description': _('Filter by Sector Title or Identifier')},
            'street_id': {'description': _('Get sectors for this Street identifier')},
            'house_number': {
                'description': _('Get sectors by this House Number (requires a street_id)'),
                'type': 'integer',
            },
        },
    )
    def sectors(self, request, q=None, id=None, street_id=None, house_number=None):
        """List sectors, or search the sectors of a street (and house number).

        With street_id, return the sectors of the matching sectorizations:
        those covering house_number (range and parity) when it is given,
        else those covering the whole street. Without street_id, list the
        sectors of the resource, optionally filtered by id (exact slug) or
        q (substring of slug or title).

        Raises APIError (HTTP 400) when house_number is given without
        street_id. Always returns a dict with a 'data' list.
        """
        # "is not None": a house number of 0 also requires a street
        if house_number is not None and not street_id:
            raise APIError('house_number requires a street_id', http_status=400)

        # search by street and house number
        if street_id:
            query = Sectorization.objects.filter(sector__resource=self, street_id=street_id)
            if house_number is not None:
                house_number = int(house_number)
                query = query.filter(min_housenumber__lte=house_number, max_housenumber__gte=house_number)
                parity = PARITY_ODD if house_number % 2 else PARITY_EVEN
                query = query.filter(Q(parity=PARITY_ALL) | Q(parity=parity))
            else:
                query = query.filter(parity=PARITY_ALL, min_housenumber=0, max_housenumber=MAX_HOUSENUMBER)
            # the loop is inside the list: every match is returned, and
            # 'data' is an empty list (not a missing key) when nothing matches
            return {
                'data': [
                    {
                        'id': sectorization.sector.slug,
                        'text': sectorization.sector.title,
                    }
                    for sectorization in query.select_related('sector').reverse()
                ]
            }

        # list of sectors
        query = self.sector_set.all()
        if id is not None:
            query = query.filter(slug=id)
        elif q is not None:
            query = query.filter(Q(slug__icontains=q) | Q(title__icontains=q))
        return {
            'data': [
                {
                    'id': sector.slug,
                    'text': sector.title,
                }
                for sector in query
            ]
        }
|
336 | ||
337 | ||
338 |
class Sector(models.Model):
    """A named sector of a SectorResource, identified by a slug unique within the resource."""

    resource = models.ForeignKey(SectorResource, on_delete=models.CASCADE)
    title = models.CharField(max_length=256, verbose_name=_('Title'))
    slug = models.CharField(max_length=128, verbose_name=_('Identifier'))

    class Meta:
        unique_together = ('resource', 'slug')
        ordering = ['resource', 'slug']

    def __str__(self):
        # "<resource> > <title> [<slug>]"
        return '{} > {} [{}]'.format(self.resource, self.title, self.slug)
|
349 | ||
350 | ||
351 |
class Sectorization(models.Model):
    """Assignment of a range of house numbers of a street to a sector.

    A row matches the house numbers of street_id between min_housenumber and
    max_housenumber (both used as inclusive bounds by the sectors endpoint)
    having the given parity.
    """

    sector = models.ForeignKey(Sector, on_delete=models.CASCADE, verbose_name=_('Sector'))
    street_id = models.CharField(max_length=64, verbose_name=_('Street Identifier'))
    parity = models.SmallIntegerField(
        choices=PARITY_CHOICES, default=PARITY_ALL, verbose_name=_('Parity of numbers')
    )
    min_housenumber = models.PositiveIntegerField(default=0, verbose_name=_('Minimal house number'))
    max_housenumber = models.PositiveIntegerField(
        default=MAX_HOUSENUMBER, verbose_name=_('Maximal house number')
    )

    class Meta:
        ordering = ['street_id', 'min_housenumber', 'parity']

    def __str__(self):
        return '%s, parity:%s, min:%s, max:%s → %s' % (
            self.street_id,
            dict(PARITY_CHOICES).get(self.parity),
            self.min_housenumber,
            self.max_housenumber,
            self.sector,
        )

    @property
    def resource(self):
        """Resource owning the sector of this row."""
        return self.sector.resource

    def clean(self):
        """Normalize the maximal house number and check the range.

        An empty, zero or too large max_housenumber is replaced by
        MAX_HOUSENUMBER. Raises ValidationError when min_housenumber is
        greater than max_housenumber.
        """
        if not self.max_housenumber or self.max_housenumber > MAX_HOUSENUMBER:
            self.max_housenumber = MAX_HOUSENUMBER
        if self.min_housenumber > self.max_housenumber:
            # the message states the violated rule: min must not exceed max
            raise ValidationError(_('Minimal house number may not be greater than maximal house number.'))
passerelle/settings.py | ||
---|---|---|
158 | 158 |
'passerelle.apps.oxyd', |
159 | 159 |
'passerelle.apps.phonecalls', |
160 | 160 |
'passerelle.apps.photon', |
161 |
'passerelle.apps.sector', |
|
161 | 162 |
'passerelle.apps.solis', |
162 | 163 |
'passerelle.apps.twilio', |
163 | 164 |
'passerelle.apps.vivaticket', |
tests/test_sector.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import datetime |
|
18 |
import os |
|
19 |
from posix import stat_result |
|
20 |
from stat import ST_MTIME |
|
21 | ||
22 |
import pytest |
|
23 |
import utils |
|
24 |
import webtest |
|
25 |
from django.core.exceptions import ValidationError |
|
26 |
from django.core.files import File |
|
27 |
from django.core.management import call_command |
|
28 |
from django.urls import reverse |
|
29 |
from django.utils.encoding import force_str, force_text |
|
30 |
from django.utils.six import StringIO |
|
31 |
from django.utils.timezone import now |
|
32 |
from test_manager import login |
|
33 | ||
34 |
from passerelle.apps.sector.models import Sectorization, SectorResource |
|
35 |
from passerelle.base.models import AccessRight |
|
36 | ||
37 |
CSV = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name |
|
38 |
75114_1912,P,,, gs-moulin, Groupe Scolaire Moulin |
|
39 |
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola |
|
40 |
75114_1913,N,0,999999,ecole-hugo,École Hugo |
|
41 |
75114_1914,,,10,ecole-hugo, École Hugo |
|
42 | ||
43 |
75114_1914,,11,, ecole-hugo2, École Hugo 2 |
|
44 |
75114_1915,,,,ecole-hugo2 , École Hugo 2 |
|
45 |
""" |
|
46 | ||
47 |
CSV_BOM = force_str(force_text(CSV, 'utf-8').encode('utf-8-sig')) |
|
48 | ||
49 |
CSV_NO_FIRST_LINE = """75114_1912,P,,, gs-moulin, Groupe Scolaire Moulin |
|
50 |
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola |
|
51 |
75114_1913,N,0,999999,ecole-hugo,École Hugo |
|
52 |
75114_1914,,,10,ecole-hugo, École Hugo |
|
53 | ||
54 |
75114_1914,,11,, ecole-hugo, École Hugo""" |
|
55 | ||
56 |
CSV_REORDERED = """sector_id,sector_name,street_id,parity,min_housenumber,max_housenumber,foo,bar |
|
57 |
gs-moulin, Groupe Scolaire Moulin, 75114_1912,P,,,aaa,bbb |
|
58 |
gs-zola,Groupe Scolaire Zola,75114_1912,I,0,999999,xxx,yyy |
|
59 |
ecole-hugo,École Hugo,75114_1913,N,0,999999,000,1 |
|
60 |
,,75114_1999,N,0,999999,, |
|
61 |
""" |
|
62 | ||
63 |
CSV_MISSING_COLUMN = """street_id,min_housenumber,max_housenumber,sector_id,sector_name |
|
64 |
75114_1912,,,foo, |
|
65 |
,0,999999,gs-zola,Groupe Scolaire Zola""" |
|
66 | ||
67 |
CSV_MISSING_SECTOR = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name |
|
68 |
75114_1912,P,,, , |
|
69 |
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola""" |
|
70 | ||
71 |
CSV_MISSING_STREET = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name |
|
72 |
75114_1912,P,,,foo, |
|
73 |
,I,0,999999,gs-zola,Groupe Scolaire Zola""" |
|
74 | ||
75 |
pytestmark = pytest.mark.django_db |
|
76 | ||
77 | ||
78 |
@pytest.fixture
def sector(db):
    """SectorResource 'test' loaded with the reference CSV, with access rights set up."""
    resource = SectorResource.objects.create(
        slug='test',
        title='title',
        csv_file=File(StringIO(CSV), 'sectorization.csv'),
    )
    return utils.setup_access_rights(resource)
|
87 | ||
88 | ||
89 |
def test_sector_creation(sector):
    """The CSV of the fixture is imported on creation, and again on each save."""
    assert str(sector) == 'title [test]'
    assert sector.sector_set.count() == 4
    # number of sectorization rows expected for some sectors
    expected_rows = {'ecole-hugo': 2, 'gs-zola': 1}
    for slug, rows in expected_rows.items():
        current = sector.sector_set.get(slug=slug)
        assert Sectorization.objects.filter(sector=current).count() == rows

    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4  # no change

    # forced reset
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0
    sector.save()
    assert sector.sector_set.count() == 4
|
104 | ||
105 | ||
106 |
def test_sector_creation_bom(sector):
    """A CSV file starting with a byte order mark is imported like a plain one."""
    # forced reset
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0

    bom_file = File(StringIO(CSV_BOM), 'sectorization.csv')
    sector.csv_file = bom_file
    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4
|
114 | ||
115 | ||
116 |
def test_sector_creation_nofirstline(sector):
    """A file without header is rejected, unless titles_in_first_line is unset."""
    headerless_file = File(StringIO(CSV_NO_FIRST_LINE), 'sectorization.csv')
    sector.csv_file = headerless_file
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing column'):
        sector.clean()
    assert sector.sector_set.count() == 4  # nothing change from initial creation

    sector.titles_in_first_line = False
    sector.save()
    assert sector.sector_set.count() == 3
|
124 | ||
125 | ||
126 |
def test_sector_reordered(sector):
    """Columns are found by title: reordered and extra columns are accepted."""
    assert sector.sector_set.count() == 4
    reordered_file = File(StringIO(CSV_REORDERED), 'sectorization.csv')
    sector.csv_file = reordered_file
    sector.save()
    assert sector.sector_set.count() == 3
|
131 | ||
132 | ||
133 |
def test_sector_empty_file(sector):
    """An empty file is refused by clean() and save(); existing sectors are kept."""
    sector.csv_file = File(StringIO(''), 'sectorization.csv')
    checks = (
        (sector.clean, 'Invalid CSV file:.*failed to read CSV'),
        (sector.save, 'failed to read CSV'),
    )
    for action, pattern in checks:
        with pytest.raises(ValidationError, match=pattern):
            action()
    assert sector.sector_set.count() == 4  # nothing change
|
140 | ||
141 | ||
142 |
def test_sector_missing_sector(sector):
    """A first data line without sector_id is refused; existing sectors are kept."""
    sector.csv_file = File(StringIO(CSV_MISSING_SECTOR), 'sectorization.csv')
    checks = (
        (sector.clean, 'Invalid CSV file:.*missing sector_id, line 2'),
        (sector.save, 'missing sector_id, line 2'),
    )
    for action, pattern in checks:
        with pytest.raises(ValidationError, match=pattern):
            action()
    assert sector.sector_set.count() == 4  # nothing change
|
149 | ||
150 | ||
151 |
def test_sector_missing_street(sector):
    """A line without street_id is refused; existing sectors are kept."""
    sector.csv_file = File(StringIO(CSV_MISSING_STREET), 'sectorization.csv')
    checks = (
        (sector.clean, 'Invalid CSV file:.*missing street_id, line 3'),
        (sector.save, 'missing street_id, line 3'),
    )
    for action, pattern in checks:
        with pytest.raises(ValidationError, match=pattern):
            action()
    assert sector.sector_set.count() == 4  # nothing change
|
158 | ||
159 | ||
160 |
def test_sector_missing_column(sector):
    """A header lacking the parity column is refused; existing sectors are kept."""
    sector.csv_file = File(StringIO(CSV_MISSING_COLUMN), 'sectorization.csv')
    # raw strings: "\." is a regex escape, not a valid Python string escape
    with pytest.raises(ValidationError, match=r'Invalid CSV file:.*missing column.*: parity\.'):
        sector.clean()
    with pytest.raises(ValidationError, match=r'missing column.*: parity\.'):
        sector.save()
    assert sector.sector_set.count() == 4  # nothing change
|
167 | ||
168 | ||
169 |
def test_sector_endpoint_sectors(app, sector):
    """Exercise the 'sectors' endpoint: listing, filters, street searches, errors, access rights."""
    url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'sectors'},
    )
    hugo = {'id': 'ecole-hugo', 'text': 'École Hugo'}
    hugo2 = {'id': 'ecole-hugo2', 'text': 'École Hugo 2'}
    moulin = {'id': 'gs-moulin', 'text': 'Groupe Scolaire Moulin'}
    zola = {'id': 'gs-zola', 'text': 'Groupe Scolaire Zola'}

    # full list
    result = app.get(url).json
    assert result['err'] == 0
    assert len(result['data']) == 4
    assert hugo in result['data']

    # by identifier
    result = app.get(url, params={'id': 'ecole-hugo'}).json
    assert result['err'] == 0
    assert len(result['data']) == 1
    assert hugo in result['data']

    # by substring of title or identifier
    result = app.get(url, params={'q': 'hugo'}).json
    assert result['err'] == 0
    assert len(result['data']) == 2
    assert hugo in result['data']
    assert hugo2 in result['data']

    result = app.get(url, params={'q': 'foobar'}).json
    assert result['err'] == 0
    assert len(result['data']) == 0

    # search a sector by street and house number
    searches = [
        ({'street_id': '75114_1915'}, hugo2),
        ({'street_id': '75114_1915', 'house_number': '123'}, hugo2),
        ({'street_id': '75114_1912', 'house_number': '12'}, moulin),  # even
        ({'street_id': '75114_1912', 'house_number': '13'}, zola),  # odd
        ({'street_id': '75114_1914', 'house_number': '5'}, hugo),  # 5 <= 10
        ({'street_id': '75114_1914', 'house_number': '20'}, hugo2),  # 20 >= 11
    ]
    for params, expected in searches:
        result = app.get(url, params=params).json
        assert result['err'] == 0
        assert result['data'] == [expected]

    # bad searches
    bad_searches = [
        # not an integer
        ({'street_id': '75114_1915', 'house_number': 'abc'}, 'invalid value for parameter "house_number"'),
        ({'house_number': '123'}, 'house_number requires a street_id'),
    ]
    for params, err_desc in bad_searches:
        result = app.get(url, params=params, status=400).json
        assert result['err'] == 1
        assert result['err_desc'] == err_desc

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.get(url, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None
|
231 | ||
232 | ||
233 |
def test_sector_endpoint_export(app, sector):
    """Check the ``export`` endpoint of the sector connector.

    Covers, in order: the default CSV export, the export without a title
    line, a re-import of the exported data followed by a new export, the
    query parameters that alter the output format, and the 403 answer once
    every access right has been removed.
    """
    endpoint_kwargs = {
        'connector': 'sector',
        'slug': sector.slug,
        'endpoint': 'export',
    }
    url = reverse('generic-endpoint', kwargs=endpoint_kwargs)

    # default export: title line + data
    with_titles = app.get(url)
    assert with_titles.headers['content-type'] == 'text/csv'
    title_line = '"street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"'
    assert with_titles.text.startswith(title_line)
    assert len(with_titles.text.splitlines()) == 7

    # no title line requested: one line less, and no column name in the output
    sector.titles_in_first_line = False
    sector.save(import_csv=False)
    without_titles = app.get(url)
    assert without_titles.headers['content-type'] == 'text/csv'
    assert 'street_id' not in without_titles.text
    assert len(without_titles.text.splitlines()) == 6

    # back to the initial configuration
    sector.titles_in_first_line = True
    sector.save()

    reference = app.get(url)
    assert reference.headers['content-type'] == 'text/csv'
    # "street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"
    # "75114_1913","all","","","ecole-hugo","École Hugo"
    # "75114_1914","all","","10","",""
    # "75114_1915","all","","","ecole-hugo2","École Hugo 2"
    # "75114_1914","all","11","","",""
    # "75114_1912","odd","","","gs-zola","Groupe Scolaire Zola"
    # "75114_1912","even","","","gs-moulin","Groupe Scolaire Moulin"
    assert len(reference.text.splitlines()) == 7
    default_counts = (
        ('"all"', 4),
        ('"odd"', 1),
        ('"even"', 1),
        ('"ecole-hugo"', 1),
        ('"0"', 0),
        ('"999999"', 0),
    )
    for needle, expected in default_counts:
        assert reference.text.count(needle) == expected

    # import -> export again
    initial_export = reference.text
    sector.csv_file = File(StringIO(initial_export), 'data.csv')
    sector.save()
    round_trip = app.get(url)
    assert round_trip.text == initial_export

    # modify export format
    format_params = {
        'odd': 'IMPAIRS',
        'even': 'PAIRS',
        'mix': 'TOUS',
        'repeat': 'true',
        'limits': 'true',
    }
    customized = app.get(url, params=format_params)
    assert len(customized.text.splitlines()) == 7
    customized_counts = (
        ('"TOUS"', 4),
        ('"IMPAIRS"', 1),
        ('"PAIRS"', 1),
        ('"ecole-hugo"', 2),  # repeat
        ('"0"', 5),  # limits
        ('"999999"', 5),
    )
    for needle, expected in customized_counts:
        assert customized.text.count(needle) == expected

    # access right is needed
    AccessRight.objects.all().delete()
    denied = app.get(url, status=403).json
    assert denied['err'] == 1
    assert 'PermissionDenied' in denied['err_class']
    assert denied['data'] is None
|
299 | ||
300 | ||
301 |
def test_sector_endpoint_update(app, sector):
    """Check the ``update`` endpoint of the sector connector.

    A valid CSV sent with PUT replaces the sectorization data; a CSV with a
    missing column or a request without a content type is rejected with a
    400; a request without access rights is rejected with a 403.
    """
    url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'update'},
    )

    def put_csv(payload, **extra):
        # a fresh headers dict is built on every call, as in separate literal calls
        return app.put(url, params=payload, headers={'Content-Type': 'text/csv'}, **extra).json

    # valid upload: the four initial sectors are replaced by three
    assert sector.sector_set.count() == 4
    uploaded = put_csv(CSV_REORDERED)
    assert sector.sector_set.count() == 3
    assert uploaded['err'] == 0
    assert len(uploaded['data']) == 3
    assert uploaded['updated'] == 'sector/test/api-uploaded-file.csv'

    # invalid content
    rejected = put_csv(CSV_MISSING_COLUMN, status=400)
    assert rejected['err'] == 1
    assert "missing column" in rejected['err_desc']

    # no content type
    untyped = app.put(url, params=CSV_REORDERED, headers={}, status=400).json
    assert untyped['err'] == 1
    assert "can't guess filename extension" in untyped['err_desc']

    # access right is needed
    AccessRight.objects.all().delete()
    denied = put_csv(CSV_REORDERED, status=403)
    assert denied['err'] == 1
    assert 'PermissionDenied' in denied['err_class']
|
330 | ||
331 | ||
332 |
@pytest.mark.parametrize('remove_files', [False, True])
def test_daily_clean(settings, remove_files, sector):
    """Check the cleaning of unused files done by the daily cron job.

    An extra file is created next to the CSV file of the ``sector`` fixture,
    and another one in an unrelated directory. The job is then run with the
    real modification times, with times faked to just under one week, and
    with times faked to exactly one week. Only in the last case must the
    extra file disappear from the sector directory: deleted when
    ``SECTOR_REMOVE_ON_CLEAN`` is true, moved to ``unused-files`` otherwise.
    The unrelated directory must never be touched.
    """
    settings.SECTOR_REMOVE_ON_CLEAN = remove_files

    sectordata_dir = os.path.dirname(sector.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', sector.slug)
    os.makedirs(other_dir)

    # create additional file in sector dir
    with open(os.path.join(sectordata_dir, 'csv-file.csv'), 'w'):
        pass
    os.makedirs(os.path.join(sectordata_dir, 'not-a-file'))
    # create additional file in other dir
    with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
        pass

    call_command('cron', 'daily')

    # not changed
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

    orig_os_stat = os.stat

    def _stat_shifted_by(delta):
        """Build an ``os.stat`` replacement reporting ``now() + delta`` as mtime."""

        def _fake_stat(path, *args, **kwargs):
            # forward dir_fd/follow_symlinks and friends, so that callers using
            # the full os.stat() signature do not fail with a TypeError while
            # the patch is active
            faked = list(orig_os_stat(path, *args, **kwargs))
            faked[ST_MTIME] = (now() + delta).timestamp()
            return stat_result(faked)

        return _fake_stat

    try:
        # 1 week ago but one minute too soon
        os.stat = _stat_shifted_by(datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

        # 1 week ago
        os.stat = _stat_shifted_by(datetime.timedelta(days=-7))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # removed or moved
        dir_list = sorted(os.listdir(sectordata_dir))
        if remove_files:
            assert dir_list == ['not-a-file', 'sectorization.csv']
        else:
            assert dir_list == ['not-a-file', 'sectorization.csv', 'unused-files']
            assert os.listdir(os.path.join(sectordata_dir, 'unused-files')) == ['csv-file.csv']

        # wrong storage directory, do nothing
        with open(os.path.join(other_dir, 'bar.csv'), 'w'):
            pass
        sector.csv_file.name = 'foo/%s/csv-file.csv' % sector.slug
        sector.save()
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']
        call_command('cron', 'daily')
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']

        # unknown file: the job only has to run without raising
        sector.csv_file.name = 'sector/%s/bar.csv' % sector.slug
        sector.save()
        call_command('cron', 'daily')
    finally:
        # always restore the real os.stat, even when an assertion fails
        os.stat = orig_os_stat
|
406 | ||
407 | ||
408 |
def test_sector_manage_create(app, admin_user):
    """Create a sector connector through the management form.

    Fills and submits the creation form with a CSV upload, follows the
    redirect, then checks both the resulting page and the stored resource.
    """
    app = login(app)
    creation_page = app.get(reverse('create-connector', kwargs={'connector': 'sector'}))

    form = creation_page.form
    text_fields = (
        ('title', 'test title'),
        ('slug', 'test-slug'),
        ('description', 'test description'),
    )
    for field_name, value in text_fields:
        form.set(field_name, value)
    upload = webtest.Upload('test.csv', CSV.encode('utf-8'), 'application/octet-stream')
    form.set('csv_file', upload)

    # a successful creation answers with a redirect
    submitted = form.submit()
    assert submitted.location
    landing_page = submitted.follow()
    assert 'test title' in landing_page
    assert 'test description' in landing_page

    # exactly one resource exists, holding the submitted values
    assert SectorResource.objects.count() == 1
    resource = SectorResource.objects.get()
    assert resource.title == 'test title'
    assert resource.slug == 'test-slug'
    assert resource.description == 'test description'
    assert resource.csv_file.read() == CSV.encode('utf-8')
    assert resource.sector_set.count() == 4
|
0 |
- |