Projet

Général

Profil

0001-add-sector-connector-56001.patch

Thomas Noël, 30 août 2021 22:42

Télécharger (39,3 ko)

Voir les différences:

Subject: [PATCH 1/2] add sector connector (#56001)

 passerelle/apps/sector/__init__.py            |   0
 passerelle/apps/sector/admin.py               |  42 ++
 .../apps/sector/migrations/0001_initial.py    | 116 +++++
 passerelle/apps/sector/migrations/__init__.py |   0
 passerelle/apps/sector/models.py              | 382 ++++++++++++++++
 passerelle/settings.py                        |   1 +
 tests/test_sector.py                          | 426 ++++++++++++++++++
 7 files changed, 967 insertions(+)
 create mode 100644 passerelle/apps/sector/__init__.py
 create mode 100644 passerelle/apps/sector/admin.py
 create mode 100644 passerelle/apps/sector/migrations/0001_initial.py
 create mode 100644 passerelle/apps/sector/migrations/__init__.py
 create mode 100644 passerelle/apps/sector/models.py
 create mode 100644 tests/test_sector.py
passerelle/apps/sector/admin.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.contrib import admin
18

  
19
from passerelle.apps.sector.models import Sector, Sectorization
20

  
21

  
22
class SectorAdmin(admin.ModelAdmin):
    """Django admin configuration for Sector objects."""

    # columns of the change list, which can be narrowed down by resource
    list_display = ('title', 'slug', 'resource')
    list_filter = ('resource',)
    # the slug input is pre-filled from the title in the admin form
    prepopulated_fields = {'slug': ('title',)}
26

  
27

  
28
class SectorizationAdmin(admin.ModelAdmin):
    """Django admin configuration for Sectorization objects."""

    # filter the change list by the resource owning the sector
    list_filter = ('sector__resource',)
    # 'resource' is listed as a column name next to the model fields
    list_display = ('id', 'street_id', 'parity', 'min_housenumber', 'max_housenumber', 'sector', 'resource')
39

  
40

  
41
admin.site.register(Sector, SectorAdmin)
42
admin.site.register(Sectorization, SectorizationAdmin)
passerelle/apps/sector/migrations/0001_initial.py
1
# Generated by Django 2.2.19 on 2021-08-15 19:40
2

  
3
import django.db.models.deletion
4
from django.db import migrations, models
5

  
6
import passerelle.apps.sector.models
7

  
8

  
9
class Migration(migrations.Migration):
10

  
11
    initial = True
12

  
13
    dependencies = [
14
        ('base', '0029_auto_20210202_1627'),
15
    ]
16

  
17
    operations = [
18
        migrations.CreateModel(
19
            name='Sector',
20
            fields=[
21
                (
22
                    'id',
23
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
24
                ),
25
                ('title', models.CharField(max_length=256, verbose_name='Title')),
26
                ('slug', models.CharField(max_length=128, verbose_name='Identifier')),
27
            ],
28
            options={
29
                'ordering': ['resource', 'slug'],
30
            },
31
        ),
32
        migrations.CreateModel(
33
            name='SectorResource',
34
            fields=[
35
                (
36
                    'id',
37
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
38
                ),
39
                ('title', models.CharField(max_length=50, verbose_name='Title')),
40
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
41
                ('description', models.TextField(verbose_name='Description')),
42
                (
43
                    'csv_file',
44
                    models.FileField(
45
                        help_text='CSV file',
46
                        upload_to=passerelle.apps.sector.models.upload_to,
47
                        verbose_name='Sectorization file',
48
                    ),
49
                ),
50
                (
51
                    'titles_in_first_line',
52
                    models.BooleanField(
53
                        default=True,
54
                        help_text='If not, column titles are: street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name, …',
55
                        verbose_name='First line defines column titles',
56
                    ),
57
                ),
58
                (
59
                    'users',
60
                    models.ManyToManyField(
61
                        blank=True,
62
                        related_name='_sectorresource_users_+',
63
                        related_query_name='+',
64
                        to='base.ApiUser',
65
                    ),
66
                ),
67
            ],
68
            options={
69
                'abstract': False,
70
            },
71
        ),
72
        migrations.CreateModel(
73
            name='Sectorization',
74
            fields=[
75
                (
76
                    'id',
77
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
78
                ),
79
                ('street_id', models.CharField(max_length=64, verbose_name='Street Identifier')),
80
                (
81
                    'parity',
82
                    models.SmallIntegerField(
83
                        choices=[(0, 'all'), (1, 'odd'), (2, 'even')],
84
                        default=0,
85
                        verbose_name='Parity of numbers',
86
                    ),
87
                ),
88
                (
89
                    'min_housenumber',
90
                    models.PositiveIntegerField(default=0, verbose_name='Minimal house number'),
91
                ),
92
                (
93
                    'max_housenumber',
94
                    models.PositiveIntegerField(default=999999, verbose_name='Maximal house number'),
95
                ),
96
                (
97
                    'sector',
98
                    models.ForeignKey(
99
                        on_delete=django.db.models.deletion.CASCADE, to='sector.Sector', verbose_name='Sector'
100
                    ),
101
                ),
102
            ],
103
            options={
104
                'ordering': ['street_id', 'min_housenumber', 'parity'],
105
            },
106
        ),
107
        migrations.AddField(
108
            model_name='sector',
109
            name='resource',
110
            field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='sector.SectorResource'),
111
        ),
112
        migrations.AlterUniqueTogether(
113
            name='sector',
114
            unique_together={('resource', 'slug')},
115
        ),
116
    ]
passerelle/apps/sector/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import csv
18
import datetime
19
import mimetypes
20
import os
21

  
22
from django.conf import settings
23
from django.core.exceptions import ValidationError
24
from django.core.files.base import ContentFile
25
from django.db import models, transaction
26
from django.db.models import Q
27
from django.http import HttpResponse
28
from django.utils.encoding import force_bytes, force_str, smart_text
29
from django.utils.timezone import now
30
from django.utils.translation import ugettext_lazy as _
31

  
32
from passerelle.base.models import BaseResource
33
from passerelle.utils.api import endpoint
34
from passerelle.utils.jsonresponse import APIError
35

  
36
PARITY_ALL = 0
37
PARITY_ODD = 1
38
PARITY_EVEN = 2
39

  
40
PARITY_CHOICES = (
41
    (PARITY_ALL, _('all')),
42
    (PARITY_ODD, _('odd')),
43
    (PARITY_EVEN, _('even')),
44
)
45

  
46
CSV_TITLES = ['street_id', 'parity', 'min_housenumber', 'max_housenumber', 'sector_id', 'sector_name']
47

  
48
MAX_HOUSENUMBER = 999_999
49

  
50

  
51
def upload_to(instance, filename):
    """Return the storage path of an uploaded file: ``<connector slug>/<instance slug>/<filename>``."""
    connector_slug = instance.get_connector_slug()
    return f'{connector_slug}/{instance.slug}/{filename}'
53

  
54

  
55
class SectorResource(BaseResource):
    """Connector mapping streets (and house number ranges) to sectors.

    The mapping comes from an uploaded CSV file whose columns are named after
    ``CSV_TITLES``. Saving the resource imports that file into ``Sector`` and
    ``Sectorization`` rows (see ``import_csv``).
    """

    csv_file = models.FileField(
        _('Sectorization file'),
        upload_to=upload_to,
        help_text=_('CSV file'),
    )
    titles_in_first_line = models.BooleanField(
        _('First line defines column titles'),
        default=True,
        help_text=_('If not, column titles are: %s, …') % ','.join(CSV_TITLES),
    )

    category = _('Geographic information system')
    hide_description_fields = ['csv_file']

    def __str__(self):
        return '%s [%s]' % (self.title, self.slug)

    def daily(self):
        """Run the parent daily job, then tidy up superseded CSV files."""
        super().daily()
        self.clean_old_csv_files()

    def clean_old_csv_files(self):
        """Remove or set aside files older than 7 days lying next to the current CSV file.

        Nothing is done when the current file does not exist on disk or when it
        is not stored in the directory computed by ``upload_to``. Old files are
        deleted when ``settings.SECTOR_REMOVE_ON_CLEAN`` is ``True``, otherwise
        they are moved to an ``unused-files`` sub-directory.
        """
        if not os.path.exists(self.csv_file.path):
            return
        base_dir = os.path.dirname(self.csv_file.path)
        if os.path.dirname(self.csv_file.name) != os.path.join(self.get_connector_slug(), self.slug):
            # path is not compliant with upload_to, do nothing
            return

        # files modified after this timestamp are kept untouched
        too_young_limit = (now() + datetime.timedelta(days=-7)).timestamp()
        for filename in os.listdir(base_dir):
            filepath = os.path.join(base_dir, filename)
            if not os.path.isfile(filepath):
                continue
            if os.path.basename(self.csv_file.name) == filename:
                # current file
                continue
            mtime = os.stat(filepath).st_mtime
            if mtime > too_young_limit:
                # too young
                continue

            if getattr(settings, 'SECTOR_REMOVE_ON_CLEAN', False) is True:
                # remove
                os.unlink(filepath)
            else:
                # move file in unused-files dir
                unused_dir = os.path.join(base_dir, 'unused-files')
                os.makedirs(unused_dir, exist_ok=True)
                os.rename(filepath, os.path.join(unused_dir, filename))

    def clean(self, *args, **kwargs):
        """Validate the CSV file by running a dry-run import.

        Raises:
            ValidationError: when the dry-run import fails for any reason.
        """
        try:
            self.import_csv(validate_only=True)
        except Exception as e:
            # keep the original error chained for debugging purposes
            raise ValidationError(_('Invalid CSV file: %s') % e) from e
        return super().clean(*args, **kwargs)

    def save(self, *args, **kwargs):
        """Save the resource, then import the CSV file.

        Pass ``import_csv=False`` to save without (re)importing the file.
        """
        import_csv = kwargs.pop('import_csv', True)
        result = super().save(*args, **kwargs)
        if import_csv:
            self.import_csv()
        return result

    @transaction.atomic
    def import_csv(self, validate_only=False):
        """Parse the CSV file and replace the sectors of this resource with its content.

        With ``validate_only=True`` the file is only parsed and checked, nothing
        is written. The whole import runs in a transaction, so an error while
        importing cancels the deletion of the previous sectors.

        Raises:
            ValidationError: when the file cannot be read, when a required
                column is missing from the header, or when a line has no
                sector_id or no street_id.
        """
        try:
            self.csv_file.seek(0)
            content = force_bytes(self.csv_file.read())
            content = force_str(content.decode('utf-8-sig', 'ignore').encode('utf-8'))  # handle BOM
            dialect = csv.Sniffer().sniff(content)
            reader = csv.reader(content.splitlines(), dialect)
        except Exception as e:
            raise ValidationError(_('failed to read CSV (%s)') % e) from e
        if self.titles_in_first_line:
            first_line = next(reader)
            titles = [name.strip().lower() for name in first_line]
            if not set(CSV_TITLES).issubset(titles):
                raise ValidationError(
                    _('missing column(s) in header: %s.') % ', '.join(set(CSV_TITLES) - set(titles))
                )
        else:
            titles = CSV_TITLES
        # positions of the named (non-empty) columns, and their names
        indexes = [titles.index(t) for t in titles if t]
        captions = [titles[i] for i in indexes]

        # now ready to import data, first delete all sectors
        # sectorization will also be deleted (cascade)
        # (will be cancelled by transaction on any import error)
        if not validate_only:
            self.sector_set.all().delete()

        def get_cell(row, index):
            # short lines are padded with empty cells
            try:
                return row[index]
            except IndexError:
                return ''

        # sector_id/sector_name are carried over from one line to the next, so
        # a line with an empty sector_id belongs to the last sector seen
        sector_id = sector_name = None
        sectors = {}
        for row in reader:
            if not row:
                continue  # do not consider empty lines
            row = [smart_text(x).strip() for x in row]
            row = {caption: get_cell(row, index) for caption, index in zip(captions, indexes)}

            if row['sector_id']:
                sector_id = row['sector_id']
                sector_name = row['sector_name'] or row['sector_id']
            if not sector_id:
                raise ValidationError(_('missing sector_id, line %s') % reader.line_num)
            if not row['street_id']:
                raise ValidationError(_('missing street_id, line %s') % reader.line_num)
            # parity is guessed from the first character, anything unknown means "all"
            if not row['parity']:
                parity = PARITY_ALL
            elif row['parity'].lower()[0] in (str(PARITY_EVEN), 'e', 'p'):  # p = pair (even, in french)
                parity = PARITY_EVEN
            elif row['parity'].lower()[0] in (str(PARITY_ODD), 'o', 'i'):  # i = impair (odd, in french)
                parity = PARITY_ODD
            else:
                parity = PARITY_ALL
            # empty or non-numeric limits fall back to the widest range
            try:
                min_housenumber = int(row['min_housenumber'])
            except (ValueError, TypeError):
                min_housenumber = 0
            try:
                max_housenumber = int(row['max_housenumber'])
            except (ValueError, TypeError):
                max_housenumber = MAX_HOUSENUMBER

            if not validate_only:
                if sector_id not in sectors:
                    sectors[sector_id] = self.sector_set.create(
                        resource=self, slug=sector_id, title=sector_name
                    )
                sectors[sector_id].sectorization_set.create(
                    sector=sectors[sector_id],
                    street_id=row['street_id'],
                    parity=parity,
                    min_housenumber=min_housenumber,
                    max_housenumber=max_housenumber,
                )

    @endpoint(
        description=_('Update sectorization with a CSV file'),
        display_category=_('Management'),
        perm='can_access',
        methods=['put'],
    )
    def update(self, request):
        """Replace the CSV file with the request body and import it.

        Returns the name of the stored file and the resulting list of sectors.

        Raises:
            APIError: (HTTP 400) when no file extension can be guessed from the
                request content type, or when the uploaded CSV is invalid.
        """
        ext = mimetypes.guess_extension(request.content_type)
        if not ext:
            raise APIError(
                "can't guess filename extension for '%s' content type" % request.content_type, http_status=400
            )
        name = self.csv_file.storage.get_available_name('api-uploaded-file' + ext)
        self.csv_file = ContentFile(content=request.body, name=name)
        try:
            self.clean()
        except ValidationError as e:
            raise APIError(e, http_status=400)
        self.save()
        return {
            'updated': self.csv_file.name,
            'data': [
                {
                    'id': sector.slug,
                    'text': sector.title,
                }
                for sector in self.sector_set.all()
            ],
        }

    @endpoint(
        description=_('Get sectorization as a CSV file'),
        display_category=_('Management'),
        perm='can_access',
        parameters={
            'even': {
                'description': _('Even numbers indicator (default: %s)') % PARITY_EVEN,
                'example_value': 'P',
            },
            'odd': {
                'description': _('Odd numbers indicator (default: %s)') % PARITY_ODD,
                'example_value': 'I',
            },
            'mix': {
                'description': _('Even or odd numbers indicator (default: %s)') % PARITY_ALL,
                'example_value': '',
            },
            'limits': {'description': _('Show housenumber min/max (0/%s)') % MAX_HOUSENUMBER, 'type': 'bool'},
            'repeat': {'description': _('Repeat sector id and name on all lines'), 'type': 'bool'},
        },
    )
    def export(self, request, even=None, odd=None, mix=None, limits=False, repeat=False):
        """Return the sectorization as a CSV attachment.

        ``even``, ``odd`` and ``mix`` override the text written in the parity
        column. Unless ``limits`` is set, a minimal house number of 0 and a
        maximal house number of ``MAX_HOUSENUMBER`` are written as empty cells.
        Unless ``repeat`` is set, the sector id and name are only written on
        the first line of each sector.
        """
        response = HttpResponse(content_type='text/csv')
        date = now().strftime('%Y-%m-%d_%H:%M')
        response['Content-Disposition'] = 'attachment; filename="sector-%s-%s.csv"' % (self.slug, date)
        writer = csv.writer(response, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
        if self.titles_in_first_line:
            writer.writerow(CSV_TITLES)
        parity = dict(PARITY_CHOICES)
        if even is not None:
            parity[PARITY_EVEN] = even
        if odd is not None:
            parity[PARITY_ODD] = odd
        if mix is not None:
            parity[PARITY_ALL] = mix
        for sector in self.sector_set.all().order_by('slug'):
            first = True
            for sectorization in sector.sectorization_set.all().order_by('street_id'):
                writer.writerow(
                    [
                        sectorization.street_id,
                        parity.get(sectorization.parity, mix),
                        sectorization.min_housenumber if (limits or sectorization.min_housenumber) else '',
                        sectorization.max_housenumber
                        if (limits or sectorization.max_housenumber < MAX_HOUSENUMBER)
                        else '',
                        sector.slug if (repeat or first) else '',
                        sector.title if (repeat or first) else '',
                    ]
                )
                first = False
        return response

    @endpoint(
        name='sectors',
        description=_('List of Sectors'),
        perm='can_access',
        display_category=_('Data sources'),
        parameters={
            'id': {'description': _('Sector identifier (slug)')},
            'q': {'description': _('Filter by Sector Title or Identifier')},
            'street_id': {'description': _('Get sectors for this Street identifier')},
            'house_number': {
                'description': _('Get sectors by this House Number (requires a street_id)'),
                'type': 'integer',
            },
        },
    )
    def sectors(self, request, q=None, id=None, street_id=None, house_number=None):
        """List sectors, either for a street (and house number) or for the whole resource.

        With ``street_id``, return the sector of every matching sectorization,
        in the default ``Sectorization`` ordering. Without a ``house_number``,
        only sectorizations covering the whole street are considered.
        Without ``street_id``, list the sectors of the resource, optionally
        filtered by ``id`` (exact slug) or else by ``q`` (case-insensitive
        match on slug or title).

        Raises:
            APIError: (HTTP 400) when ``house_number`` is given without ``street_id``.
        """
        if house_number and not street_id:
            raise APIError('house_number requires a street_id', http_status=400)

        # search by street and house number
        if street_id:
            query = Sectorization.objects.filter(sector__resource=self, street_id=street_id)
            if house_number is not None:
                house_number = int(house_number)
                query = query.filter(min_housenumber__lte=house_number, max_housenumber__gte=house_number)
                parity = PARITY_ODD if house_number % 2 else PARITY_EVEN
                query = query.filter(Q(parity=PARITY_ALL) | Q(parity=parity))
            else:
                query = query.filter(parity=PARITY_ALL, min_housenumber=0, max_housenumber=MAX_HOUSENUMBER)
            # The "for" clause must live inside the list: placed after it, the
            # whole literal is a dict comprehension that keeps a single match
            # and yields {} (no 'data' key at all) when nothing matches.
            return {
                'data': [
                    {
                        'id': sectorization.sector.slug,
                        'text': sectorization.sector.title,
                    }
                    for sectorization in query.select_related('sector')
                ]
            }

        # list of sectors
        query = self.sector_set.all()
        if id is not None:
            query = query.filter(slug=id)
        elif q is not None:
            query = query.filter(Q(slug__icontains=q) | Q(title__icontains=q))
        return {
            'data': [
                {
                    'id': sector.slug,
                    'text': sector.title,
                }
                for sector in query
            ]
        }
336

  
337

  
338
class Sector(models.Model):
    """A sector of a SectorResource, identified by a slug unique within that resource."""

    resource = models.ForeignKey(SectorResource, on_delete=models.CASCADE)
    title = models.CharField(max_length=256, verbose_name=_('Title'))
    slug = models.CharField(max_length=128, verbose_name=_('Identifier'))

    class Meta:
        ordering = ['resource', 'slug']
        unique_together = ('resource', 'slug')

    def __str__(self):
        # rendered as "<resource> > <title> [<slug>]"
        return f'{self.resource} > {self.title} [{self.slug}]'
349

  
350

  
351
class Sectorization(models.Model):
    """Attach a street, restricted to a parity and a house number range, to a sector."""

    sector = models.ForeignKey(Sector, on_delete=models.CASCADE, verbose_name=_('Sector'))
    street_id = models.CharField(max_length=64, verbose_name=_('Street Identifier'))
    parity = models.SmallIntegerField(
        choices=PARITY_CHOICES, default=PARITY_ALL, verbose_name=_('Parity of numbers')
    )
    min_housenumber = models.PositiveIntegerField(default=0, verbose_name=_('Minimal house number'))
    max_housenumber = models.PositiveIntegerField(
        default=MAX_HOUSENUMBER, verbose_name=_('Maximal house number')
    )

    class Meta:
        ordering = ['street_id', 'min_housenumber', 'parity']

    def __str__(self):
        return '%s, parity:%s, min:%s, max:%s → %s' % (
            self.street_id,
            dict(PARITY_CHOICES).get(self.parity),
            self.min_housenumber,
            self.max_housenumber,
            self.sector,
        )

    @property
    def resource(self):
        """Resource owning the sector of this sectorization."""
        return self.sector.resource

    def clean(self):
        """Cap the maximal house number and check that the range is not inverted.

        A missing (or zero) maximal house number, or one above
        ``MAX_HOUSENUMBER``, is replaced by ``MAX_HOUSENUMBER``.

        Raises:
            ValidationError: when the minimal house number is greater than the
                maximal one.
        """
        if not self.max_housenumber or self.max_housenumber > MAX_HOUSENUMBER:
            self.max_housenumber = MAX_HOUSENUMBER
        if self.min_housenumber > self.max_housenumber:
            # the message used to say "lesser", the opposite of the condition checked
            raise ValidationError(_('Minimal house number may not be greater than maximal house number.'))
passerelle/settings.py
158 158
    'passerelle.apps.oxyd',
159 159
    'passerelle.apps.phonecalls',
160 160
    'passerelle.apps.photon',
161
    'passerelle.apps.sector',
161 162
    'passerelle.apps.solis',
162 163
    'passerelle.apps.twilio',
163 164
    'passerelle.apps.vivaticket',
tests/test_sector.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18
import os
19
from posix import stat_result
20
from stat import ST_MTIME
21

  
22
import pytest
23
import utils
24
import webtest
25
from django.core.exceptions import ValidationError
26
from django.core.files import File
27
from django.core.management import call_command
28
from django.urls import reverse
29
from django.utils.encoding import force_str, force_text
30
from django.utils.six import StringIO
31
from django.utils.timezone import now
32
from test_manager import login
33

  
34
from passerelle.apps.sector.models import Sectorization, SectorResource
35
from passerelle.base.models import AccessRight
36

  
37
CSV = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name
38
75114_1912,P,,, gs-moulin, Groupe Scolaire Moulin
39
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola
40
75114_1913,N,0,999999,ecole-hugo,École Hugo
41
75114_1914,,,10,ecole-hugo, École Hugo
42

  
43
75114_1914,,11,, ecole-hugo2, École Hugo 2
44
75114_1915,,,,ecole-hugo2 , École Hugo 2
45
"""
46

  
47
CSV_BOM = force_str(force_text(CSV, 'utf-8').encode('utf-8-sig'))
48

  
49
CSV_NO_FIRST_LINE = """75114_1912,P,,, gs-moulin, Groupe Scolaire Moulin
50
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola
51
75114_1913,N,0,999999,ecole-hugo,École Hugo
52
75114_1914,,,10,ecole-hugo, École Hugo
53

  
54
75114_1914,,11,, ecole-hugo, École Hugo"""
55

  
56
CSV_REORDERED = """sector_id,sector_name,street_id,parity,min_housenumber,max_housenumber,foo,bar
57
gs-moulin, Groupe Scolaire Moulin, 75114_1912,P,,,aaa,bbb
58
gs-zola,Groupe Scolaire Zola,75114_1912,I,0,999999,xxx,yyy
59
ecole-hugo,École Hugo,75114_1913,N,0,999999,000,1
60
,,75114_1999,N,0,999999,,
61
"""
62

  
63
CSV_MISSING_COLUMN = """street_id,min_housenumber,max_housenumber,sector_id,sector_name
64
75114_1912,,,foo,
65
,0,999999,gs-zola,Groupe Scolaire Zola"""
66

  
67
CSV_MISSING_SECTOR = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name
68
75114_1912,P,,, ,
69
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola"""
70

  
71
CSV_MISSING_STREET = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name
72
75114_1912,P,,,foo,
73
,I,0,999999,gs-zola,Groupe Scolaire Zola"""
74

  
75
pytestmark = pytest.mark.django_db
76

  
77

  
78
@pytest.fixture
def sector(db):
    """Create the 'test' sector connector from ``CSV`` and hand it to ``utils.setup_access_rights``."""
    csv_upload = File(StringIO(CSV), 'sectorization.csv')
    resource = SectorResource.objects.create(slug='test', title='title', csv_file=csv_upload)
    return utils.setup_access_rights(resource)
87

  
88

  
89
def test_sector_creation(sector):
    """The fixture CSV yields 4 sectors, and saving again rebuilds the same 4."""
    assert '%s' % sector == 'title [test]'
    assert sector.sector_set.count() == 4

    # number of sectorization lines expected for a couple of sectors
    for slug, expected_lines in (('ecole-hugo', 2), ('gs-zola', 1)):
        one_sector = sector.sector_set.get(slug=slug)
        assert Sectorization.objects.filter(sector=one_sector).count() == expected_lines

    # validating and saving again does not change anything
    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4

    # forced reset: sectors come back on next save
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0
    sector.save()
    assert sector.sector_set.count() == 4
104

  
105

  
106
def test_sector_creation_bom(sector):
    """A CSV file starting with a BOM is imported like the plain one."""
    # forced reset
    sector.sector_set.all().delete()
    assert sector.sector_set.count() == 0

    bom_upload = File(StringIO(CSV_BOM), 'sectorization.csv')
    sector.csv_file = bom_upload
    sector.clean()
    sector.save()
    assert sector.sector_set.count() == 4
114

  
115

  
116
def test_sector_creation_nofirstline(sector):
    """A header-less CSV is refused until ``titles_in_first_line`` is switched off."""
    headerless_upload = File(StringIO(CSV_NO_FIRST_LINE), 'sectorization.csv')
    sector.csv_file = headerless_upload
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing column'):
        sector.clean()
    # nothing changed since the initial creation
    assert sector.sector_set.count() == 4

    sector.titles_in_first_line = False
    sector.save()
    assert sector.sector_set.count() == 3
124

  
125

  
126
def test_sector_reordered(sector):
    """Columns may come in any order, with extra columns, when the first line names them."""
    assert sector.sector_set.count() == 4
    reordered_upload = File(StringIO(CSV_REORDERED), 'sectorization.csv')
    sector.csv_file = reordered_upload
    sector.save()
    assert sector.sector_set.count() == 3
131

  
132

  
133
def test_sector_empty_file(sector):
    """An empty file is refused by both clean() and save(), and existing sectors are kept."""
    sector.csv_file = File(StringIO(''), 'sectorization.csv')
    for action, message in (
        (sector.clean, 'Invalid CSV file:.*failed to read CSV'),
        (sector.save, 'failed to read CSV'),
    ):
        with pytest.raises(ValidationError, match=message):
            action()
    # nothing changed
    assert sector.sector_set.count() == 4
140

  
141

  
142
def test_sector_missing_sector(sector):
    """A first data line without sector_id is reported with its line number."""
    sector.csv_file = File(StringIO(CSV_MISSING_SECTOR), 'sectorization.csv')
    for action, message in (
        (sector.clean, 'Invalid CSV file:.*missing sector_id, line 2'),
        (sector.save, 'missing sector_id, line 2'),
    ):
        with pytest.raises(ValidationError, match=message):
            action()
    # nothing changed
    assert sector.sector_set.count() == 4
149

  
150

  
151
def test_sector_missing_street(sector):
    """A line without street_id is reported with its line number."""
    sector.csv_file = File(StringIO(CSV_MISSING_STREET), 'sectorization.csv')
    for action, message in (
        (sector.clean, 'Invalid CSV file:.*missing street_id, line 3'),
        (sector.save, 'missing street_id, line 3'),
    ):
        with pytest.raises(ValidationError, match=message):
            action()
    # nothing changed
    assert sector.sector_set.count() == 4
158

  
159

  
160
def test_sector_missing_column(sector):
    """A header lacking the parity column is refused, and existing sectors are kept."""
    sector.csv_file = File(StringIO(CSV_MISSING_COLUMN), 'sectorization.csv')
    # raw strings: "\." is an invalid escape sequence in a regular string literal
    with pytest.raises(ValidationError, match=r'Invalid CSV file:.*missing column.*: parity\.'):
        sector.clean()
    with pytest.raises(ValidationError, match=r'missing column.*: parity\.'):
        sector.save()
    assert sector.sector_set.count() == 4  # nothing change
167

  
168

  
169
def test_sector_endpoint_sectors(app, sector):
    """Exercise the ``sectors`` endpoint: listing, filters, street lookups, errors, permissions."""
    url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'sectors'},
    )
    hugo = {'id': 'ecole-hugo', 'text': 'École Hugo'}
    hugo2 = {'id': 'ecole-hugo2', 'text': 'École Hugo 2'}
    moulin = {'id': 'gs-moulin', 'text': 'Groupe Scolaire Moulin'}
    zola = {'id': 'gs-zola', 'text': 'Groupe Scolaire Zola'}

    # plain listing, without any filter
    result = app.get(url).json
    assert result['err'] == 0
    assert len(result['data']) == 4
    assert hugo in result['data']

    # listing filtered by identifier or by free text
    for params, expected_items in (
        ({'id': 'ecole-hugo'}, [hugo]),
        ({'q': 'hugo'}, [hugo, hugo2]),
        ({'q': 'foobar'}, []),
    ):
        result = app.get(url, params=params).json
        assert result['err'] == 0
        assert len(result['data']) == len(expected_items)
        for item in expected_items:
            assert item in result['data']

    # search a sector by street and house number
    for street_id, house_number, expected in (
        ('75114_1915', None, hugo2),
        ('75114_1915', '123', hugo2),
        ('75114_1912', '12', moulin),  # even
        ('75114_1912', '13', zola),  # odd
        ('75114_1914', '5', hugo),  # 5 <= 10
        ('75114_1914', '20', hugo2),  # 20 >= 11
    ):
        params = {'street_id': street_id}
        if house_number is not None:
            params['house_number'] = house_number
        result = app.get(url, params=params).json
        assert result['err'] == 0
        assert result['data'] == [expected]

    # bad searches
    for params, err_desc in (
        # not an integer
        ({'street_id': '75114_1915', 'house_number': 'abc'}, 'invalid value for parameter "house_number"'),
        ({'house_number': '123'}, 'house_number requires a street_id'),
    ):
        result = app.get(url, params=params, status=400).json
        assert result['err'] == 1
        assert result['err_desc'] == err_desc

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.get(url, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None
231

  
232

  
233
def test_sector_endpoint_export(app, sector):
    """Exercise the ``export`` endpoint of the sector connector.

    Checks the CSV content type, the presence or absence of the title line
    depending on ``titles_in_first_line``, the stability of an
    export -> import -> export round trip, the query parameters that
    customise the output, and that the endpoint is refused (403) once all
    access rights are deleted.
    """
    export_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'export'},
    )

    # default export: title line followed by six data lines
    response = app.get(export_url)
    assert resp_content_type(response) == 'text/csv' if False else response.headers['content-type'] == 'text/csv'
    body = response.text
    assert body.startswith(
        '"street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"'
    )
    assert len(body.splitlines()) == 7

    # without titles: one line less, and no column name anywhere
    sector.titles_in_first_line = False
    sector.save(import_csv=False)
    response = app.get(export_url)
    assert response.headers['content-type'] == 'text/csv'
    body = response.text
    assert 'street_id' not in body
    assert len(body.splitlines()) == 6

    # back to an export with titles
    sector.titles_in_first_line = True
    sector.save()
    response = app.get(export_url)
    assert response.headers['content-type'] == 'text/csv'
    # "street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"
    # "75114_1913","all","","","ecole-hugo","École Hugo"
    # "75114_1914","all","","10","",""
    # "75114_1915","all","","","ecole-hugo2","École Hugo 2"
    # "75114_1914","all","11","","",""
    # "75114_1912","odd","","","gs-zola","Groupe Scolaire Zola"
    # "75114_1912","even","","","gs-moulin","Groupe Scolaire Moulin"
    body = response.text
    assert len(body.splitlines()) == 7
    default_counts = (
        ('"all"', 4),
        ('"odd"', 1),
        ('"even"', 1),
        ('"ecole-hugo"', 1),
        ('"0"', 0),
        ('"999999"', 0),
    )
    for token, expected in default_counts:
        assert body.count(token) == expected

    # import -> export again
    initial_export = body
    sector.csv_file = File(StringIO(initial_export), 'data.csv')
    sector.save()
    response = app.get(export_url)
    assert response.text == initial_export

    # modify export format
    custom_params = {
        'odd': 'IMPAIRS',
        'even': 'PAIRS',
        'mix': 'TOUS',
        'repeat': 'true',
        'limits': 'true',
    }
    response = app.get(export_url, params=custom_params)
    body = response.text
    assert len(body.splitlines()) == 7
    custom_counts = (
        ('"TOUS"', 4),
        ('"IMPAIRS"', 1),
        ('"PAIRS"', 1),
        ('"ecole-hugo"', 2),  # repeat
        ('"0"', 5),  # limits
        ('"999999"', 5),
    )
    for token, expected in custom_counts:
        assert body.count(token) == expected

    # access right is needed
    AccessRight.objects.all().delete()
    denied = app.get(export_url, status=403).json
    assert denied['err'] == 1
    assert 'PermissionDenied' in denied['err_class']
    assert denied['data'] is None
299

  
300

  
301
def test_sector_endpoint_update(app, sector):
    """Exercise the ``update`` endpoint (PUT of a CSV payload).

    Covers a successful upload that replaces the sectors, a payload that is
    rejected for a missing column, a request without a content type, and
    the 403 answer once all access rights are deleted.
    """
    update_url = reverse(
        'generic-endpoint',
        kwargs={'connector': 'sector', 'slug': sector.slug, 'endpoint': 'update'},
    )

    def put_csv(payload, **extra):
        # PUT the payload declared as text/csv and return the decoded JSON answer
        return app.put(update_url, params=payload, headers={'Content-Type': 'text/csv'}, **extra).json

    # valid upload: the sector set goes from 4 to 3 entries
    assert sector.sector_set.count() == 4
    answer = put_csv(CSV_REORDERED)
    assert sector.sector_set.count() == 3
    assert answer['err'] == 0
    assert len(answer['data']) == 3
    assert answer['updated'] == 'sector/test/api-uploaded-file.csv'

    # invalid content
    answer = put_csv(CSV_MISSING_COLUMN, status=400)
    assert answer['err'] == 1
    assert "missing column" in answer['err_desc']

    # no content type header
    answer = app.put(update_url, params=CSV_REORDERED, headers={}, status=400).json
    assert answer['err'] == 1
    assert "can't guess filename extension" in answer['err_desc']

    # access right is needed
    AccessRight.objects.all().delete()
    answer = put_csv(CSV_REORDERED, status=403)
    assert answer['err'] == 1
    assert 'PermissionDenied' in answer['err_class']
330

  
331

  
332
@pytest.mark.parametrize('remove_files', [False, True])
def test_daily_clean(settings, remove_files, sector):
    """Check what the daily cron does with extra files in the sector directory.

    Extra files are left alone while their (faked) modification time is
    less than 7 days old. Once it is exactly 7 days old they disappear from
    the directory when ``SECTOR_REMOVE_ON_CLEAN`` is true, and are found in
    an ``unused-files`` subdirectory otherwise. Sub-directories and files
    in another storage directory are expected to stay untouched.
    """
    settings.SECTOR_REMOVE_ON_CLEAN = remove_files

    sectordata_dir = os.path.dirname(sector.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', sector.slug)
    os.makedirs(other_dir)

    # create additional file in sector dir
    open(os.path.join(sectordata_dir, 'csv-file.csv'), 'w').close()
    os.makedirs(os.path.join(sectordata_dir, 'not-a-file'))
    # create additional file in other dir
    open(os.path.join(other_dir, 'csv-file.csv'), 'w').close()

    def listing(directory):
        # directory entries in a deterministic order
        return sorted(os.listdir(directory))

    untouched = ['csv-file.csv', 'not-a-file', 'sectorization.csv']

    call_command('cron', 'daily')

    # not changed
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    assert listing(sectordata_dir) == untouched

    orig_os_stat = os.stat

    def stat_shifted_by(delta):
        # build an os.stat replacement reporting an mtime of "now + delta"
        def _fake_stat(arg):
            faked = list(orig_os_stat(arg))
            faked[ST_MTIME] = (now() + delta).timestamp()
            return stat_result(faked)

        return _fake_stat

    try:
        # 1 week ago but one minute too soon
        os.stat = stat_shifted_by(datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        assert listing(sectordata_dir) == untouched

        # 1 week ago
        os.stat = stat_shifted_by(datetime.timedelta(days=-7))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # removed or moved
        remaining = listing(sectordata_dir)
        if not remove_files:
            assert remaining == ['not-a-file', 'sectorization.csv', 'unused-files']
            assert os.listdir(os.path.join(sectordata_dir, 'unused-files')) == ['csv-file.csv']
        else:
            assert remaining == ['not-a-file', 'sectorization.csv']

        # wrong storage directory, do nothing
        open(os.path.join(other_dir, 'bar.csv'), 'w').close()
        sector.csv_file.name = 'foo/%s/csv-file.csv' % sector.slug
        sector.save()
        assert listing(other_dir) == ['bar.csv', 'csv-file.csv']
        call_command('cron', 'daily')
        assert listing(other_dir) == ['bar.csv', 'csv-file.csv']

        # unknown file
        sector.csv_file.name = 'sector/%s/bar.csv' % sector.slug
        sector.save()
        call_command('cron', 'daily')
    finally:
        # always restore the real os.stat, even when an assertion failed
        os.stat = orig_os_stat
406

  
407

  
408
def test_sector_manage_create(app, admin_user):
    """Create a sector connector through the management form.

    Fills and submits the creation form with a CSV upload, follows the
    redirect, then checks both the resulting page and the stored
    ``SectorResource`` (fields, file content and number of sectors).
    """
    app = login(app)
    csv_bytes = CSV.encode('utf-8')

    page = app.get(reverse('create-connector', kwargs={'connector': 'sector'}))
    form = page.form
    form_values = (
        ('title', 'test title'),
        ('slug', 'test-slug'),
        ('description', 'test description'),
        ('csv_file', webtest.Upload('test.csv', csv_bytes, 'application/octet-stream')),
    )
    for field_name, value in form_values:
        form.set(field_name, value)

    page = form.submit()
    assert page.location
    page = page.follow()
    assert 'test title' in page
    assert 'test description' in page

    assert SectorResource.objects.count() == 1
    created = SectorResource.objects.get()
    assert created.title == 'test title'
    assert created.slug == 'test-slug'
    assert created.description == 'test description'
    assert created.csv_file.read() == csv_bytes
    assert created.sector_set.count() == 4
0
-