Projet

Général

Profil

0001-add-sector-connector-56001.patch

Thomas Noël, 15 août 2021 22:48

Télécharger (36,9 ko)

Voir les différences:

Subject: [PATCH 1/2] add sector connector (#56001)

 passerelle/apps/sector/__init__.py            |   0
 passerelle/apps/sector/admin.py               |  42 ++
 .../apps/sector/migrations/0001_initial.py    | 116 ++++++
 passerelle/apps/sector/migrations/__init__.py |   0
 passerelle/apps/sector/models.py              | 371 +++++++++++++++++
 passerelle/settings.py                        |   1 +
 tests/test_sector.py                          | 383 ++++++++++++++++++
 7 files changed, 913 insertions(+)
 create mode 100644 passerelle/apps/sector/__init__.py
 create mode 100644 passerelle/apps/sector/admin.py
 create mode 100644 passerelle/apps/sector/migrations/0001_initial.py
 create mode 100644 passerelle/apps/sector/migrations/__init__.py
 create mode 100644 passerelle/apps/sector/models.py
 create mode 100644 tests/test_sector.py
passerelle/apps/sector/admin.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.contrib import admin
18

  
19
from passerelle.apps.sector.models import Sector, Sectorization
20

  
21

  
22
class SectorAdmin(admin.ModelAdmin):
    """Admin configuration for Sector objects."""

    # the slug is pre-filled from the title in the admin form
    prepopulated_fields = {'slug': ('title',)}
    list_display = ('title', 'slug', 'resource')
    list_filter = ('resource',)
26

  
27

  
28
class SectorizationAdmin(admin.ModelAdmin):
    """Admin configuration for Sectorization rows (street/house-number ranges)."""

    list_display = (
        'id',
        'street_id',
        'parity',
        'min_housenumber',
        'max_housenumber',
        'sector',
        'resource',
    )
    # a Sectorization is linked to its connector through its sector
    list_filter = ('sector__resource',)
39

  
40

  
41
# Register both models, with their admin classes, on the default admin site.
admin.site.register(Sector, SectorAdmin)
admin.site.register(Sectorization, SectorizationAdmin)
passerelle/apps/sector/migrations/0001_initial.py
1
# Generated by Django 2.2.19 on 2021-08-15 19:40
2

  
3
import django.db.models.deletion
4
from django.db import migrations, models
5

  
6
import passerelle.apps.sector.models
7

  
8

  
9
class Migration(migrations.Migration):
    """Initial migration: creates Sector, SectorResource and Sectorization."""

    initial = True

    dependencies = [
        ('base', '0029_auto_20210202_1627'),
    ]

    operations = [
        migrations.CreateModel(
            name='Sector',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('title', models.CharField(max_length=256, verbose_name='Title')),
                ('slug', models.CharField(max_length=128, verbose_name='Identifier')),
            ],
            options={
                'ordering': ['resource', 'slug'],
            },
        ),
        migrations.CreateModel(
            name='SectorResource',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('title', models.CharField(max_length=50, verbose_name='Title')),
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
                ('description', models.TextField(verbose_name='Description')),
                (
                    'csv_file',
                    models.FileField(
                        help_text='CSV file',
                        upload_to=passerelle.apps.sector.models.upload_to,
                        verbose_name='Sectorization file',
                    ),
                ),
                (
                    'titles_in_first_line',
                    models.BooleanField(
                        default=True,
                        help_text='If not, column titles are: street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name, …',
                        verbose_name='First line defines column titles',
                    ),
                ),
                (
                    'users',
                    models.ManyToManyField(
                        blank=True,
                        related_name='_sectorresource_users_+',
                        related_query_name='+',
                        to='base.ApiUser',
                    ),
                ),
            ],
            options={
                'abstract': False,
            },
        ),
        migrations.CreateModel(
            name='Sectorization',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('street_id', models.CharField(max_length=64, verbose_name='Street Identifier')),
                (
                    'parity',
                    models.SmallIntegerField(
                        choices=[(0, 'all'), (1, 'odd'), (2, 'even')],
                        default=0,
                        verbose_name='Parity of numbers',
                    ),
                ),
                (
                    'min_housenumber',
                    models.PositiveIntegerField(default=0, verbose_name='Minimal house number'),
                ),
                (
                    'max_housenumber',
                    models.PositiveIntegerField(default=999999, verbose_name='Maximal house number'),
                ),
                (
                    'sector',
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE, to='sector.Sector', verbose_name='Sector'
                    ),
                ),
            ],
            options={
                'ordering': ['street_id', 'min_housenumber', 'parity'],
            },
        ),
        # Sector.resource is added after SectorResource has been created
        migrations.AddField(
            model_name='sector',
            name='resource',
            field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='sector.SectorResource'),
        ),
        migrations.AlterUniqueTogether(
            name='sector',
            unique_together={('resource', 'slug')},
        ),
    ]
passerelle/apps/sector/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import csv
18
import datetime
19
import mimetypes
20
import os
21

  
22
from django.conf import settings
23
from django.core.exceptions import ValidationError
24
from django.core.files.base import ContentFile
25
from django.db import models, transaction
26
from django.db.models import Q
27
from django.http import HttpResponse
28
from django.utils.encoding import force_bytes, force_str, smart_text
29
from django.utils.timezone import now
30
from django.utils.translation import ugettext_lazy as _
31

  
32
from passerelle.base.models import BaseResource
33
from passerelle.utils.api import endpoint
34
from passerelle.utils.jsonresponse import APIError
35

  
36
# Parity codes stored in Sectorization.parity
PARITY_ALL = 0
PARITY_ODD = 1
PARITY_EVEN = 2

PARITY_CHOICES = (
    (PARITY_ALL, _('all')),
    (PARITY_ODD, _('odd')),
    (PARITY_EVEN, _('even')),
)

# Column titles required in the CSV header; also the assumed column order
# when the file has no header line.
CSV_TITLES = ['street_id', 'parity', 'min_housenumber', 'max_housenumber', 'sector_id', 'sector_name']

# Upper bound used when a row does not define a maximal house number
MAX_HOUSENUMBER = 999_999
49

  
50

  
51
def upload_to(instance, filename):
    """Return the storage path '<connector slug>/<instance slug>/<filename>'."""
    return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
53

  
54

  
55
class SectorResource(BaseResource):
    """Connector holding a sectorization (streets -> sectors) loaded from a CSV file.

    The CSV content is imported into Sector and Sectorization rows by
    ``import_csv()``; endpoints allow updating, exporting and querying it.
    """

    csv_file = models.FileField(
        _('Sectorization file'),
        upload_to=upload_to,
        help_text=_('CSV file'),
    )
    titles_in_first_line = models.BooleanField(
        _('First line defines column titles'),
        default=True,
        help_text=_('If not, column titles are: %s, …') % ','.join(CSV_TITLES),
    )

    category = _('Geographic information system')
    hide_description_fields = ['csv_file']

    def __str__(self):
        return '%s [%s]' % (self.title, self.slug)

    def daily(self):
        """Run the parent daily job, then tidy up old CSV files."""
        super().daily()
        self.clean_old_csv_files()

    def clean_old_csv_files(self):
        """Remove or set aside files older than 7 days next to the current CSV file.

        Files are deleted when settings.SECTOR_REMOVE_ON_CLEAN is True,
        otherwise they are moved to an 'unused-files' sub-directory.
        """
        if not os.path.exists(self.csv_file.path):
            return
        base_dir = os.path.dirname(self.csv_file.path)
        if os.path.dirname(self.csv_file.name) != os.path.join(self.get_connector_slug(), self.slug):
            # path is not compliant with upload_to, do nothing
            return

        for filename in os.listdir(base_dir):
            filepath = os.path.join(base_dir, filename)
            if not os.path.isfile(filepath):
                continue
            if os.path.basename(self.csv_file.name) == filename:
                # current file
                continue
            mtime = os.stat(filepath).st_mtime
            if mtime > (now() + datetime.timedelta(days=-7)).timestamp():
                # too young
                continue

            if getattr(settings, 'SECTOR_REMOVE_ON_CLEAN', False) is True:
                # remove
                os.unlink(filepath)
            else:
                # move file in unused-files dir
                unused_dir = os.path.join(base_dir, 'unused-files')
                os.makedirs(unused_dir, exist_ok=True)
                os.rename(filepath, os.path.join(unused_dir, filename))

    def clean(self, *args, **kwargs):
        """Validate the resource by importing its CSV file.

        Raises ValidationError('Invalid CSV file: ...') on any import error.
        """
        try:
            self.import_csv()
        except Exception as e:
            raise ValidationError(_('Invalid CSV file: %s') % e)
        return super().clean(*args, **kwargs)

    @transaction.atomic
    def import_csv(self):
        """Replace all sectors of this resource with the content of csv_file.

        Runs in a transaction: on any error the previous sectors are kept.
        Raises ValidationError when the file cannot be read, when a required
        column is missing or when a row lacks a sector_id or street_id.
        """
        try:
            self.csv_file.seek(0)
            content = force_bytes(self.csv_file.read())
            content = force_str(content.decode('utf-8-sig', 'ignore').encode('utf-8'))  # handle BOM
            dialect = csv.Sniffer().sniff(content)
            reader = csv.reader(content.splitlines(), dialect)
        except Exception as e:
            raise ValidationError(_('failed to read CSV (%s)') % e)
        if self.titles_in_first_line:
            first_line = next(reader)
            titles = [name.strip().lower() for name in first_line]
            # list missing columns in CSV_TITLES order, so that the error
            # message is deterministic
            missing = [title for title in CSV_TITLES if title not in titles]
            if missing:
                raise ValidationError(_('missing column(s) in header: %s.') % ', '.join(missing))
        else:
            titles = CSV_TITLES
        # titles.index() gives the first column bearing each title
        indexes = [titles.index(t) for t in titles if t]
        captions = [titles[i] for i in indexes]

        # now ready to import data, first delete all sectors
        # sectorization will also be deleted (cascade)
        # (will be cancelled by transaction on any import error)
        self.sector_set.all().delete()

        def get_cell(row, index):
            # short rows are padded with empty cells
            try:
                return row[index]
            except IndexError:
                return ''

        # sector_id/sector_name are kept from one row to the next: a row
        # without sector_id belongs to the sector of the previous row
        sector_id = sector_name = None
        sectors = {}
        for row in reader:
            if not row:
                continue  # do not consider empty lines
            row = [smart_text(x).strip() for x in row]
            row = {caption: get_cell(row, index) for caption, index in zip(captions, indexes)}

            if row['sector_id']:
                sector_id = row['sector_id']
                sector_name = row['sector_name'] or row['sector_id']
            if not sector_id:
                raise ValidationError(_('missing sector_id, line %s') % reader.line_num)
            if not row['street_id']:
                raise ValidationError(_('missing street_id, line %s') % reader.line_num)
            # parity is decided on the first character only
            if not row['parity']:
                parity = PARITY_ALL
            elif row['parity'].lower()[0] in (str(PARITY_EVEN), 'e', 'p'):  # p = pair (even, in french)
                parity = PARITY_EVEN
            elif row['parity'].lower()[0] in (str(PARITY_ODD), 'o', 'i'):  # i = impair (odd, in french)
                parity = PARITY_ODD
            else:
                parity = PARITY_ALL
            # empty or non numeric limits fall back to the widest range
            try:
                min_housenumber = int(row['min_housenumber'])
            except (ValueError, TypeError):
                min_housenumber = 0
            try:
                max_housenumber = int(row['max_housenumber'])
            except (ValueError, TypeError):
                max_housenumber = MAX_HOUSENUMBER

            if sector_id not in sectors:
                sectors[sector_id] = self.sector_set.create(resource=self, slug=sector_id, title=sector_name)
            sectors[sector_id].sectorization_set.create(
                sector=sectors[sector_id],
                street_id=row['street_id'],
                parity=parity,
                min_housenumber=min_housenumber,
                max_housenumber=max_housenumber,
            )

    @endpoint(
        description=_('Update sectorization with a CSV file'),
        display_category=_('Management'),
        perm='can_access',
        methods=['put'],
    )
    def update(self, request):
        """Replace the CSV file with the request body and re-import it.

        Raises APIError (HTTP 400) when no file extension can be guessed from
        the content type or when the uploaded content is not a valid CSV.
        """
        ext = mimetypes.guess_extension(request.content_type)
        if not ext:
            raise APIError(
                "can't guess filename extension for '%s' content type" % request.content_type, http_status=400
            )
        name = self.csv_file.storage.get_available_name('api-uploaded-file' + ext)
        self.csv_file = ContentFile(content=request.body, name=name)
        try:
            self.clean()
        except ValidationError as e:
            raise APIError(e, http_status=400)
        self.save()
        return {
            'updated': self.csv_file.name,
            'data': [
                {
                    'id': sector.slug,
                    'text': sector.title,
                }
                for sector in self.sector_set.all()
            ],
        }

    @endpoint(
        description=_('Get sectorization as a CSV file'),
        display_category=_('Management'),
        perm='can_access',
        parameters={
            'even': {
                'description': _('Even numbers indicator (default: %s)') % PARITY_EVEN,
                'example_value': 'P',
            },
            'odd': {
                'description': _('Odd numbers indicator (default: %s)') % PARITY_ODD,
                'example_value': 'I',
            },
            'mix': {
                'description': _('Even or odd numbers indicator (default: %s)') % PARITY_ALL,
                'example_value': '',
            },
            'limits': {'description': _('Show housenumber min/max (0/%s)') % MAX_HOUSENUMBER, 'type': 'bool'},
            'repeat': {'description': _('Repeat sector id and name on all lines'), 'type': 'bool'},
        },
    )
    def export(self, request, even=None, odd=None, mix=None, limits=False, repeat=False):
        """Return the sectorization as a CSV attachment.

        even/odd/mix override the text written in the parity column; limits
        forces writing the 0 and MAX_HOUSENUMBER bounds; repeat writes the
        sector id and name on every line instead of the first line only.
        """
        response = HttpResponse(content_type='text/csv')
        date = now().strftime('%Y-%m-%d_%H:%M')
        response['Content-Disposition'] = 'attachment; filename="sector-%s-%s.csv"' % (self.slug, date)
        writer = csv.writer(response, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
        if self.titles_in_first_line:
            writer.writerow(CSV_TITLES)
        parity = dict(PARITY_CHOICES)
        if even is not None:
            parity[PARITY_EVEN] = even
        if odd is not None:
            parity[PARITY_ODD] = odd
        if mix is not None:
            parity[PARITY_ALL] = mix
        for sector in self.sector_set.all().order_by('slug'):
            first = True
            for sectorization in sector.sectorization_set.all().order_by('street_id'):
                writer.writerow(
                    [
                        sectorization.street_id,
                        parity.get(sectorization.parity, mix),
                        sectorization.min_housenumber if (limits or sectorization.min_housenumber) else '',
                        sectorization.max_housenumber
                        if (limits or sectorization.max_housenumber < MAX_HOUSENUMBER)
                        else '',
                        sector.slug if (repeat or first) else '',
                        sector.title if (repeat or first) else '',
                    ]
                )
                first = False
        return response

    @endpoint(
        name='sectors',
        description=_('List of Sectors'),
        perm='can_access',
        display_category=_('Data sources'),
        parameters={
            'id': {'description': _('Sector identifier (slug)')},
            'q': {'description': _('Filter by Sector Title or Identifier')},
            'street_id': {'description': _('Get sectors for this Street identifier')},
            'house_number': {
                'description': _('Get sectors by this House Number (requires a street_id)'),
                'type': 'integer',
            },
        },
    )
    def sectors(self, request, q=None, id=None, street_id=None, house_number=None):
        """List sectors, or search the sectors of a street / house number.

        Always returns {'data': [{'id': slug, 'text': title}, ...]}.
        Raises APIError (HTTP 400) when house_number is given without street_id.
        """
        # "is not None": house number 0 must not bypass the street_id check
        if house_number is not None and not street_id:
            raise APIError('house_number requires a street_id', http_status=400)

        # search by street and house number
        if street_id:
            query = Sectorization.objects.filter(sector__resource=self, street_id=street_id)
            if house_number is not None:
                house_number = int(house_number)
                query = query.filter(min_housenumber__lte=house_number, max_housenumber__gte=house_number)
                parity = PARITY_ODD if house_number % 2 else PARITY_EVEN
                query = query.filter(Q(parity=PARITY_ALL) | Q(parity=parity))
            else:
                query = query.filter(parity=PARITY_ALL, min_housenumber=0, max_housenumber=MAX_HOUSENUMBER)
            # the comprehension must be inside the list: every match is
            # returned and 'data' is present (empty) when nothing matches
            return {
                'data': [
                    {
                        'id': sectorization.sector.slug,
                        'text': sectorization.sector.title,
                    }
                    for sectorization in query
                ]
            }

        # list of sectors
        query = self.sector_set.all()
        if id is not None:
            query = query.filter(slug=id)
        elif q is not None:
            query = query.filter(Q(slug__icontains=q) | Q(title__icontains=q))
        return {
            'data': [
                {
                    'id': sector.slug,
                    'text': sector.title,
                }
                for sector in query
            ]
        }
325

  
326

  
327
class Sector(models.Model):
    """A sector of a SectorResource; its slug is unique within the resource."""

    resource = models.ForeignKey(SectorResource, on_delete=models.CASCADE)
    title = models.CharField(max_length=256, verbose_name=_('Title'))
    slug = models.CharField(max_length=128, verbose_name=_('Identifier'))

    class Meta:
        ordering = ['resource', 'slug']
        unique_together = ('resource', 'slug')

    def __str__(self):
        return '%s > %s [%s]' % (self.resource, self.title, self.slug)
338

  
339

  
340
class Sectorization(models.Model):
    """Link between a street (with parity and house number range) and a Sector."""

    sector = models.ForeignKey(Sector, on_delete=models.CASCADE, verbose_name=_('Sector'))
    street_id = models.CharField(max_length=64, verbose_name=_('Street Identifier'))
    parity = models.SmallIntegerField(
        choices=PARITY_CHOICES, default=PARITY_ALL, verbose_name=_('Parity of numbers')
    )
    min_housenumber = models.PositiveIntegerField(default=0, verbose_name=_('Minimal house number'))
    max_housenumber = models.PositiveIntegerField(
        default=MAX_HOUSENUMBER, verbose_name=_('Maximal house number')
    )

    class Meta:
        ordering = ['street_id', 'min_housenumber', 'parity']

    def __str__(self):
        return '%s, parity:%s, min:%s, max:%s → %s' % (
            self.street_id,
            dict(PARITY_CHOICES).get(self.parity),
            self.min_housenumber,
            self.max_housenumber,
            self.sector,
        )

    @property
    def resource(self):
        """SectorResource owning this row, reached through its sector."""
        return self.sector.resource

    def clean(self):
        """Clamp max_housenumber to MAX_HOUSENUMBER and check that min <= max.

        Raises ValidationError when min_housenumber is greater than
        max_housenumber.
        """
        # an empty (or 0) maximum means "no upper limit"
        if not self.max_housenumber or self.max_housenumber > MAX_HOUSENUMBER:
            self.max_housenumber = MAX_HOUSENUMBER
        if self.min_housenumber > self.max_housenumber:
            raise ValidationError(_('Minimal house number may not be greater than maximal house number.'))
passerelle/settings.py
158 158
    'passerelle.apps.oxyd',
159 159
    'passerelle.apps.phonecalls',
160 160
    'passerelle.apps.photon',
161
    'passerelle.apps.sector',
161 162
    'passerelle.apps.solis',
162 163
    'passerelle.apps.twilio',
163 164
    'passerelle.apps.vivaticket',
tests/test_sector.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18
import os
19
from posix import stat_result
20
from stat import ST_MTIME
21

  
22
import pytest
23
import utils
24
from django.core.exceptions import ValidationError
25
from django.core.files import File
26
from django.core.management import call_command
27
from django.urls import reverse
28
from django.utils.encoding import force_str, force_text
29
from django.utils.six import StringIO
30
from django.utils.timezone import now
31

  
32
from passerelle.apps.sector.models import Sectorization, SectorResource
33
from passerelle.base.models import AccessRight
34

  
35
# Reference file: 4 sectors; contains an empty line and cells padded with spaces
CSV = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,P,,, gs-moulin, Groupe Scolaire Moulin
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola
75114_1913,N,0,999999,ecole-hugo,École Hugo
75114_1914,,,10,ecole-hugo, École Hugo

75114_1914,,11,, ecole-hugo2, École Hugo 2
75114_1915,,,,ecole-hugo2 , École Hugo 2
"""

# Same content, encoded with the utf-8-sig codec
CSV_BOM = force_str(force_text(CSV, 'utf-8').encode('utf-8-sig'))

# No header line: columns are expected in the default order
CSV_NO_FIRST_LINE = """75114_1912,P,,, gs-moulin, Groupe Scolaire Moulin
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola
75114_1913,N,0,999999,ecole-hugo,École Hugo
75114_1914,,,10,ecole-hugo, École Hugo

75114_1914,,11,, ecole-hugo, École Hugo"""

# Columns in another order, extra columns, and a last row without sector
CSV_REORDERED = """sector_id,sector_name,street_id,parity,min_housenumber,max_housenumber,foo,bar
gs-moulin, Groupe Scolaire Moulin, 75114_1912,P,,,aaa,bbb
gs-zola,Groupe Scolaire Zola,75114_1912,I,0,999999,xxx,yyy
ecole-hugo,École Hugo,75114_1913,N,0,999999,000,1
,,75114_1999,N,0,999999,,
"""

# Header without the "parity" column
CSV_MISSING_COLUMN = """street_id,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,,,foo,
,0,999999,gs-zola,Groupe Scolaire Zola"""

# First data row has no sector_id
CSV_MISSING_SECTOR = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,P,,, ,
75114_1912,I,0,999999,gs-zola,Groupe Scolaire Zola"""

# Second data row has no street_id
CSV_MISSING_STREET = """street_id,parity,min_housenumber,max_housenumber,sector_id,sector_name
75114_1912,P,,,foo,
,I,0,999999,gs-zola,Groupe Scolaire Zola"""
72

  
73

  
74
@pytest.fixture
def sector(db):
    """SectorResource 'test' using the reference CSV, passed through utils.setup_access_rights."""
    return utils.setup_access_rights(
        SectorResource.objects.create(
            slug='test',
            title='title',
            csv_file=File(StringIO(CSV), 'sectorization.csv'),
        )
    )
83

  
84

  
85
def test_sector_creation(sector):
    # nothing is imported until clean() is called
    assert sector.sector_set.count() == 0
    assert '%s' % sector == 'title [test]'

    sector.clean()
    assert sector.sector_set.count() == 4
    hugo = sector.sector_set.get(slug='ecole-hugo')
    assert Sectorization.objects.filter(sector=hugo).count() == 2
    zola = sector.sector_set.get(slug='gs-zola')
    assert Sectorization.objects.filter(sector=zola).count() == 1
95

  
96

  
97
def test_sector_creation_bom(sector):
    # the file with a BOM must actually be loaded, else this test only
    # repeats test_sector_creation
    sector.csv_file = File(StringIO(CSV_BOM), 'sectorization.csv')
    sector.clean()
    assert sector.sector_set.count() == 4
100

  
101

  
102
def test_sector_creation_nofirstline(sector):
    sector.csv_file = File(StringIO(CSV_NO_FIRST_LINE), 'sectorization.csv')
    # by default the first line is read as a header, here it is data
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing column'):
        sector.clean()
    assert sector.sector_set.count() == 0
    sector.titles_in_first_line = False
    sector.clean()
    assert sector.sector_set.count() == 3
110

  
111

  
112
def test_sector_reordered(sector):
    # columns order is free and unknown columns are ignored
    sector.csv_file = File(StringIO(CSV_REORDERED), 'sectorization.csv')
    sector.clean()
    assert sector.sector_set.count() == 3
116

  
117

  
118
def test_sector_empty_file(sector):
    # an empty file is rejected as unreadable
    sector.csv_file = File(StringIO(''), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*failed to read CSV'):
        sector.clean()
122

  
123

  
124
def test_sector_missing_sector(sector):
    # a first data row without sector_id is an error
    sector.csv_file = File(StringIO(CSV_MISSING_SECTOR), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing sector_id, line 2'):
        sector.clean()
128

  
129

  
130
def test_sector_missing_street(sector):
    # a row without street_id is an error
    sector.csv_file = File(StringIO(CSV_MISSING_STREET), 'sectorization.csv')
    with pytest.raises(ValidationError, match='Invalid CSV file:.*missing street_id, line 3'):
        sector.clean()
134

  
135

  
136
def test_sector_missing_column(sector):
    sector.csv_file = File(StringIO(CSV_MISSING_COLUMN), 'sectorization.csv')
    # raw string: "\." is an invalid escape sequence in a normal string literal
    with pytest.raises(ValidationError, match=r'Invalid CSV file:.*missing column.*: parity\.'):
        sector.clean()
140

  
141

  
142
def test_sector_endpoint_sectors(app, sector):
    url = reverse(
        'generic-endpoint',
        kwargs={
            'connector': 'sector',
            'slug': sector.slug,
            'endpoint': 'sectors',
        },
    )
    # no sector before the CSV file is imported
    result = app.get(url).json
    assert result['err'] == 0
    assert len(result['data']) == 0
    sector.clean()
    result = app.get(url).json
    assert result['err'] == 0
    assert len(result['data']) == 4
    assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']

    # filter on the sector identifier
    result = app.get(url, params={'id': 'ecole-hugo'}).json
    assert result['err'] == 0
    assert len(result['data']) == 1
    assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']

    # filter on a part of the title or identifier
    result = app.get(url, params={'q': 'hugo'}).json
    assert result['err'] == 0
    assert len(result['data']) == 2
    assert {'id': 'ecole-hugo', 'text': 'École Hugo'} in result['data']
    assert {'id': 'ecole-hugo2', 'text': 'École Hugo 2'} in result['data']

    result = app.get(url, params={'q': 'foobar'}).json
    assert result['err'] == 0
    assert len(result['data']) == 0

    # search a sector by street and house number
    result = app.get(url, params={'street_id': '75114_1915'}).json
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
    result = app.get(url, params={'street_id': '75114_1915', 'house_number': '123'}).json
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
    result = app.get(url, params={'street_id': '75114_1912', 'house_number': '12'}).json  # even
    assert result['err'] == 0
    assert result['data'] == [{'id': 'gs-moulin', 'text': 'Groupe Scolaire Moulin'}]
    result = app.get(url, params={'street_id': '75114_1912', 'house_number': '13'}).json  # odd
    assert result['err'] == 0
    assert result['data'] == [{'id': 'gs-zola', 'text': 'Groupe Scolaire Zola'}]
    result = app.get(url, params={'street_id': '75114_1914', 'house_number': '5'}).json  # 5 <= 10
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo', 'text': 'École Hugo'}]
    result = app.get(url, params={'street_id': '75114_1914', 'house_number': '20'}).json  # 20 >= 11
    assert result['err'] == 0
    assert result['data'] == [{'id': 'ecole-hugo2', 'text': 'École Hugo 2'}]
    # bad searches
    result = app.get(url, params={'street_id': '75114_1915', 'house_number': 'abc'}, status=400).json
    assert result['err'] == 1
    assert result['err_desc'] == 'invalid value for parameter "house_number"'  # not an integer
    result = app.get(url, params={'house_number': '123'}, status=400).json
    assert result['err'] == 1
    assert result['err_desc'] == 'house_number requires a street_id'

    # access right is needed
    AccessRight.objects.all().delete()
    result = app.get(url, status=403).json
    assert result['err'] == 1
    assert 'PermissionDenied' in result['err_class']
    assert result['data'] is None
208

  
209

  
210
def test_sector_endpoint_export(app, sector):
    """Check the CSV returned by the ``export`` endpoint.

    Covers, in order: the title line toggle (``titles_in_first_line``), the
    export obtained after ``sector.clean()``, an export -> import -> export
    round trip, the formatting query parameters, and the access right check.
    """
    export_url = reverse(
        'generic-endpoint',
        kwargs=dict(connector='sector', slug=sector.slug, endpoint='export'),
    )
    title_line = '"street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"'

    # before sector.clean() is called, the export is expected to hold the title line only
    response = app.get(export_url)
    assert response.headers['content-type'] == 'text/csv'
    assert response.text.startswith(title_line)
    assert len(response.text.splitlines()) == 1

    # without the title line, the export is expected to be empty
    sector.titles_in_first_line = False
    sector.save()
    response = app.get(export_url)
    assert response.headers['content-type'] == 'text/csv'
    assert response.text == ''
    sector.titles_in_first_line = True
    sector.save()

    sector.clean()
    response = app.get(export_url)
    assert response.headers['content-type'] == 'text/csv'
    # expected content (title line + 6 rows):
    # "street_id","parity","min_housenumber","max_housenumber","sector_id","sector_name"
    # "75114_1913","all","","","ecole-hugo","École Hugo"
    # "75114_1914","all","","10","",""
    # "75114_1915","all","","","ecole-hugo2","École Hugo 2"
    # "75114_1914","all","11","","",""
    # "75114_1912","odd","","","gs-zola","Groupe Scolaire Zola"
    # "75114_1912","even","","","gs-moulin","Groupe Scolaire Moulin"
    assert len(response.text.splitlines()) == 7
    default_counts = (
        ('"all"', 4),
        ('"odd"', 1),
        ('"even"', 1),
        ('"ecole-hugo"', 1),
        ('"0"', 0),
        ('"999999"', 0),
    )
    for needle, occurrences in default_counts:
        assert response.text.count(needle) == occurrences

    # import -> export again: feeding the export back must give the same export
    first_export = response.text
    sector.csv_file = File(StringIO(first_export), 'data.csv')
    sector.clean()
    response = app.get(export_url)
    assert response.text == first_export

    # modify export format through query parameters
    format_params = {'odd': 'IMPAIRS', 'even': 'PAIRS', 'mix': 'TOUS', 'repeat': 'true', 'limits': 'true'}
    response = app.get(export_url, params=format_params)
    assert len(response.text.splitlines()) == 7
    custom_counts = (
        ('"TOUS"', 4),
        ('"IMPAIRS"', 1),
        ('"PAIRS"', 1),
        ('"ecole-hugo"', 2),  # repeat
        ('"0"', 5),  # limits
        ('"999999"', 5),
    )
    for needle, occurrences in custom_counts:
        assert response.text.count(needle) == occurrences

    # access right is needed
    AccessRight.objects.all().delete()
    denied = app.get(export_url, status=403).json
    assert denied['err'] == 1
    assert 'PermissionDenied' in denied['err_class']
    assert denied['data'] is None
276

  
277

  
278
def test_sector_endpoint_update(app, sector):
    """Check the ``update`` endpoint, which receives a CSV file through PUT.

    Covers a successful upload, a CSV payload reported as missing a column,
    a request without a content type, and the access right check.
    """
    update_url = reverse(
        'generic-endpoint',
        kwargs=dict(connector='sector', slug=sector.slug, endpoint='update'),
    )

    sector.clean()
    assert sector.sector_set.count() == 4

    # valid upload: the sectors are replaced by those of the uploaded file
    payload = app.put(update_url, params=CSV_REORDERED, headers={'Content-Type': 'text/csv'}).json
    assert sector.sector_set.count() == 3
    assert payload['err'] == 0
    assert len(payload['data']) == 3
    assert payload['updated'] == 'sector/test/api-uploaded-file.csv'

    # invalid CSV content is rejected with a 400
    payload = app.put(
        update_url, params=CSV_MISSING_COLUMN, headers={'Content-Type': 'text/csv'}, status=400
    ).json
    assert payload['err'] == 1
    assert "missing column" in payload['err_desc']

    # no content type: the file extension can't be guessed
    payload = app.put(update_url, params=CSV_REORDERED, headers={}, status=400).json
    assert payload['err'] == 1
    assert "can't guess filename extension" in payload['err_desc']

    # access right is needed
    AccessRight.objects.all().delete()
    payload = app.put(
        update_url, params=CSV_REORDERED, headers={'Content-Type': 'text/csv'}, status=403
    ).json
    assert payload['err'] == 1
    assert 'PermissionDenied' in payload['err_class']
308

  
309

  
310
@pytest.mark.parametrize('remove_files', [False, True])
def test_daily_clean(settings, remove_files, sector):
    """Check the daily cron cleanup of files lying next to the sector CSV file.

    An extra file and an extra directory are created in the directory of
    ``sector.csv_file``, and an extra file in an unrelated directory. The
    ``cron daily`` command is then run with ``os.stat`` patched to fake the
    files' modification time:

    * files younger than one week must be left alone;
    * a one-week-old unused file is removed when ``SECTOR_REMOVE_ON_CLEAN``
      is true, else moved to an ``unused-files`` sub-directory;
    * directories and the unrelated directory are never touched.
    """
    settings.SECTOR_REMOVE_ON_CLEAN = remove_files

    sectordata_dir = os.path.dirname(sector.csv_file.path)
    other_dir = os.path.join(settings.MEDIA_ROOT, 'foo', sector.slug)
    os.makedirs(other_dir)

    # create additional file in sector dir
    with open(os.path.join(sectordata_dir, 'csv-file.csv'), 'w'):
        pass
    os.makedirs(os.path.join(sectordata_dir, 'not-a-file'))
    # create additional file in other dir
    with open(os.path.join(other_dir, 'csv-file.csv'), 'w'):
        pass

    call_command('cron', 'daily')

    # not changed
    assert os.listdir(other_dir) == ['csv-file.csv']
    # too soon to be removed
    assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

    orig_os_stat = os.stat

    def _fake_stat(delta):
        """Build an ``os.stat`` replacement reporting ``now() + delta`` as mtime."""

        def fake(path, *args, **kwargs):
            # forward extra arguments (dir_fd, follow_symlinks): the real
            # os.stat accepts them, so callers running under the patch must
            # not get a TypeError from the fake
            faked = list(orig_os_stat(path, *args, **kwargs))
            faked[ST_MTIME] = (now() + delta).timestamp()
            return stat_result(faked)

        return fake

    try:
        # 1 week ago but one minute too soon
        os.stat = _fake_stat(datetime.timedelta(days=-7, minutes=1))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # still too soon to be removed
        assert sorted(os.listdir(sectordata_dir)) == ['csv-file.csv', 'not-a-file', 'sectorization.csv']

        # 1 week ago
        os.stat = _fake_stat(datetime.timedelta(days=-7))
        call_command('cron', 'daily')

        # not changed
        assert os.listdir(other_dir) == ['csv-file.csv']
        # removed or moved
        dir_list = sorted(os.listdir(sectordata_dir))
        if remove_files:
            assert dir_list == ['not-a-file', 'sectorization.csv']
        else:
            assert dir_list == ['not-a-file', 'sectorization.csv', 'unused-files']
            assert os.listdir(os.path.join(sectordata_dir, 'unused-files')) == ['csv-file.csv']

        # wrong storage directory, do nothing
        with open(os.path.join(other_dir, 'bar.csv'), 'w'):
            pass
        sector.csv_file.name = 'foo/%s/csv-file.csv' % sector.slug
        sector.save()
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']
        call_command('cron', 'daily')
        assert sorted(os.listdir(other_dir)) == ['bar.csv', 'csv-file.csv']

        # unknown file: the command only has to run without raising
        sector.csv_file.name = 'sector/%s/bar.csv' % sector.slug
        sector.save()
        call_command('cron', 'daily')
    finally:
        # always restore the real os.stat, even when an assertion fails
        os.stat = orig_os_stat
0
-